// Copyright (c) 2014 Couchbase, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package bleve import ( "context" "encoding/json" "fmt" "os" "sync" "sync/atomic" "time" "github.com/blevesearch/bleve/document" "github.com/blevesearch/bleve/index" "github.com/blevesearch/bleve/index/store" "github.com/blevesearch/bleve/index/upsidedown" "github.com/blevesearch/bleve/mapping" "github.com/blevesearch/bleve/registry" "github.com/blevesearch/bleve/search" "github.com/blevesearch/bleve/search/collector" "github.com/blevesearch/bleve/search/facet" "github.com/blevesearch/bleve/search/highlight" ) type indexImpl struct { path string name string meta *indexMeta i index.Index m mapping.IndexMapping mutex sync.RWMutex open bool stats *IndexStat } const storePath = "store" var mappingInternalKey = []byte("_mapping") const SearchQueryStartCallbackKey = "_search_query_start_callback_key" const SearchQueryEndCallbackKey = "_search_query_end_callback_key" type SearchQueryStartCallbackFn func(size uint64) error type SearchQueryEndCallbackFn func(size uint64) error func indexStorePath(path string) string { return path + string(os.PathSeparator) + storePath } func newIndexUsing(path string, mapping mapping.IndexMapping, indexType string, kvstore string, kvconfig map[string]interface{}) (*indexImpl, error) { // first validate the mapping err := mapping.Validate() if err != nil { return nil, err } if kvconfig == nil { kvconfig = map[string]interface{}{} } if kvstore == "" { return nil, fmt.Errorf("bleve not configured for file based indexing") } rv := indexImpl{ path: path, name: path, m: mapping, meta: newIndexMeta(indexType, kvstore, kvconfig), } rv.stats = &IndexStat{i: &rv} // at this point there is hope that we can be successful, so save index meta if path != "" { err = rv.meta.Save(path) if err != nil { return nil, err } kvconfig["create_if_missing"] = true kvconfig["error_if_exists"] = true kvconfig["path"] = indexStorePath(path) } else { kvconfig["path"] = "" } // open the index indexTypeConstructor := registry.IndexTypeConstructorByName(rv.meta.IndexType) if indexTypeConstructor == nil { return nil, ErrorUnknownIndexType } rv.i, err = indexTypeConstructor(rv.meta.Storage, kvconfig, Config.analysisQueue) if err != nil { return nil, err } err = rv.i.Open() if err != nil { if err == index.ErrorUnknownStorageType { return nil, ErrorUnknownStorageType } return nil, err } // now persist the mapping mappingBytes, err := json.Marshal(mapping) if err != nil { return nil, err } err = rv.i.SetInternal(mappingInternalKey, mappingBytes) if err != nil { return nil, err } // mark the index as open rv.mutex.Lock() defer rv.mutex.Unlock() rv.open = true indexStats.Register(&rv) return &rv, nil } func openIndexUsing(path string, runtimeConfig map[string]interface{}) (rv *indexImpl, err error) { rv = &indexImpl{ path: path, name: path, } rv.stats = &IndexStat{i: rv} rv.meta, err = openIndexMeta(path) if err != nil { return nil, err } // backwards compatibility if index type is missing if rv.meta.IndexType == "" { rv.meta.IndexType = upsidedown.Name } storeConfig := rv.meta.Config if storeConfig == nil { storeConfig = map[string]interface{}{} } storeConfig["path"] = indexStorePath(path) storeConfig["create_if_missing"] = false storeConfig["error_if_exists"] = false for rck, rcv := range runtimeConfig { storeConfig[rck] = rcv } // open the index indexTypeConstructor := registry.IndexTypeConstructorByName(rv.meta.IndexType) if indexTypeConstructor == nil { return nil, ErrorUnknownIndexType } rv.i, err = indexTypeConstructor(rv.meta.Storage, storeConfig, Config.analysisQueue) if err != nil { return nil, err } err = rv.i.Open() if err != nil { if err == index.ErrorUnknownStorageType { return nil, ErrorUnknownStorageType } return nil, err } // now load the mapping indexReader, err := rv.i.Reader() if err != nil { return nil, err } defer func() { if cerr := indexReader.Close(); cerr != nil && err == nil { err = cerr } }() mappingBytes, err := indexReader.GetInternal(mappingInternalKey) if err != nil { return nil, err } var im *mapping.IndexMappingImpl err = json.Unmarshal(mappingBytes, &im) if err != nil { return nil, fmt.Errorf("error parsing mapping JSON: %v\nmapping contents:\n%s", err, string(mappingBytes)) } // mark the index as open rv.mutex.Lock() defer rv.mutex.Unlock() rv.open = true // validate the mapping err = im.Validate() if err != nil { // note even if the mapping is invalid // we still return an open usable index return rv, err } rv.m = im indexStats.Register(rv) return rv, err } // Advanced returns implementation internals // necessary ONLY for advanced usage. func (i *indexImpl) Advanced() (index.Index, store.KVStore, error) { s, err := i.i.Advanced() if err != nil { return nil, nil, err } return i.i, s, nil } // Mapping returns the IndexMapping in use by this // Index. func (i *indexImpl) Mapping() mapping.IndexMapping { return i.m } // Index the object with the specified identifier. // The IndexMapping for this index will determine // how the object is indexed. func (i *indexImpl) Index(id string, data interface{}) (err error) { if id == "" { return ErrorEmptyID } i.mutex.RLock() defer i.mutex.RUnlock() if !i.open { return ErrorIndexClosed } doc := document.NewDocument(id) err = i.m.MapDocument(doc, data) if err != nil { return } err = i.i.Update(doc) return } // IndexAdvanced takes a document.Document object // skips the mapping and indexes it. func (i *indexImpl) IndexAdvanced(doc *document.Document) (err error) { if doc.ID == "" { return ErrorEmptyID } i.mutex.RLock() defer i.mutex.RUnlock() if !i.open { return ErrorIndexClosed } err = i.i.Update(doc) return } // Delete entries for the specified identifier from // the index. func (i *indexImpl) Delete(id string) (err error) { if id == "" { return ErrorEmptyID } i.mutex.RLock() defer i.mutex.RUnlock() if !i.open { return ErrorIndexClosed } err = i.i.Delete(id) return } // Batch executes multiple Index and Delete // operations at the same time. There are often // significant performance benefits when performing // operations in a batch. func (i *indexImpl) Batch(b *Batch) error { i.mutex.RLock() defer i.mutex.RUnlock() if !i.open { return ErrorIndexClosed } return i.i.Batch(b.internal) } // Document is used to find the values of all the // stored fields for a document in the index. These // stored fields are put back into a Document object // and returned. func (i *indexImpl) Document(id string) (doc *document.Document, err error) { i.mutex.RLock() defer i.mutex.RUnlock() if !i.open { return nil, ErrorIndexClosed } indexReader, err := i.i.Reader() if err != nil { return nil, err } defer func() { if cerr := indexReader.Close(); err == nil && cerr != nil { err = cerr } }() doc, err = indexReader.Document(id) if err != nil { return nil, err } return doc, nil } // DocCount returns the number of documents in the // index. func (i *indexImpl) DocCount() (count uint64, err error) { i.mutex.RLock() defer i.mutex.RUnlock() if !i.open { return 0, ErrorIndexClosed } // open a reader for this search indexReader, err := i.i.Reader() if err != nil { return 0, fmt.Errorf("error opening index reader %v", err) } defer func() { if cerr := indexReader.Close(); err == nil && cerr != nil { err = cerr } }() count, err = indexReader.DocCount() return } // Search executes a search request operation. // Returns a SearchResult object or an error. func (i *indexImpl) Search(req *SearchRequest) (sr *SearchResult, err error) { return i.SearchInContext(context.Background(), req) } // memNeededForSearch is a helper function that returns an estimate of RAM // needed to execute a search request. func memNeededForSearch(req *SearchRequest, searcher search.Searcher, topnCollector *collector.TopNCollector) uint64 { backingSize := req.Size + req.From + 1 if req.Size+req.From > collector.PreAllocSizeSkipCap { backingSize = collector.PreAllocSizeSkipCap + 1 } numDocMatches := backingSize + searcher.DocumentMatchPoolSize() estimate := 0 // overhead, size in bytes from collector estimate += topnCollector.Size() var dm search.DocumentMatch sizeOfDocumentMatch := dm.Size() // pre-allocing DocumentMatchPool var sc search.SearchContext estimate += sc.Size() + numDocMatches*sizeOfDocumentMatch // searcher overhead estimate += searcher.Size() // overhead from results, lowestMatchOutsideResults estimate += (numDocMatches + 1) * sizeOfDocumentMatch // additional overhead from SearchResult var sr SearchResult estimate += sr.Size() // overhead from facet results if req.Facets != nil { var fr search.FacetResult estimate += len(req.Facets) * fr.Size() } // highlighting, store var d document.Document if len(req.Fields) > 0 || req.Highlight != nil { for i := 0; i < (req.Size + req.From); i++ { // size + from => number of hits estimate += (req.Size + req.From) * d.Size() } } return uint64(estimate) } // SearchInContext executes a search request operation within the provided // Context. Returns a SearchResult object or an error. func (i *indexImpl) SearchInContext(ctx context.Context, req *SearchRequest) (sr *SearchResult, err error) { i.mutex.RLock() defer i.mutex.RUnlock() searchStart := time.Now() if !i.open { return nil, ErrorIndexClosed } collector := collector.NewTopNCollector(req.Size, req.From, req.Sort) // open a reader for this search indexReader, err := i.i.Reader() if err != nil { return nil, fmt.Errorf("error opening index reader %v", err) } defer func() { if cerr := indexReader.Close(); err == nil && cerr != nil { err = cerr } }() searcher, err := req.Query.Searcher(indexReader, i.m, search.SearcherOptions{ Explain: req.Explain, IncludeTermVectors: req.IncludeLocations || req.Highlight != nil, }) if err != nil { return nil, err } defer func() { if serr := searcher.Close(); err == nil && serr != nil { err = serr } }() if req.Facets != nil { facetsBuilder := search.NewFacetsBuilder(indexReader) for facetName, facetRequest := range req.Facets { if facetRequest.NumericRanges != nil { // build numeric range facet facetBuilder := facet.NewNumericFacetBuilder(facetRequest.Field, facetRequest.Size) for _, nr := range facetRequest.NumericRanges { facetBuilder.AddRange(nr.Name, nr.Min, nr.Max) } facetsBuilder.Add(facetName, facetBuilder) } else if facetRequest.DateTimeRanges != nil { // build date range facet facetBuilder := facet.NewDateTimeFacetBuilder(facetRequest.Field, facetRequest.Size) dateTimeParser := i.m.DateTimeParserNamed("") for _, dr := range facetRequest.DateTimeRanges { start, end := dr.ParseDates(dateTimeParser) facetBuilder.AddRange(dr.Name, start, end) } facetsBuilder.Add(facetName, facetBuilder) } else { // build terms facet facetBuilder := facet.NewTermsFacetBuilder(facetRequest.Field, facetRequest.Size) facetsBuilder.Add(facetName, facetBuilder) } } collector.SetFacetsBuilder(facetsBuilder) } memNeeded := memNeededForSearch(req, searcher, collector) if cb := ctx.Value(SearchQueryStartCallbackKey); cb != nil { if cbF, ok := cb.(SearchQueryStartCallbackFn); ok { err = cbF(memNeeded) } } if err != nil { return nil, err } if cb := ctx.Value(SearchQueryEndCallbackKey); cb != nil { if cbF, ok := cb.(SearchQueryEndCallbackFn); ok { defer func() { _ = cbF(memNeeded) }() } } err = collector.Collect(ctx, searcher, indexReader) if err != nil { return nil, err } hits := collector.Results() var highlighter highlight.Highlighter if req.Highlight != nil { // get the right highlighter highlighter, err = Config.Cache.HighlighterNamed(Config.DefaultHighlighter) if err != nil { return nil, err } if req.Highlight.Style != nil { highlighter, err = Config.Cache.HighlighterNamed(*req.Highlight.Style) if err != nil { return nil, err } } if highlighter == nil { return nil, fmt.Errorf("no highlighter named `%s` registered", *req.Highlight.Style) } } for _, hit := range hits { if len(req.Fields) > 0 || highlighter != nil { doc, err := indexReader.Document(hit.ID) if err == nil && doc != nil { if len(req.Fields) > 0 { fieldsToLoad := deDuplicate(req.Fields) for _, f := range fieldsToLoad { for _, docF := range doc.Fields { if f == "*" || docF.Name() == f { var value interface{} switch docF := docF.(type) { case *document.TextField: value = string(docF.Value()) case *document.NumericField: num, err := docF.Number() if err == nil { value = num } case *document.DateTimeField: datetime, err := docF.DateTime() if err == nil { value = datetime.Format(time.RFC3339) } case *document.BooleanField: boolean, err := docF.Boolean() if err == nil { value = boolean } case *document.GeoPointField: lon, err := docF.Lon() if err == nil { lat, err := docF.Lat() if err == nil { value = []float64{lon, lat} } } } if value != nil { hit.AddFieldValue(docF.Name(), value) } } } } } if highlighter != nil { highlightFields := req.Highlight.Fields if highlightFields == nil { // add all fields with matches highlightFields = make([]string, 0, len(hit.Locations)) for k := range hit.Locations { highlightFields = append(highlightFields, k) } } for _, hf := range highlightFields { highlighter.BestFragmentsInField(hit, doc, hf, 1) } } } else if doc == nil { // unexpected case, a doc ID that was found as a search hit // was unable to be found during document lookup return nil, ErrorIndexReadInconsistency } } if i.name != "" { hit.Index = i.name } } atomic.AddUint64(&i.stats.searches, 1) searchDuration := time.Since(searchStart) atomic.AddUint64(&i.stats.searchTime, uint64(searchDuration)) if Config.SlowSearchLogThreshold > 0 && searchDuration > Config.SlowSearchLogThreshold { logger.Printf("slow search took %s - %v", searchDuration, req) } return &SearchResult{ Status: &SearchStatus{ Total: 1, Failed: 0, Successful: 1, Errors: make(map[string]error), }, Request: req, Hits: hits, Total: collector.Total(), MaxScore: collector.MaxScore(), Took: searchDuration, Facets: collector.FacetResults(), }, nil } // Fields returns the name of all the fields this // Index has operated on. func (i *indexImpl) Fields() (fields []string, err error) { i.mutex.RLock() defer i.mutex.RUnlock() if !i.open { return nil, ErrorIndexClosed } indexReader, err := i.i.Reader() if err != nil { return nil, err } defer func() { if cerr := indexReader.Close(); err == nil && cerr != nil { err = cerr } }() fields, err = indexReader.Fields() if err != nil { return nil, err } return fields, nil } func (i *indexImpl) FieldDict(field string) (index.FieldDict, error) { i.mutex.RLock() if !i.open { i.mutex.RUnlock() return nil, ErrorIndexClosed } indexReader, err := i.i.Reader() if err != nil { i.mutex.RUnlock() return nil, err } fieldDict, err := indexReader.FieldDict(field) if err != nil { i.mutex.RUnlock() return nil, err } return &indexImplFieldDict{ index: i, indexReader: indexReader, fieldDict: fieldDict, }, nil } func (i *indexImpl) FieldDictRange(field string, startTerm []byte, endTerm []byte) (index.FieldDict, error) { i.mutex.RLock() if !i.open { i.mutex.RUnlock() return nil, ErrorIndexClosed } indexReader, err := i.i.Reader() if err != nil { i.mutex.RUnlock() return nil, err } fieldDict, err := indexReader.FieldDictRange(field, startTerm, endTerm) if err != nil { i.mutex.RUnlock() return nil, err } return &indexImplFieldDict{ index: i, indexReader: indexReader, fieldDict: fieldDict, }, nil } func (i *indexImpl) FieldDictPrefix(field string, termPrefix []byte) (index.FieldDict, error) { i.mutex.RLock() if !i.open { i.mutex.RUnlock() return nil, ErrorIndexClosed } indexReader, err := i.i.Reader() if err != nil { i.mutex.RUnlock() return nil, err } fieldDict, err := indexReader.FieldDictPrefix(field, termPrefix) if err != nil { i.mutex.RUnlock() return nil, err } return &indexImplFieldDict{ index: i, indexReader: indexReader, fieldDict: fieldDict, }, nil } func (i *indexImpl) Close() error { i.mutex.Lock() defer i.mutex.Unlock() indexStats.UnRegister(i) i.open = false return i.i.Close() } func (i *indexImpl) Stats() *IndexStat { return i.stats } func (i *indexImpl) StatsMap() map[string]interface{} { return i.stats.statsMap() } func (i *indexImpl) GetInternal(key []byte) (val []byte, err error) { i.mutex.RLock() defer i.mutex.RUnlock() if !i.open { return nil, ErrorIndexClosed } reader, err := i.i.Reader() if err != nil { return nil, err } defer func() { if cerr := reader.Close(); err == nil && cerr != nil { err = cerr } }() val, err = reader.GetInternal(key) if err != nil { return nil, err } return val, nil } func (i *indexImpl) SetInternal(key, val []byte) error { i.mutex.RLock() defer i.mutex.RUnlock() if !i.open { return ErrorIndexClosed } return i.i.SetInternal(key, val) } func (i *indexImpl) DeleteInternal(key []byte) error { i.mutex.RLock() defer i.mutex.RUnlock() if !i.open { return ErrorIndexClosed } return i.i.DeleteInternal(key) } // NewBatch creates a new empty batch. func (i *indexImpl) NewBatch() *Batch { return &Batch{ index: i, internal: index.NewBatch(), } } func (i *indexImpl) Name() string { return i.name } func (i *indexImpl) SetName(name string) { indexStats.UnRegister(i) i.name = name indexStats.Register(i) } type indexImplFieldDict struct { index *indexImpl indexReader index.IndexReader fieldDict index.FieldDict } func (f *indexImplFieldDict) Next() (*index.DictEntry, error) { return f.fieldDict.Next() } func (f *indexImplFieldDict) Close() error { defer f.index.mutex.RUnlock() err := f.fieldDict.Close() if err != nil { return err } return f.indexReader.Close() } // helper function to remove duplicate entries from slice of strings func deDuplicate(fields []string) []string { entries := make(map[string]struct{}) ret := []string{} for _, entry := range fields { if _, exists := entries[entry]; !exists { entries[entry] = struct{}{} ret = append(ret, entry) } } return ret }