0
0
Fork 0
bleve/search/collectors/collector_heap.go

243 lines
6.1 KiB
Go
Raw Normal View History

2016-08-10 10:13:38 +02:00
// Copyright (c) 2014 Couchbase, Inc.
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
// except in compliance with the License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed under the
// License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
// either express or implied. See the License for the specific language governing permissions
// and limitations under the License.
//
2016-08-09 15:18:53 +02:00
package collectors
import (
"container/heap"
2016-08-10 10:13:38 +02:00
"time"
2016-08-09 15:18:53 +02:00
"github.com/blevesearch/bleve/index"
"github.com/blevesearch/bleve/search"
"golang.org/x/net/context"
)
type HeapCollector struct {
size int
skip int
total uint64
maxScore float64
2016-08-09 15:18:53 +02:00
took time.Duration
sort search.SortOrder
results search.DocumentMatchCollection
2016-08-09 15:18:53 +02:00
facetsBuilder *search.FacetsBuilder
lowestMatchOutsideResults *search.DocumentMatch
2016-08-09 15:18:53 +02:00
}
2016-08-10 10:13:38 +02:00
var COLLECT_CHECK_DONE_EVERY = uint64(1024)
func NewHeapCollector(size int, skip int, sort search.SortOrder) *HeapCollector {
hc := &HeapCollector{size: size, skip: skip, sort: sort}
2016-08-09 15:18:53 +02:00
heap.Init(hc)
return hc
}
func (hc *HeapCollector) Collect(ctx context.Context, searcher search.Searcher, reader index.IndexReader) error {
2016-08-09 15:18:53 +02:00
startTime := time.Now()
var err error
var next *search.DocumentMatch
// search context with enough pre-allocated document matches
// we keep references to size+skip ourselves
// plus possibly one extra for the highestMatchOutsideResults
// plus the amount required by the searcher tree
searchContext := &search.SearchContext{
DocumentMatchPool: search.NewDocumentMatchPool(hc.size + hc.skip + 1 + searcher.DocumentMatchPoolSize()),
}
2016-08-09 15:18:53 +02:00
select {
case <-ctx.Done():
return ctx.Err()
default:
next, err = searcher.Next(searchContext)
2016-08-09 15:18:53 +02:00
}
for err == nil && next != nil {
if hc.total%COLLECT_CHECK_DONE_EVERY == 0 {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
}
if hc.facetsBuilder != nil {
err = hc.facetsBuilder.Update(next)
if err != nil {
break
}
}
err = hc.collectSingle(searchContext, reader, next)
if err != nil {
break
}
next, err = searcher.Next(searchContext)
2016-08-09 15:18:53 +02:00
}
// compute search duration
hc.took = time.Since(startTime)
if err != nil {
return err
}
// finalize actual results
err = hc.finalizeResults(reader)
if err != nil {
return err
}
2016-08-09 15:18:53 +02:00
return nil
}
func (hc *HeapCollector) collectSingle(ctx *search.SearchContext, reader index.IndexReader, d *search.DocumentMatch) error {
2016-08-09 15:18:53 +02:00
// increment total hits
hc.total++
d.HitNumber = hc.total
// update max score
if d.Score > hc.maxScore {
hc.maxScore = d.Score
}
2016-08-09 15:18:53 +02:00
var err error
// see if we need to load ID (at this early stage, for example to sort on it)
if hc.sort.RequiresDocID() {
d.ID, err = reader.FinalizeDocID(d.IndexInternalID)
2016-08-09 15:18:53 +02:00
if err != nil {
return err
}
}
// see if we need to load the stored fields
if len(hc.sort.RequiredFields()) > 0 {
// find out which fields haven't been loaded yet
fieldsToLoad := d.CachedFieldTerms.FieldsNotYetCached(hc.sort.RequiredFields())
// look them up
fieldTerms, err := reader.DocumentFieldTerms(d.IndexInternalID, fieldsToLoad)
if err != nil {
return err
}
// cache these as well
if d.CachedFieldTerms == nil {
d.CachedFieldTerms = make(map[string][]string)
}
d.CachedFieldTerms.Merge(fieldTerms)
}
// optimization, we track lowest sorting hit already removed from heap
// with this one comparision, we can avoid all heap operations if
// this hit would have been added and then immediately removed
if hc.lowestMatchOutsideResults != nil {
cmp := hc.sort.Compare(d, hc.lowestMatchOutsideResults)
if cmp >= 0 {
// this hit can't possibly be in the result set, so avoid heap ops
ctx.DocumentMatchPool.Put(d)
return nil
}
}
heap.Push(hc, d)
2016-08-10 10:13:38 +02:00
if hc.Len() > hc.size+hc.skip {
removed := heap.Pop(hc).(*search.DocumentMatch)
if hc.lowestMatchOutsideResults == nil {
hc.lowestMatchOutsideResults = removed
} else {
cmp := hc.sort.Compare(removed, hc.lowestMatchOutsideResults)
if cmp < 0 {
tmp := hc.lowestMatchOutsideResults
hc.lowestMatchOutsideResults = removed
ctx.DocumentMatchPool.Put(tmp)
}
}
2016-08-10 10:13:38 +02:00
}
2016-08-09 15:18:53 +02:00
return nil
}
// SetFacetsBuilder installs fb as the builder that Collect updates with every
// match. A nil builder disables faceting.
func (hc *HeapCollector) SetFacetsBuilder(fb *search.FacetsBuilder) {
	hc.facetsBuilder = fb
}
// finalizeResults starts with the heap containing the final top size+skip
// it now throws away the results to be skipped
// and does final doc id lookup (if necessary)
func (hc *HeapCollector) finalizeResults(r index.IndexReader) error {
2016-08-10 10:13:38 +02:00
count := hc.Len()
size := count - hc.skip
rv := make(search.DocumentMatchCollection, size)
for count > 0 {
count--
2016-08-10 10:13:38 +02:00
if count >= hc.skip {
size--
doc := heap.Pop(hc).(*search.DocumentMatch)
rv[size] = doc
if doc.ID == "" {
// look up the id since we need it for lookup
var err error
doc.ID, err = r.FinalizeDocID(doc.IndexInternalID)
if err != nil {
return err
}
}
2016-08-10 10:13:38 +02:00
}
2016-08-09 15:18:53 +02:00
}
// no longer a heap
hc.results = rv
return nil
}
func (hc *HeapCollector) Results() search.DocumentMatchCollection {
return hc.results
2016-08-09 15:18:53 +02:00
}
// Total returns how many matches were collected, including those that did
// not make it into Results.
func (hc *HeapCollector) Total() uint64 {
	count := hc.total
	return count
}
func (hc *HeapCollector) MaxScore() float64 {
return hc.maxScore
2016-08-10 10:13:38 +02:00
}
func (hc *HeapCollector) Took() time.Duration {
return hc.took
2016-08-09 15:18:53 +02:00
}
// FacetResults returns the facets computed during Collect, or an empty
// FacetResults when no facets builder was set.
func (hc *HeapCollector) FacetResults() search.FacetResults {
	if hc.facetsBuilder == nil {
		return search.FacetResults{}
	}
	return hc.facetsBuilder.Results()
}
// heap interface implementation
2016-08-09 15:18:53 +02:00
func (hc *HeapCollector) Len() int {
return len(hc.results)
}
func (hc *HeapCollector) Less(i, j int) bool {
so := hc.sort.Compare(hc.results[i], hc.results[j])
return -so < 0
2016-08-09 15:18:53 +02:00
}
// Swap exchanges the matches at positions i and j of the backing slice.
func (hc *HeapCollector) Swap(i, j int) {
	r := hc.results
	r[i], r[j] = r[j], r[i]
}
func (hc *HeapCollector) Push(x interface{}) {
hc.results = append(hc.results, x.(*search.DocumentMatch))
2016-08-09 15:18:53 +02:00
}
func (hc *HeapCollector) Pop() interface{} {
var rv *search.DocumentMatch
rv, hc.results = hc.results[len(hc.results)-1], hc.results[:len(hc.results)-1]
return rv
2016-08-09 15:18:53 +02:00
}