3fd2a64872
The DumpXXX() methods were always documented as internal and unsupported. However, now they are being removed from the public top-level API. They are still available on the internal IndexReader, which can be accessed using the Advanced() method. The DocCount() and DumpXXX() methods on the internal index have moved to the internal index reader, since they logically operate on a snapshot of an index.
627 lines
12 KiB
Go
627 lines
12 KiB
Go
// Copyright (c) 2014 Couchbase, Inc.
|
|
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
|
|
// except in compliance with the License. You may obtain a copy of the License at
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
// Unless required by applicable law or agreed to in writing, software distributed under the
|
|
// License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
|
|
// either express or implied. See the License for the specific language governing permissions
|
|
// and limitations under the License.
|
|
|
|
package bleve
|
|
|
|
import (
|
|
"sort"
|
|
"sync"
|
|
"time"
|
|
|
|
"golang.org/x/net/context"
|
|
|
|
"github.com/blevesearch/bleve/document"
|
|
"github.com/blevesearch/bleve/index"
|
|
"github.com/blevesearch/bleve/index/store"
|
|
"github.com/blevesearch/bleve/search"
|
|
)
|
|
|
|
type indexAliasImpl struct {
|
|
name string
|
|
indexes []Index
|
|
mutex sync.RWMutex
|
|
open bool
|
|
}
|
|
|
|
// NewIndexAlias creates a new IndexAlias over the provided
|
|
// Index objects.
|
|
func NewIndexAlias(indexes ...Index) *indexAliasImpl {
|
|
return &indexAliasImpl{
|
|
name: "alias",
|
|
indexes: indexes,
|
|
open: true,
|
|
}
|
|
}
|
|
|
|
func (i *indexAliasImpl) isAliasToSingleIndex() error {
|
|
if len(i.indexes) < 1 {
|
|
return ErrorAliasEmpty
|
|
} else if len(i.indexes) > 1 {
|
|
return ErrorAliasMulti
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (i *indexAliasImpl) Index(id string, data interface{}) error {
|
|
i.mutex.RLock()
|
|
defer i.mutex.RUnlock()
|
|
|
|
if !i.open {
|
|
return ErrorIndexClosed
|
|
}
|
|
|
|
err := i.isAliasToSingleIndex()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return i.indexes[0].Index(id, data)
|
|
}
|
|
|
|
func (i *indexAliasImpl) Delete(id string) error {
|
|
i.mutex.RLock()
|
|
defer i.mutex.RUnlock()
|
|
|
|
if !i.open {
|
|
return ErrorIndexClosed
|
|
}
|
|
|
|
err := i.isAliasToSingleIndex()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return i.indexes[0].Delete(id)
|
|
}
|
|
|
|
func (i *indexAliasImpl) Batch(b *Batch) error {
|
|
i.mutex.RLock()
|
|
defer i.mutex.RUnlock()
|
|
|
|
if !i.open {
|
|
return ErrorIndexClosed
|
|
}
|
|
|
|
err := i.isAliasToSingleIndex()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return i.indexes[0].Batch(b)
|
|
}
|
|
|
|
func (i *indexAliasImpl) Document(id string) (*document.Document, error) {
|
|
i.mutex.RLock()
|
|
defer i.mutex.RUnlock()
|
|
|
|
if !i.open {
|
|
return nil, ErrorIndexClosed
|
|
}
|
|
|
|
err := i.isAliasToSingleIndex()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return i.indexes[0].Document(id)
|
|
}
|
|
|
|
func (i *indexAliasImpl) DocCount() (uint64, error) {
|
|
i.mutex.RLock()
|
|
defer i.mutex.RUnlock()
|
|
|
|
rv := uint64(0)
|
|
|
|
if !i.open {
|
|
return 0, ErrorIndexClosed
|
|
}
|
|
|
|
for _, index := range i.indexes {
|
|
otherCount, err := index.DocCount()
|
|
if err == nil {
|
|
rv += otherCount
|
|
}
|
|
// tolerate errors to produce partial counts
|
|
}
|
|
|
|
return rv, nil
|
|
}
|
|
|
|
func (i *indexAliasImpl) Search(req *SearchRequest) (*SearchResult, error) {
|
|
return i.SearchInContext(context.Background(), req)
|
|
}
|
|
|
|
func (i *indexAliasImpl) SearchInContext(ctx context.Context, req *SearchRequest) (*SearchResult, error) {
|
|
i.mutex.RLock()
|
|
defer i.mutex.RUnlock()
|
|
|
|
if !i.open {
|
|
return nil, ErrorIndexClosed
|
|
}
|
|
|
|
if len(i.indexes) < 1 {
|
|
return nil, ErrorAliasEmpty
|
|
}
|
|
|
|
// short circuit the simple case
|
|
if len(i.indexes) == 1 {
|
|
return i.indexes[0].SearchInContext(ctx, req)
|
|
}
|
|
|
|
return MultiSearch(ctx, req, i.indexes...)
|
|
}
|
|
|
|
func (i *indexAliasImpl) Fields() ([]string, error) {
|
|
i.mutex.RLock()
|
|
defer i.mutex.RUnlock()
|
|
|
|
if !i.open {
|
|
return nil, ErrorIndexClosed
|
|
}
|
|
|
|
err := i.isAliasToSingleIndex()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return i.indexes[0].Fields()
|
|
}
|
|
|
|
func (i *indexAliasImpl) FieldDict(field string) (index.FieldDict, error) {
|
|
i.mutex.RLock()
|
|
|
|
if !i.open {
|
|
i.mutex.RUnlock()
|
|
return nil, ErrorIndexClosed
|
|
}
|
|
|
|
err := i.isAliasToSingleIndex()
|
|
if err != nil {
|
|
i.mutex.RUnlock()
|
|
return nil, err
|
|
}
|
|
|
|
fieldDict, err := i.indexes[0].FieldDict(field)
|
|
if err != nil {
|
|
i.mutex.RUnlock()
|
|
return nil, err
|
|
}
|
|
|
|
return &indexAliasImplFieldDict{
|
|
index: i,
|
|
fieldDict: fieldDict,
|
|
}, nil
|
|
}
|
|
|
|
func (i *indexAliasImpl) FieldDictRange(field string, startTerm []byte, endTerm []byte) (index.FieldDict, error) {
|
|
i.mutex.RLock()
|
|
|
|
if !i.open {
|
|
i.mutex.RUnlock()
|
|
return nil, ErrorIndexClosed
|
|
}
|
|
|
|
err := i.isAliasToSingleIndex()
|
|
if err != nil {
|
|
i.mutex.RUnlock()
|
|
return nil, err
|
|
}
|
|
|
|
fieldDict, err := i.indexes[0].FieldDictRange(field, startTerm, endTerm)
|
|
if err != nil {
|
|
i.mutex.RUnlock()
|
|
return nil, err
|
|
}
|
|
|
|
return &indexAliasImplFieldDict{
|
|
index: i,
|
|
fieldDict: fieldDict,
|
|
}, nil
|
|
}
|
|
|
|
func (i *indexAliasImpl) FieldDictPrefix(field string, termPrefix []byte) (index.FieldDict, error) {
|
|
i.mutex.RLock()
|
|
|
|
if !i.open {
|
|
i.mutex.RUnlock()
|
|
return nil, ErrorIndexClosed
|
|
}
|
|
|
|
err := i.isAliasToSingleIndex()
|
|
if err != nil {
|
|
i.mutex.RUnlock()
|
|
return nil, err
|
|
}
|
|
|
|
fieldDict, err := i.indexes[0].FieldDictPrefix(field, termPrefix)
|
|
if err != nil {
|
|
i.mutex.RUnlock()
|
|
return nil, err
|
|
}
|
|
|
|
return &indexAliasImplFieldDict{
|
|
index: i,
|
|
fieldDict: fieldDict,
|
|
}, nil
|
|
}
|
|
|
|
func (i *indexAliasImpl) Close() error {
|
|
i.mutex.Lock()
|
|
defer i.mutex.Unlock()
|
|
|
|
i.open = false
|
|
return nil
|
|
}
|
|
|
|
func (i *indexAliasImpl) Mapping() *IndexMapping {
|
|
i.mutex.RLock()
|
|
defer i.mutex.RUnlock()
|
|
|
|
if !i.open {
|
|
return nil
|
|
}
|
|
|
|
err := i.isAliasToSingleIndex()
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
|
|
return i.indexes[0].Mapping()
|
|
}
|
|
|
|
func (i *indexAliasImpl) Stats() *IndexStat {
|
|
i.mutex.RLock()
|
|
defer i.mutex.RUnlock()
|
|
|
|
if !i.open {
|
|
return nil
|
|
}
|
|
|
|
err := i.isAliasToSingleIndex()
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
|
|
return i.indexes[0].Stats()
|
|
}
|
|
|
|
func (i *indexAliasImpl) StatsMap() map[string]interface{} {
|
|
i.mutex.RLock()
|
|
defer i.mutex.RUnlock()
|
|
|
|
if !i.open {
|
|
return nil
|
|
}
|
|
|
|
err := i.isAliasToSingleIndex()
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
|
|
return i.indexes[0].StatsMap()
|
|
}
|
|
|
|
func (i *indexAliasImpl) GetInternal(key []byte) ([]byte, error) {
|
|
i.mutex.RLock()
|
|
defer i.mutex.RUnlock()
|
|
|
|
if !i.open {
|
|
return nil, ErrorIndexClosed
|
|
}
|
|
|
|
err := i.isAliasToSingleIndex()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return i.indexes[0].GetInternal(key)
|
|
}
|
|
|
|
func (i *indexAliasImpl) SetInternal(key, val []byte) error {
|
|
i.mutex.RLock()
|
|
defer i.mutex.RUnlock()
|
|
|
|
if !i.open {
|
|
return ErrorIndexClosed
|
|
}
|
|
|
|
err := i.isAliasToSingleIndex()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return i.indexes[0].SetInternal(key, val)
|
|
}
|
|
|
|
func (i *indexAliasImpl) DeleteInternal(key []byte) error {
|
|
i.mutex.RLock()
|
|
defer i.mutex.RUnlock()
|
|
|
|
if !i.open {
|
|
return ErrorIndexClosed
|
|
}
|
|
|
|
err := i.isAliasToSingleIndex()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return i.indexes[0].DeleteInternal(key)
|
|
}
|
|
|
|
func (i *indexAliasImpl) Advanced() (index.Index, store.KVStore, error) {
|
|
i.mutex.RLock()
|
|
defer i.mutex.RUnlock()
|
|
|
|
if !i.open {
|
|
return nil, nil, ErrorIndexClosed
|
|
}
|
|
|
|
err := i.isAliasToSingleIndex()
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
return i.indexes[0].Advanced()
|
|
}
|
|
|
|
func (i *indexAliasImpl) Add(indexes ...Index) {
|
|
i.mutex.Lock()
|
|
defer i.mutex.Unlock()
|
|
|
|
i.indexes = append(i.indexes, indexes...)
|
|
}
|
|
|
|
func (i *indexAliasImpl) removeSingle(index Index) {
|
|
for pos, in := range i.indexes {
|
|
if in == index {
|
|
i.indexes = append(i.indexes[:pos], i.indexes[pos+1:]...)
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
func (i *indexAliasImpl) Remove(indexes ...Index) {
|
|
i.mutex.Lock()
|
|
defer i.mutex.Unlock()
|
|
|
|
for _, in := range indexes {
|
|
i.removeSingle(in)
|
|
}
|
|
}
|
|
|
|
func (i *indexAliasImpl) Swap(in, out []Index) {
|
|
i.mutex.Lock()
|
|
defer i.mutex.Unlock()
|
|
|
|
// add
|
|
i.indexes = append(i.indexes, in...)
|
|
|
|
// delete
|
|
for _, ind := range out {
|
|
i.removeSingle(ind)
|
|
}
|
|
}
|
|
|
|
// createChildSearchRequest creates a separate
|
|
// request from the original
|
|
// For now, avoid data race on req structure.
|
|
// TODO disable highlight/field load on child
|
|
// requests, and add code to do this only on
|
|
// the actual final results.
|
|
// Perhaps that part needs to be optional,
|
|
// could be slower in remote usages.
|
|
func createChildSearchRequest(req *SearchRequest) *SearchRequest {
|
|
rv := SearchRequest{
|
|
Query: req.Query,
|
|
Size: req.Size + req.From,
|
|
From: 0,
|
|
Highlight: req.Highlight,
|
|
Fields: req.Fields,
|
|
Facets: req.Facets,
|
|
Explain: req.Explain,
|
|
Sort: req.Sort,
|
|
}
|
|
return &rv
|
|
}
|
|
|
|
type asyncSearchResult struct {
|
|
Name string
|
|
Result *SearchResult
|
|
Err error
|
|
}
|
|
|
|
func wrapSearch(ctx context.Context, in Index, req *SearchRequest) *asyncSearchResult {
|
|
rv := asyncSearchResult{Name: in.Name()}
|
|
rv.Result, rv.Err = in.SearchInContext(ctx, req)
|
|
return &rv
|
|
}
|
|
|
|
func wrapSearchTimeout(ctx context.Context, in Index, req *SearchRequest) *asyncSearchResult {
|
|
reschan := make(chan *asyncSearchResult)
|
|
go func() { reschan <- wrapSearch(ctx, in, req) }()
|
|
select {
|
|
case res := <-reschan:
|
|
return res
|
|
case <-ctx.Done():
|
|
return &asyncSearchResult{Name: in.Name(), Err: ctx.Err()}
|
|
}
|
|
}
|
|
|
|
// MultiSearch executes a SearchRequest across multiple
|
|
// Index objects, then merges the results.
|
|
func MultiSearch(ctx context.Context, req *SearchRequest, indexes ...Index) (*SearchResult, error) {
|
|
|
|
searchStart := time.Now()
|
|
asyncResults := make(chan *asyncSearchResult)
|
|
|
|
// run search on each index in separate go routine
|
|
var waitGroup sync.WaitGroup
|
|
|
|
var searchChildIndex = func(waitGroup *sync.WaitGroup, in Index, asyncResults chan *asyncSearchResult) {
|
|
childReq := createChildSearchRequest(req)
|
|
if ia, ok := in.(IndexAlias); ok {
|
|
// if the child index is another alias, trust it returns promptly on timeout/cancel
|
|
go func() {
|
|
defer waitGroup.Done()
|
|
asyncResults <- wrapSearch(ctx, ia, childReq)
|
|
}()
|
|
} else {
|
|
// if the child index is not an alias, enforce timeout here
|
|
go func() {
|
|
defer waitGroup.Done()
|
|
asyncResults <- wrapSearchTimeout(ctx, in, childReq)
|
|
}()
|
|
}
|
|
}
|
|
|
|
for _, in := range indexes {
|
|
waitGroup.Add(1)
|
|
searchChildIndex(&waitGroup, in, asyncResults)
|
|
}
|
|
|
|
// on another go routine, close after finished
|
|
go func() {
|
|
waitGroup.Wait()
|
|
close(asyncResults)
|
|
}()
|
|
|
|
var sr *SearchResult
|
|
indexErrors := make(map[string]error)
|
|
|
|
for asr := range asyncResults {
|
|
if asr.Err == nil {
|
|
if sr == nil {
|
|
// first result
|
|
sr = asr.Result
|
|
} else {
|
|
// merge with previous
|
|
sr.Merge(asr.Result)
|
|
}
|
|
} else {
|
|
indexErrors[asr.Name] = asr.Err
|
|
}
|
|
}
|
|
|
|
// merge just concatenated all the hits
|
|
// now lets clean it up
|
|
|
|
// handle case where no results were successful
|
|
if sr == nil {
|
|
sr = &SearchResult{
|
|
Status: &SearchStatus{
|
|
Errors: make(map[string]error),
|
|
},
|
|
}
|
|
}
|
|
|
|
// sort all hits with the requested order
|
|
if len(req.Sort) > 0 {
|
|
sorter := newMultiSearchHitSorter(req.Sort, sr.Hits)
|
|
sort.Sort(sorter)
|
|
}
|
|
|
|
// now skip over the correct From
|
|
if req.From > 0 && len(sr.Hits) > req.From {
|
|
sr.Hits = sr.Hits[req.From:]
|
|
} else if req.From > 0 {
|
|
sr.Hits = search.DocumentMatchCollection{}
|
|
}
|
|
|
|
// now trim to the correct size
|
|
if req.Size > 0 && len(sr.Hits) > req.Size {
|
|
sr.Hits = sr.Hits[0:req.Size]
|
|
}
|
|
|
|
// fix up facets
|
|
for name, fr := range req.Facets {
|
|
sr.Facets.Fixup(name, fr.Size)
|
|
}
|
|
|
|
// fix up original request
|
|
sr.Request = req
|
|
searchDuration := time.Since(searchStart)
|
|
sr.Took = searchDuration
|
|
|
|
// fix up errors
|
|
if len(indexErrors) > 0 {
|
|
if sr.Status.Errors == nil {
|
|
sr.Status.Errors = make(map[string]error)
|
|
}
|
|
for indexName, indexErr := range indexErrors {
|
|
sr.Status.Errors[indexName] = indexErr
|
|
sr.Status.Total++
|
|
sr.Status.Failed++
|
|
}
|
|
}
|
|
|
|
return sr, nil
|
|
}
|
|
|
|
func (i *indexAliasImpl) NewBatch() *Batch {
|
|
i.mutex.RLock()
|
|
defer i.mutex.RUnlock()
|
|
|
|
if !i.open {
|
|
return nil
|
|
}
|
|
|
|
err := i.isAliasToSingleIndex()
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
|
|
return i.indexes[0].NewBatch()
|
|
}
|
|
|
|
func (i *indexAliasImpl) Name() string {
|
|
return i.name
|
|
}
|
|
|
|
func (i *indexAliasImpl) SetName(name string) {
|
|
i.name = name
|
|
}
|
|
|
|
type indexAliasImplFieldDict struct {
|
|
index *indexAliasImpl
|
|
fieldDict index.FieldDict
|
|
}
|
|
|
|
func (f *indexAliasImplFieldDict) Next() (*index.DictEntry, error) {
|
|
return f.fieldDict.Next()
|
|
}
|
|
|
|
func (f *indexAliasImplFieldDict) Close() error {
|
|
defer f.index.mutex.RUnlock()
|
|
return f.fieldDict.Close()
|
|
}
|
|
|
|
type multiSearchHitSorter struct {
|
|
hits search.DocumentMatchCollection
|
|
sort search.SortOrder
|
|
cachedScoring []bool
|
|
cachedDesc []bool
|
|
}
|
|
|
|
func newMultiSearchHitSorter(sort search.SortOrder, hits search.DocumentMatchCollection) *multiSearchHitSorter {
|
|
return &multiSearchHitSorter{
|
|
sort: sort,
|
|
hits: hits,
|
|
cachedScoring: sort.CacheIsScore(),
|
|
cachedDesc: sort.CacheDescending(),
|
|
}
|
|
}
|
|
|
|
func (m *multiSearchHitSorter) Len() int { return len(m.hits) }
|
|
func (m *multiSearchHitSorter) Swap(i, j int) { m.hits[i], m.hits[j] = m.hits[j], m.hits[i] }
|
|
func (m *multiSearchHitSorter) Less(i, j int) bool {
|
|
c := m.sort.Compare(m.cachedScoring, m.cachedDesc, m.hits[i], m.hits[j])
|
|
return c < 0
|
|
}
|