SearchResult now includes a Status section
the Status section can report on the number of total/fail/success indexes when querying across multiple indexes through IndexAlias Further, searching an IndexAlias will now return partial results, the burden is on the caller to check the number of failed indexes and decide how to handle this situation.
This commit is contained in:
parent
74a52f94bb
commit
214b67ad66
|
@ -456,23 +456,31 @@ func createChildSearchRequest(req *SearchRequest) *SearchRequest {
|
|||
return &rv
|
||||
}
|
||||
|
||||
type errWrap struct {
|
||||
Name string
|
||||
Err error
|
||||
}
|
||||
|
||||
// MultiSearch executes a SearchRequest across multiple
|
||||
// Index objects, then merges the results.
|
||||
func MultiSearch(req *SearchRequest, indexes ...Index) (*SearchResult, error) {
|
||||
searchStart := time.Now()
|
||||
results := make(chan *SearchResult)
|
||||
errs := make(chan error)
|
||||
errs := make(chan *errWrap)
|
||||
|
||||
// run search on each index in separate go routine
|
||||
var waitGroup sync.WaitGroup
|
||||
|
||||
var searchChildIndex = func(waitGroup *sync.WaitGroup, in Index, results chan *SearchResult, errs chan error) {
|
||||
var searchChildIndex = func(waitGroup *sync.WaitGroup, in Index, results chan *SearchResult, errs chan *errWrap) {
|
||||
go func() {
|
||||
defer waitGroup.Done()
|
||||
childReq := createChildSearchRequest(req)
|
||||
searchResult, err := in.Search(childReq)
|
||||
if err != nil {
|
||||
errs <- err
|
||||
errs <- &errWrap{
|
||||
Name: in.Name(),
|
||||
Err: err,
|
||||
}
|
||||
} else {
|
||||
results <- searchResult
|
||||
}
|
||||
|
@ -492,8 +500,9 @@ func MultiSearch(req *SearchRequest, indexes ...Index) (*SearchResult, error) {
|
|||
}()
|
||||
|
||||
var sr *SearchResult
|
||||
var err error
|
||||
var ew *errWrap
|
||||
var result *SearchResult
|
||||
indexErrors := make(map[string]error)
|
||||
ok := true
|
||||
for ok {
|
||||
select {
|
||||
|
@ -507,11 +516,9 @@ func MultiSearch(req *SearchRequest, indexes ...Index) (*SearchResult, error) {
|
|||
sr.Merge(result)
|
||||
}
|
||||
}
|
||||
case err, ok = <-errs:
|
||||
// for now stop on any error
|
||||
// FIXME offer other behaviors
|
||||
if err != nil {
|
||||
return nil, err
|
||||
case ew, ok = <-errs:
|
||||
if ok {
|
||||
indexErrors[ew.Name] = ew.Err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -519,6 +526,15 @@ func MultiSearch(req *SearchRequest, indexes ...Index) (*SearchResult, error) {
|
|||
// merge just concatenated all the hits
|
||||
// now lets clean it up
|
||||
|
||||
// handle case where no results were successful
|
||||
if sr == nil {
|
||||
sr = &SearchResult{
|
||||
Status: &SearchStatus{
|
||||
Errors: make(map[string]error),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// first sort it by score
|
||||
sort.Sort(sr.Hits)
|
||||
|
||||
|
@ -544,6 +560,13 @@ func MultiSearch(req *SearchRequest, indexes ...Index) (*SearchResult, error) {
|
|||
searchDuration := time.Since(searchStart)
|
||||
sr.Took = searchDuration
|
||||
|
||||
// fix up errors
|
||||
for indexName, indexErr := range indexErrors {
|
||||
sr.Status.Errors[indexName] = indexErr
|
||||
sr.Status.Total++
|
||||
sr.Status.Failed++
|
||||
}
|
||||
|
||||
return sr, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -442,6 +442,11 @@ func TestIndexAliasMulti(t *testing.T) {
|
|||
err: nil,
|
||||
docCountResult: &ei1Count,
|
||||
searchResult: &SearchResult{
|
||||
Status: &SearchStatus{
|
||||
Total: 1,
|
||||
Successful: 1,
|
||||
Errors: make(map[string]error),
|
||||
},
|
||||
Total: 1,
|
||||
Hits: search.DocumentMatchCollection{
|
||||
&search.DocumentMatch{
|
||||
|
@ -456,6 +461,11 @@ func TestIndexAliasMulti(t *testing.T) {
|
|||
err: nil,
|
||||
docCountResult: &ei2Count,
|
||||
searchResult: &SearchResult{
|
||||
Status: &SearchStatus{
|
||||
Total: 1,
|
||||
Successful: 1,
|
||||
Errors: make(map[string]error),
|
||||
},
|
||||
Total: 1,
|
||||
Hits: search.DocumentMatchCollection{
|
||||
&search.DocumentMatch{
|
||||
|
@ -537,6 +547,11 @@ func TestIndexAliasMulti(t *testing.T) {
|
|||
// now a few things that should work
|
||||
sr := NewSearchRequest(NewTermQuery("test"))
|
||||
expected := &SearchResult{
|
||||
Status: &SearchStatus{
|
||||
Total: 2,
|
||||
Successful: 2,
|
||||
Errors: make(map[string]error),
|
||||
},
|
||||
Request: sr,
|
||||
Total: 2,
|
||||
Hits: search.DocumentMatchCollection{
|
||||
|
@ -570,6 +585,11 @@ func TestIndexAliasMulti(t *testing.T) {
|
|||
// TestMultiSearchNoError
|
||||
func TestMultiSearchNoError(t *testing.T) {
|
||||
ei1 := &stubIndex{err: nil, searchResult: &SearchResult{
|
||||
Status: &SearchStatus{
|
||||
Total: 1,
|
||||
Successful: 1,
|
||||
Errors: make(map[string]error),
|
||||
},
|
||||
Total: 1,
|
||||
Hits: search.DocumentMatchCollection{
|
||||
&search.DocumentMatch{
|
||||
|
@ -581,6 +601,11 @@ func TestMultiSearchNoError(t *testing.T) {
|
|||
MaxScore: 1.0,
|
||||
}}
|
||||
ei2 := &stubIndex{err: nil, searchResult: &SearchResult{
|
||||
Status: &SearchStatus{
|
||||
Total: 1,
|
||||
Successful: 1,
|
||||
Errors: make(map[string]error),
|
||||
},
|
||||
Total: 1,
|
||||
Hits: search.DocumentMatchCollection{
|
||||
&search.DocumentMatch{
|
||||
|
@ -594,6 +619,11 @@ func TestMultiSearchNoError(t *testing.T) {
|
|||
|
||||
sr := NewSearchRequest(NewTermQuery("test"))
|
||||
expected := &SearchResult{
|
||||
Status: &SearchStatus{
|
||||
Total: 2,
|
||||
Successful: 2,
|
||||
Errors: make(map[string]error),
|
||||
},
|
||||
Request: sr,
|
||||
Total: 2,
|
||||
Hits: search.DocumentMatchCollection{
|
||||
|
@ -624,7 +654,12 @@ func TestMultiSearchNoError(t *testing.T) {
|
|||
|
||||
// TestMultiSearchSomeError
|
||||
func TestMultiSearchSomeError(t *testing.T) {
|
||||
ei1 := &stubIndex{err: nil, searchResult: &SearchResult{
|
||||
ei1 := &stubIndex{name: "ei1", err: nil, searchResult: &SearchResult{
|
||||
Status: &SearchStatus{
|
||||
Total: 1,
|
||||
Successful: 1,
|
||||
Errors: make(map[string]error),
|
||||
},
|
||||
Total: 1,
|
||||
Hits: search.DocumentMatchCollection{
|
||||
&search.DocumentMatch{
|
||||
|
@ -635,23 +670,56 @@ func TestMultiSearchSomeError(t *testing.T) {
|
|||
Took: 1 * time.Second,
|
||||
MaxScore: 1.0,
|
||||
}}
|
||||
ei2 := &stubIndex{err: fmt.Errorf("deliberate error")}
|
||||
ei2 := &stubIndex{name: "ei2", err: fmt.Errorf("deliberate error")}
|
||||
sr := NewSearchRequest(NewTermQuery("test"))
|
||||
_, err := MultiSearch(sr, ei1, ei2)
|
||||
if err == nil {
|
||||
t.Errorf("expected error, got %v", err)
|
||||
res, err := MultiSearch(sr, ei1, ei2)
|
||||
if err != nil {
|
||||
t.Errorf("expected no error, got %v", err)
|
||||
}
|
||||
if res.Status.Total != 2 {
|
||||
t.Errorf("expected 2 indexes to be queried, got %d", res.Status.Total)
|
||||
}
|
||||
if res.Status.Failed != 1 {
|
||||
t.Errorf("expected 1 index to fail, got %d", res.Status.Failed)
|
||||
}
|
||||
if res.Status.Successful != 1 {
|
||||
t.Errorf("expected 1 index to be successful, got %d", res.Status.Successful)
|
||||
}
|
||||
if len(res.Status.Errors) != 1 {
|
||||
t.Fatalf("expected 1 status error message, got %d", len(res.Status.Errors))
|
||||
}
|
||||
if res.Status.Errors["ei2"].Error() != "deliberate error" {
|
||||
t.Errorf("expected ei2 index error message 'deliberate error', got '%s'", res.Status.Errors["ei2"])
|
||||
}
|
||||
}
|
||||
|
||||
// TestMultiSearchAllError
|
||||
// reproduces https://github.com/blevesearch/bleve/issues/126
|
||||
func TestMultiSearchAllError(t *testing.T) {
|
||||
ei1 := &stubIndex{err: fmt.Errorf("deliberate error")}
|
||||
ei2 := &stubIndex{err: fmt.Errorf("deliberate error")}
|
||||
ei1 := &stubIndex{name: "ei1", err: fmt.Errorf("deliberate error")}
|
||||
ei2 := &stubIndex{name: "ei2", err: fmt.Errorf("deliberate error")}
|
||||
sr := NewSearchRequest(NewTermQuery("test"))
|
||||
_, err := MultiSearch(sr, ei1, ei2)
|
||||
if err == nil {
|
||||
t.Errorf("expected error, got %v", err)
|
||||
res, err := MultiSearch(sr, ei1, ei2)
|
||||
if err != nil {
|
||||
t.Errorf("expected no error, got %v", err)
|
||||
}
|
||||
if res.Status.Total != 2 {
|
||||
t.Errorf("expected 2 indexes to be queried, got %d", res.Status.Total)
|
||||
}
|
||||
if res.Status.Failed != 2 {
|
||||
t.Errorf("expected 2 indexes to fail, got %d", res.Status.Failed)
|
||||
}
|
||||
if res.Status.Successful != 0 {
|
||||
t.Errorf("expected 0 indexes to be successful, got %d", res.Status.Successful)
|
||||
}
|
||||
if len(res.Status.Errors) != 2 {
|
||||
t.Fatalf("expected 2 status error messages, got %d", len(res.Status.Errors))
|
||||
}
|
||||
if res.Status.Errors["ei1"].Error() != "deliberate error" {
|
||||
t.Errorf("expected ei1 index error message 'deliberate error', got '%s'", res.Status.Errors["ei1"])
|
||||
}
|
||||
if res.Status.Errors["ei2"].Error() != "deliberate error" {
|
||||
t.Errorf("expected ei2 index error message 'deliberate error', got '%s'", res.Status.Errors["ei2"])
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -667,11 +735,23 @@ func TestMultiSearchSecondPage(t *testing.T) {
|
|||
}
|
||||
|
||||
ei1 := &stubIndex{
|
||||
searchResult: &SearchResult{},
|
||||
searchResult: &SearchResult{
|
||||
Status: &SearchStatus{
|
||||
Total: 1,
|
||||
Successful: 1,
|
||||
Errors: make(map[string]error),
|
||||
},
|
||||
},
|
||||
checkRequest: checkRequest,
|
||||
}
|
||||
ei2 := &stubIndex{
|
||||
searchResult: &SearchResult{},
|
||||
searchResult: &SearchResult{
|
||||
Status: &SearchStatus{
|
||||
Total: 1,
|
||||
Successful: 1,
|
||||
Errors: make(map[string]error),
|
||||
},
|
||||
},
|
||||
checkRequest: checkRequest,
|
||||
}
|
||||
sr := NewSearchRequestOptions(NewTermQuery("test"), 10, 10, false)
|
||||
|
|
|
@ -528,6 +528,12 @@ func (i *indexImpl) Search(req *SearchRequest) (sr *SearchResult, err error) {
|
|||
}
|
||||
|
||||
return &SearchResult{
|
||||
Status: &SearchStatus{
|
||||
Total: 1,
|
||||
Failed: 0,
|
||||
Successful: 1,
|
||||
Errors: make(map[string]error),
|
||||
},
|
||||
Request: req,
|
||||
Hits: hits,
|
||||
Total: collector.Total(),
|
||||
|
|
35
search.go
35
search.go
|
@ -236,9 +236,42 @@ func NewSearchRequestOptions(q Query, size, from int, explain bool) *SearchReque
|
|||
}
|
||||
}
|
||||
|
||||
// IndexErrMap tracks errors with the name of the index where it occurred
|
||||
type IndexErrMap map[string]error
|
||||
|
||||
// MarshalJSON seralizes the error into a string for JSON consumption
|
||||
func (iem IndexErrMap) MarshalJSON() ([]byte, error) {
|
||||
tmp := make(map[string]string, len(iem))
|
||||
for k, v := range iem {
|
||||
tmp[k] = v.Error()
|
||||
}
|
||||
return json.Marshal(tmp)
|
||||
}
|
||||
|
||||
// SearchStatus is a secion in the SearchResult reporting how many
|
||||
// underlying indexes were queried, how many were successful/failed
|
||||
// and a map of any errors that were encountered
|
||||
type SearchStatus struct {
|
||||
Total int `json:"total"`
|
||||
Failed int `json:"failed"`
|
||||
Successful int `json:"successful"`
|
||||
Errors IndexErrMap `json:"errors,omitempty"`
|
||||
}
|
||||
|
||||
// Merge will merge together multiple SearchStatuses during a MultiSearch
|
||||
func (ss *SearchStatus) Merge(other *SearchStatus) {
|
||||
ss.Total += other.Total
|
||||
ss.Failed += other.Failed
|
||||
ss.Successful += other.Successful
|
||||
for otherIndex, otherError := range other.Errors {
|
||||
ss.Errors[otherIndex] = otherError
|
||||
}
|
||||
}
|
||||
|
||||
// A SearchResult describes the results of executing
|
||||
// a SearchRequest.
|
||||
type SearchResult struct {
|
||||
Status *SearchStatus `json:"status"`
|
||||
Request *SearchRequest `json:"request"`
|
||||
Hits search.DocumentMatchCollection `json:"hits"`
|
||||
Total uint64 `json:"total_hits"`
|
||||
|
@ -288,7 +321,9 @@ func (sr *SearchResult) String() string {
|
|||
return rv
|
||||
}
|
||||
|
||||
// Merge will merge together multiple SearchResults during a MultiSearch
|
||||
func (sr *SearchResult) Merge(other *SearchResult) {
|
||||
sr.Status.Merge(other.Status)
|
||||
sr.Hits = append(sr.Hits, other.Hits...)
|
||||
sr.Total += other.Total
|
||||
if other.MaxScore > sr.MaxScore {
|
||||
|
|
|
@ -73,6 +73,11 @@ func TestSearchResultString(t *testing.T) {
|
|||
|
||||
func TestSearchResultMerge(t *testing.T) {
|
||||
l := &SearchResult{
|
||||
Status: &SearchStatus{
|
||||
Total: 1,
|
||||
Successful: 1,
|
||||
Errors: make(map[string]error),
|
||||
},
|
||||
Total: 1,
|
||||
MaxScore: 1,
|
||||
Hits: search.DocumentMatchCollection{
|
||||
|
@ -84,6 +89,11 @@ func TestSearchResultMerge(t *testing.T) {
|
|||
}
|
||||
|
||||
r := &SearchResult{
|
||||
Status: &SearchStatus{
|
||||
Total: 1,
|
||||
Successful: 1,
|
||||
Errors: make(map[string]error),
|
||||
},
|
||||
Total: 1,
|
||||
MaxScore: 2,
|
||||
Hits: search.DocumentMatchCollection{
|
||||
|
@ -95,6 +105,11 @@ func TestSearchResultMerge(t *testing.T) {
|
|||
}
|
||||
|
||||
expected := &SearchResult{
|
||||
Status: &SearchStatus{
|
||||
Total: 2,
|
||||
Successful: 2,
|
||||
Errors: make(map[string]error),
|
||||
},
|
||||
Total: 2,
|
||||
MaxScore: 2,
|
||||
Hits: search.DocumentMatchCollection{
|
||||
|
|
Loading…
Reference in New Issue