2016-08-10 10:13:38 +02:00
|
|
|
// Copyright (c) 2014 Couchbase, Inc.
|
|
|
|
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
|
|
|
|
// except in compliance with the License. You may obtain a copy of the License at
|
|
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
// Unless required by applicable law or agreed to in writing, software distributed under the
|
|
|
|
// License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
|
|
|
|
// either express or implied. See the License for the specific language governing permissions
|
|
|
|
// and limitations under the License.
|
|
|
|
//
|
2016-08-09 15:18:53 +02:00
|
|
|
package collectors
|
|
|
|
|
|
|
|
import (
|
|
|
|
"container/heap"
|
2016-08-10 10:13:38 +02:00
|
|
|
"time"
|
|
|
|
|
2016-08-09 15:18:53 +02:00
|
|
|
"github.com/blevesearch/bleve/index"
|
|
|
|
"github.com/blevesearch/bleve/search"
|
|
|
|
"golang.org/x/net/context"
|
|
|
|
)
|
|
|
|
|
|
|
|
type HeapCollector struct {
|
|
|
|
size int
|
|
|
|
skip int
|
|
|
|
total uint64
|
2016-08-12 20:23:55 +02:00
|
|
|
maxScore float64
|
2016-08-09 15:18:53 +02:00
|
|
|
took time.Duration
|
|
|
|
sort search.SortOrder
|
2016-08-12 20:23:55 +02:00
|
|
|
results search.DocumentMatchCollection
|
2016-08-09 15:18:53 +02:00
|
|
|
facetsBuilder *search.FacetsBuilder
|
2016-08-12 20:23:55 +02:00
|
|
|
|
2016-08-25 22:24:26 +02:00
|
|
|
needDocIds bool
|
|
|
|
neededFields []string
|
|
|
|
cachedScoring []bool
|
|
|
|
cachedDesc []bool
|
2016-08-25 01:02:22 +02:00
|
|
|
|
2016-08-12 20:23:55 +02:00
|
|
|
lowestMatchOutsideResults *search.DocumentMatch
|
2016-08-09 15:18:53 +02:00
|
|
|
}
|
|
|
|
|
2016-08-10 10:13:38 +02:00
|
|
|
// COLLECT_CHECK_DONE_EVERY is the number of hits processed between checks
// of the context's Done channel in Collect. It is used as a modulus there,
// so it must not be set to zero.
var COLLECT_CHECK_DONE_EVERY uint64 = 1024
|
|
|
|
|
2016-08-12 20:23:55 +02:00
|
|
|
func NewHeapCollector(size int, skip int, sort search.SortOrder) *HeapCollector {
|
|
|
|
hc := &HeapCollector{size: size, skip: skip, sort: sort}
|
improved implementation to address perf regressions
primary change is going back to sort values be []string
and not []interface{}, this avoid allocatiosn converting
into the interface{}
that sounds obvious, so why didn't we just do that first?
because a common (default) sort is score, which is naturally
a number, not a string (like terms). converting into the
number was also expensive, and the common case.
so, this solution also makes the change to NOT put the score
into the sort value list. instead you see the dummy value
"_score". this is just a placeholder, the actual sort impl
knows that field of the sort is the score, and will sort
using the actual score.
also, several other aspets of the benchmark were cleaned up
so that unnecessary allocations do not pollute the cpu profiles
Here are the updated benchmarks:
$ go test -run=xxx -bench=. -benchmem -cpuprofile=cpu.out
BenchmarkTop10of100000Scores-4 3000 465809 ns/op 2548 B/op 33 allocs/op
BenchmarkTop100of100000Scores-4 2000 626488 ns/op 21484 B/op 213 allocs/op
BenchmarkTop10of1000000Scores-4 300 5107658 ns/op 2560 B/op 33 allocs/op
BenchmarkTop100of1000000Scores-4 300 5275403 ns/op 21624 B/op 213 allocs/op
PASS
ok github.com/blevesearch/bleve/search/collectors 7.188s
Prior to this PR, master reported:
$ go test -run=xxx -bench=. -benchmem
BenchmarkTop10of100000Scores-4 3000 453269 ns/op 360161 B/op 42 allocs/op
BenchmarkTop100of100000Scores-4 2000 519131 ns/op 388275 B/op 219 allocs/op
BenchmarkTop10of1000000Scores-4 200 7459004 ns/op 4628236 B/op 52 allocs/op
BenchmarkTop100of1000000Scores-4 200 8064864 ns/op 4656596 B/op 232 allocs/op
PASS
ok github.com/blevesearch/bleve/search/collectors 7.385s
So, we're pretty close on the smaller datasets, and we scale better on the larger datasets.
We also show fewer allocations and bytes in all cases (some of this is artificial due to test cleanup).
2016-08-25 21:47:07 +02:00
|
|
|
// pre-allocate space on the heap, we need size+skip results
|
|
|
|
// +1 additional while figuring out which to evict
|
|
|
|
hc.results = make(search.DocumentMatchCollection, 0, size+skip+1)
|
2016-08-09 15:18:53 +02:00
|
|
|
heap.Init(hc)
|
2016-08-25 01:02:22 +02:00
|
|
|
|
|
|
|
// these lookups traverse an interface, so do once up-front
|
|
|
|
if sort.RequiresDocID() {
|
|
|
|
hc.needDocIds = true
|
|
|
|
}
|
|
|
|
hc.neededFields = sort.RequiredFields()
|
2016-08-25 22:24:26 +02:00
|
|
|
hc.cachedScoring = sort.CacheIsScore()
|
|
|
|
hc.cachedDesc = sort.CacheDescending()
|
2016-08-25 01:02:22 +02:00
|
|
|
|
2016-08-09 15:18:53 +02:00
|
|
|
return hc
|
|
|
|
}
|
|
|
|
|
2016-08-12 20:23:55 +02:00
|
|
|
func (hc *HeapCollector) Collect(ctx context.Context, searcher search.Searcher, reader index.IndexReader) error {
|
2016-08-09 15:18:53 +02:00
|
|
|
startTime := time.Now()
|
|
|
|
var err error
|
|
|
|
var next *search.DocumentMatch
|
2016-08-12 20:23:55 +02:00
|
|
|
|
|
|
|
// search context with enough pre-allocated document matches
|
|
|
|
// we keep references to size+skip ourselves
|
|
|
|
// plus possibly one extra for the highestMatchOutsideResults
|
|
|
|
// plus the amount required by the searcher tree
|
|
|
|
searchContext := &search.SearchContext{
|
2016-08-25 01:02:22 +02:00
|
|
|
DocumentMatchPool: search.NewDocumentMatchPool(hc.size+hc.skip+1+searcher.DocumentMatchPoolSize(), len(hc.sort)),
|
2016-08-12 20:23:55 +02:00
|
|
|
}
|
|
|
|
|
2016-08-09 15:18:53 +02:00
|
|
|
select {
|
|
|
|
case <-ctx.Done():
|
|
|
|
return ctx.Err()
|
|
|
|
default:
|
2016-08-12 20:23:55 +02:00
|
|
|
next, err = searcher.Next(searchContext)
|
2016-08-09 15:18:53 +02:00
|
|
|
}
|
|
|
|
for err == nil && next != nil {
|
|
|
|
if hc.total%COLLECT_CHECK_DONE_EVERY == 0 {
|
|
|
|
select {
|
|
|
|
case <-ctx.Done():
|
|
|
|
return ctx.Err()
|
|
|
|
default:
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if hc.facetsBuilder != nil {
|
|
|
|
err = hc.facetsBuilder.Update(next)
|
|
|
|
if err != nil {
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
2016-08-12 20:23:55 +02:00
|
|
|
|
|
|
|
err = hc.collectSingle(searchContext, reader, next)
|
|
|
|
if err != nil {
|
|
|
|
break
|
|
|
|
}
|
|
|
|
|
|
|
|
next, err = searcher.Next(searchContext)
|
2016-08-09 15:18:53 +02:00
|
|
|
}
|
|
|
|
// compute search duration
|
|
|
|
hc.took = time.Since(startTime)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2016-08-12 20:23:55 +02:00
|
|
|
// finalize actual results
|
|
|
|
err = hc.finalizeResults(reader)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2016-08-09 15:18:53 +02:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2016-08-12 20:23:55 +02:00
|
|
|
func (hc *HeapCollector) collectSingle(ctx *search.SearchContext, reader index.IndexReader, d *search.DocumentMatch) error {
|
2016-08-09 15:18:53 +02:00
|
|
|
// increment total hits
|
|
|
|
hc.total++
|
2016-08-12 20:23:55 +02:00
|
|
|
d.HitNumber = hc.total
|
|
|
|
|
|
|
|
// update max score
|
|
|
|
if d.Score > hc.maxScore {
|
|
|
|
hc.maxScore = d.Score
|
|
|
|
}
|
|
|
|
|
2016-08-09 15:18:53 +02:00
|
|
|
var err error
|
2016-08-12 20:23:55 +02:00
|
|
|
// see if we need to load ID (at this early stage, for example to sort on it)
|
2016-08-25 01:02:22 +02:00
|
|
|
if hc.needDocIds {
|
2016-08-12 20:23:55 +02:00
|
|
|
d.ID, err = reader.FinalizeDocID(d.IndexInternalID)
|
2016-08-09 15:18:53 +02:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
2016-08-12 20:23:55 +02:00
|
|
|
|
|
|
|
// see if we need to load the stored fields
|
2016-08-25 01:02:22 +02:00
|
|
|
if len(hc.neededFields) > 0 {
|
2016-08-17 18:20:12 +02:00
|
|
|
// find out which fields haven't been loaded yet
|
2016-08-25 01:02:22 +02:00
|
|
|
fieldsToLoad := d.CachedFieldTerms.FieldsNotYetCached(hc.neededFields)
|
2016-08-17 18:20:12 +02:00
|
|
|
// look them up
|
|
|
|
fieldTerms, err := reader.DocumentFieldTerms(d.IndexInternalID, fieldsToLoad)
|
2016-08-12 20:23:55 +02:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2016-08-17 18:20:12 +02:00
|
|
|
// cache these as well
|
|
|
|
if d.CachedFieldTerms == nil {
|
|
|
|
d.CachedFieldTerms = make(map[string][]string)
|
|
|
|
}
|
|
|
|
d.CachedFieldTerms.Merge(fieldTerms)
|
2016-08-12 20:23:55 +02:00
|
|
|
}
|
|
|
|
|
2016-08-24 20:07:10 +02:00
|
|
|
// compute this hits sort value
|
2016-08-25 01:02:22 +02:00
|
|
|
hc.sort.Value(d)
|
2016-08-24 20:07:10 +02:00
|
|
|
|
2016-08-12 20:23:55 +02:00
|
|
|
// optimization, we track lowest sorting hit already removed from heap
|
|
|
|
// with this one comparision, we can avoid all heap operations if
|
|
|
|
// this hit would have been added and then immediately removed
|
|
|
|
if hc.lowestMatchOutsideResults != nil {
|
2016-08-25 22:24:26 +02:00
|
|
|
cmp := hc.sort.Compare(hc.cachedScoring, hc.cachedDesc, d, hc.lowestMatchOutsideResults)
|
2016-08-12 20:23:55 +02:00
|
|
|
if cmp >= 0 {
|
|
|
|
// this hit can't possibly be in the result set, so avoid heap ops
|
|
|
|
ctx.DocumentMatchPool.Put(d)
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
heap.Push(hc, d)
|
2016-08-10 10:13:38 +02:00
|
|
|
if hc.Len() > hc.size+hc.skip {
|
2016-08-12 20:23:55 +02:00
|
|
|
removed := heap.Pop(hc).(*search.DocumentMatch)
|
|
|
|
if hc.lowestMatchOutsideResults == nil {
|
|
|
|
hc.lowestMatchOutsideResults = removed
|
|
|
|
} else {
|
2016-08-25 22:24:26 +02:00
|
|
|
cmp := hc.sort.Compare(hc.cachedScoring, hc.cachedDesc, removed, hc.lowestMatchOutsideResults)
|
2016-08-12 20:23:55 +02:00
|
|
|
if cmp < 0 {
|
|
|
|
tmp := hc.lowestMatchOutsideResults
|
|
|
|
hc.lowestMatchOutsideResults = removed
|
|
|
|
ctx.DocumentMatchPool.Put(tmp)
|
|
|
|
}
|
|
|
|
}
|
2016-08-10 10:13:38 +02:00
|
|
|
}
|
2016-08-12 20:23:55 +02:00
|
|
|
|
2016-08-09 15:18:53 +02:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (hc *HeapCollector) SetFacetsBuilder(facetsBuilder *search.FacetsBuilder) {
|
|
|
|
hc.facetsBuilder = facetsBuilder
|
|
|
|
}
|
|
|
|
|
2016-08-12 20:23:55 +02:00
|
|
|
// finalizeResults starts with the heap containing the final top size+skip
|
|
|
|
// it now throws away the results to be skipped
|
|
|
|
// and does final doc id lookup (if necessary)
|
|
|
|
func (hc *HeapCollector) finalizeResults(r index.IndexReader) error {
|
2016-08-10 10:13:38 +02:00
|
|
|
count := hc.Len()
|
|
|
|
size := count - hc.skip
|
|
|
|
rv := make(search.DocumentMatchCollection, size)
|
|
|
|
for count > 0 {
|
|
|
|
count--
|
2016-08-12 20:23:55 +02:00
|
|
|
|
2016-08-10 10:13:38 +02:00
|
|
|
if count >= hc.skip {
|
|
|
|
size--
|
2016-08-12 20:23:55 +02:00
|
|
|
doc := heap.Pop(hc).(*search.DocumentMatch)
|
|
|
|
rv[size] = doc
|
|
|
|
if doc.ID == "" {
|
|
|
|
// look up the id since we need it for lookup
|
|
|
|
var err error
|
|
|
|
doc.ID, err = r.FinalizeDocID(doc.IndexInternalID)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
2016-08-10 10:13:38 +02:00
|
|
|
}
|
2016-08-09 15:18:53 +02:00
|
|
|
}
|
2016-08-12 20:23:55 +02:00
|
|
|
|
|
|
|
// no longer a heap
|
|
|
|
hc.results = rv
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (hc *HeapCollector) Results() search.DocumentMatchCollection {
|
|
|
|
return hc.results
|
2016-08-09 15:18:53 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
func (hc *HeapCollector) Total() uint64 {
|
|
|
|
return hc.total
|
|
|
|
}
|
|
|
|
|
|
|
|
func (hc *HeapCollector) MaxScore() float64 {
|
2016-08-12 20:23:55 +02:00
|
|
|
return hc.maxScore
|
2016-08-10 10:13:38 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
func (hc *HeapCollector) Took() time.Duration {
|
|
|
|
return hc.took
|
2016-08-09 15:18:53 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
func (hc *HeapCollector) FacetResults() search.FacetResults {
|
|
|
|
if hc.facetsBuilder != nil {
|
|
|
|
return hc.facetsBuilder.Results()
|
|
|
|
}
|
|
|
|
return search.FacetResults{}
|
|
|
|
}
|
|
|
|
|
2016-08-12 20:23:55 +02:00
|
|
|
// heap interface implementation
|
|
|
|
|
2016-08-09 15:18:53 +02:00
|
|
|
func (hc *HeapCollector) Len() int {
|
|
|
|
return len(hc.results)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (hc *HeapCollector) Less(i, j int) bool {
|
2016-08-25 22:24:26 +02:00
|
|
|
so := hc.sort.Compare(hc.cachedScoring, hc.cachedDesc, hc.results[i], hc.results[j])
|
2016-08-12 20:23:55 +02:00
|
|
|
return -so < 0
|
2016-08-09 15:18:53 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
func (hc *HeapCollector) Swap(i, j int) {
|
|
|
|
hc.results[i], hc.results[j] = hc.results[j], hc.results[i]
|
|
|
|
}
|
|
|
|
|
|
|
|
func (hc *HeapCollector) Push(x interface{}) {
|
2016-08-12 20:23:55 +02:00
|
|
|
hc.results = append(hc.results, x.(*search.DocumentMatch))
|
2016-08-09 15:18:53 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
func (hc *HeapCollector) Pop() interface{} {
|
2016-08-12 20:23:55 +02:00
|
|
|
var rv *search.DocumentMatch
|
|
|
|
rv, hc.results = hc.results[len(hc.results)-1], hc.results[:len(hc.results)-1]
|
|
|
|
return rv
|
2016-08-09 15:18:53 +02:00
|
|
|
}
|