0
0
Fork 0
bleve/search/collectors/collector_heap.go

269 lines
6.9 KiB
Go
Raw Normal View History

2016-08-10 10:13:38 +02:00
// Copyright (c) 2014 Couchbase, Inc.
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
// except in compliance with the License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed under the
// License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
// either express or implied. See the License for the specific language governing permissions
// and limitations under the License.
//
2016-08-09 15:18:53 +02:00
package collectors
import (
"container/heap"
2016-08-10 10:13:38 +02:00
"time"
2016-08-09 15:18:53 +02:00
"github.com/blevesearch/bleve/index"
"github.com/blevesearch/bleve/search"
"golang.org/x/net/context"
)
type HeapCollector struct {
size int
skip int
total uint64
maxScore float64
2016-08-09 15:18:53 +02:00
took time.Duration
sort search.SortOrder
results search.DocumentMatchCollection
2016-08-09 15:18:53 +02:00
facetsBuilder *search.FacetsBuilder
needDocIds bool
neededFields []string
cachedScoring []bool
cachedDesc []bool
lowestMatchOutsideResults *search.DocumentMatch
2016-08-09 15:18:53 +02:00
}
2016-08-10 10:13:38 +02:00
// COLLECT_CHECK_DONE_EVERY controls how often Collect polls its context for
// cancellation while iterating hits: once every this many hits.
var COLLECT_CHECK_DONE_EVERY uint64 = 1024
func NewHeapCollector(size int, skip int, sort search.SortOrder) *HeapCollector {
hc := &HeapCollector{size: size, skip: skip, sort: sort}
improved implementation to address perf regressions primary change is going back to sort values be []string and not []interface{}, this avoid allocatiosn converting into the interface{} that sounds obvious, so why didn't we just do that first? because a common (default) sort is score, which is naturally a number, not a string (like terms). converting into the number was also expensive, and the common case. so, this solution also makes the change to NOT put the score into the sort value list. instead you see the dummy value "_score". this is just a placeholder, the actual sort impl knows that field of the sort is the score, and will sort using the actual score. also, several other aspets of the benchmark were cleaned up so that unnecessary allocations do not pollute the cpu profiles Here are the updated benchmarks: $ go test -run=xxx -bench=. -benchmem -cpuprofile=cpu.out BenchmarkTop10of100000Scores-4 3000 465809 ns/op 2548 B/op 33 allocs/op BenchmarkTop100of100000Scores-4 2000 626488 ns/op 21484 B/op 213 allocs/op BenchmarkTop10of1000000Scores-4 300 5107658 ns/op 2560 B/op 33 allocs/op BenchmarkTop100of1000000Scores-4 300 5275403 ns/op 21624 B/op 213 allocs/op PASS ok github.com/blevesearch/bleve/search/collectors 7.188s Prior to this PR, master reported: $ go test -run=xxx -bench=. -benchmem BenchmarkTop10of100000Scores-4 3000 453269 ns/op 360161 B/op 42 allocs/op BenchmarkTop100of100000Scores-4 2000 519131 ns/op 388275 B/op 219 allocs/op BenchmarkTop10of1000000Scores-4 200 7459004 ns/op 4628236 B/op 52 allocs/op BenchmarkTop100of1000000Scores-4 200 8064864 ns/op 4656596 B/op 232 allocs/op PASS ok github.com/blevesearch/bleve/search/collectors 7.385s So, we're pretty close on the smaller datasets, and we scale better on the larger datasets. We also show fewer allocations and bytes in all cases (some of this is artificial due to test cleanup).
2016-08-25 21:47:07 +02:00
// pre-allocate space on the heap, we need size+skip results
// +1 additional while figuring out which to evict
hc.results = make(search.DocumentMatchCollection, 0, size+skip+1)
2016-08-09 15:18:53 +02:00
heap.Init(hc)
// these lookups traverse an interface, so do once up-front
if sort.RequiresDocID() {
hc.needDocIds = true
}
hc.neededFields = sort.RequiredFields()
hc.cachedScoring = sort.CacheIsScore()
hc.cachedDesc = sort.CacheDescending()
2016-08-09 15:18:53 +02:00
return hc
}
func (hc *HeapCollector) Collect(ctx context.Context, searcher search.Searcher, reader index.IndexReader) error {
2016-08-09 15:18:53 +02:00
startTime := time.Now()
var err error
var next *search.DocumentMatch
// search context with enough pre-allocated document matches
// we keep references to size+skip ourselves
// plus possibly one extra for the highestMatchOutsideResults
// plus the amount required by the searcher tree
searchContext := &search.SearchContext{
DocumentMatchPool: search.NewDocumentMatchPool(hc.size+hc.skip+1+searcher.DocumentMatchPoolSize(), len(hc.sort)),
}
2016-08-09 15:18:53 +02:00
select {
case <-ctx.Done():
return ctx.Err()
default:
next, err = searcher.Next(searchContext)
2016-08-09 15:18:53 +02:00
}
for err == nil && next != nil {
if hc.total%COLLECT_CHECK_DONE_EVERY == 0 {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
}
if hc.facetsBuilder != nil {
err = hc.facetsBuilder.Update(next)
if err != nil {
break
}
}
err = hc.collectSingle(searchContext, reader, next)
if err != nil {
break
}
next, err = searcher.Next(searchContext)
2016-08-09 15:18:53 +02:00
}
// compute search duration
hc.took = time.Since(startTime)
if err != nil {
return err
}
// finalize actual results
err = hc.finalizeResults(reader)
if err != nil {
return err
}
2016-08-09 15:18:53 +02:00
return nil
}
// sortByScoreOpt is the sort key assigned to every hit when the sort order
// is the score alone. "_score" is only a placeholder: the score itself is
// not stored in the key, and comparisons use the hit's actual score. The
// slice is shared by all such hits, so it must never be modified.
var sortByScoreOpt = []string{"_score"}
func (hc *HeapCollector) collectSingle(ctx *search.SearchContext, reader index.IndexReader, d *search.DocumentMatch) error {
2016-08-09 15:18:53 +02:00
// increment total hits
hc.total++
d.HitNumber = hc.total
// update max score
if d.Score > hc.maxScore {
hc.maxScore = d.Score
}
2016-08-09 15:18:53 +02:00
var err error
// see if we need to load ID (at this early stage, for example to sort on it)
if hc.needDocIds {
d.ID, err = reader.FinalizeDocID(d.IndexInternalID)
2016-08-09 15:18:53 +02:00
if err != nil {
return err
}
}
// see if we need to load the stored fields
if len(hc.neededFields) > 0 {
// find out which fields haven't been loaded yet
fieldsToLoad := d.CachedFieldTerms.FieldsNotYetCached(hc.neededFields)
// look them up
fieldTerms, err := reader.DocumentFieldTerms(d.IndexInternalID, fieldsToLoad)
if err != nil {
return err
}
// cache these as well
if d.CachedFieldTerms == nil {
d.CachedFieldTerms = make(map[string][]string)
}
d.CachedFieldTerms.Merge(fieldTerms)
}
// compute this hits sort value
if len(hc.sort) == 1 && hc.cachedScoring[0] {
d.Sort = sortByScoreOpt
} else {
hc.sort.Value(d)
}
// optimization, we track lowest sorting hit already removed from heap
// with this one comparision, we can avoid all heap operations if
// this hit would have been added and then immediately removed
if hc.lowestMatchOutsideResults != nil {
cmp := hc.sort.Compare(hc.cachedScoring, hc.cachedDesc, d, hc.lowestMatchOutsideResults)
if cmp >= 0 {
// this hit can't possibly be in the result set, so avoid heap ops
ctx.DocumentMatchPool.Put(d)
return nil
}
}
heap.Push(hc, d)
2016-08-10 10:13:38 +02:00
if hc.Len() > hc.size+hc.skip {
removed := heap.Pop(hc).(*search.DocumentMatch)
if hc.lowestMatchOutsideResults == nil {
hc.lowestMatchOutsideResults = removed
} else {
cmp := hc.sort.Compare(hc.cachedScoring, hc.cachedDesc, removed, hc.lowestMatchOutsideResults)
if cmp < 0 {
tmp := hc.lowestMatchOutsideResults
hc.lowestMatchOutsideResults = removed
ctx.DocumentMatchPool.Put(tmp)
}
}
2016-08-10 10:13:38 +02:00
}
2016-08-09 15:18:53 +02:00
return nil
}
// SetFacetsBuilder installs the builder that Collect updates with every hit;
// a nil builder disables faceting.
func (hc *HeapCollector) SetFacetsBuilder(facetsBuilder *search.FacetsBuilder) {
	hc.facetsBuilder = facetsBuilder
}
// finalizeResults starts with the heap containing the final top size+skip
// it now throws away the results to be skipped
// and does final doc id lookup (if necessary)
func (hc *HeapCollector) finalizeResults(r index.IndexReader) error {
2016-08-10 10:13:38 +02:00
count := hc.Len()
size := count - hc.skip
rv := make(search.DocumentMatchCollection, size)
for count > 0 {
count--
2016-08-10 10:13:38 +02:00
if count >= hc.skip {
size--
doc := heap.Pop(hc).(*search.DocumentMatch)
rv[size] = doc
if doc.ID == "" {
// look up the id since we need it for lookup
var err error
doc.ID, err = r.FinalizeDocID(doc.IndexInternalID)
if err != nil {
return err
}
}
2016-08-10 10:13:38 +02:00
}
2016-08-09 15:18:53 +02:00
}
// no longer a heap
hc.results = rv
return nil
}
func (hc *HeapCollector) Results() search.DocumentMatchCollection {
return hc.results
2016-08-09 15:18:53 +02:00
}
// Total returns how many hits were seen, including those not kept in the
// result page.
func (hc *HeapCollector) Total() uint64 {
	return hc.total
}
func (hc *HeapCollector) MaxScore() float64 {
return hc.maxScore
2016-08-10 10:13:38 +02:00
}
func (hc *HeapCollector) Took() time.Duration {
return hc.took
2016-08-09 15:18:53 +02:00
}
// FacetResults returns the facets builder's results, or an empty
// FacetResults when no builder was set.
func (hc *HeapCollector) FacetResults() search.FacetResults {
	if hc.facetsBuilder == nil {
		return search.FacetResults{}
	}
	return hc.facetsBuilder.Results()
}
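// heap.Interface implementation (container/heap). Less reports whether match i
// ranks after match j, so the heap root is the worst-ranked match and
// heap.Pop always evicts the worst match first.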
2016-08-09 15:18:53 +02:00
// Len implements heap.Interface: the number of matches currently in hc.results.
func (hc *HeapCollector) Len() int {
	n := len(hc.results)
	return n
}
func (hc *HeapCollector) Less(i, j int) bool {
so := hc.sort.Compare(hc.cachedScoring, hc.cachedDesc, hc.results[i], hc.results[j])
return -so < 0
2016-08-09 15:18:53 +02:00
}
// Swap implements heap.Interface by exchanging two entries of hc.results.
func (hc *HeapCollector) Swap(i, j int) {
	r := hc.results
	r[i], r[j] = r[j], r[i]
}
func (hc *HeapCollector) Push(x interface{}) {
hc.results = append(hc.results, x.(*search.DocumentMatch))
2016-08-09 15:18:53 +02:00
}
func (hc *HeapCollector) Pop() interface{} {
var rv *search.DocumentMatch
rv, hc.results = hc.results[len(hc.results)-1], hc.results[:len(hc.results)-1]
return rv
2016-08-09 15:18:53 +02:00
}