refactored data structure out of collector
The TopNCollector can now use either a heap or a list. I did not code it to use an interface, because this is a very hot loop during searching. Rather, it lets bleve developers easily toggle between the two (or other ideas) by changing 2 lines. The list is faster in the benchmark, but causes more allocations. The list is once again the default (for now). To switch to the heap implementation, change `store *collectStoreList` to `store *collectStoreHeap`, and `newStoreList(...` to `newStoreHeap(...`.
This commit is contained in:
parent
3f8757c05b
commit
47c239ca7b
|
@ -384,7 +384,7 @@ func (i *indexImpl) SearchInContext(ctx context.Context, req *SearchRequest) (sr
|
|||
return nil, ErrorIndexClosed
|
||||
}
|
||||
|
||||
collector := collectors.NewHeapCollector(req.Size, req.From, req.Sort)
|
||||
collector := collectors.NewTopNCollector(req.Size, req.From, req.Sort)
|
||||
|
||||
// open a reader for this search
|
||||
indexReader, err := i.i.Reader()
|
||||
|
|
|
@ -0,0 +1,83 @@
|
|||
// Copyright (c) 2014 Couchbase, Inc.
|
||||
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
|
||||
// except in compliance with the License. You may obtain a copy of the License at
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
// Unless required by applicable law or agreed to in writing, software distributed under the
|
||||
// License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
|
||||
// either express or implied. See the License for the specific language governing permissions
|
||||
// and limitations under the License.
|
||||
|
||||
package collectors
|
||||
|
||||
import (
|
||||
"container/heap"
|
||||
|
||||
"github.com/blevesearch/bleve/search"
|
||||
)
|
||||
|
||||
type collectStoreHeap struct {
|
||||
heap search.DocumentMatchCollection
|
||||
compare collectorCompare
|
||||
}
|
||||
|
||||
func newStoreHeap(cap int, compare collectorCompare) *collectStoreHeap {
|
||||
rv := &collectStoreHeap{
|
||||
heap: make(search.DocumentMatchCollection, 0, cap),
|
||||
compare: compare,
|
||||
}
|
||||
heap.Init(rv)
|
||||
return rv
|
||||
}
|
||||
|
||||
func (c *collectStoreHeap) Add(doc *search.DocumentMatch) {
|
||||
heap.Push(c, doc)
|
||||
}
|
||||
|
||||
func (c *collectStoreHeap) RemoveLast() *search.DocumentMatch {
|
||||
return heap.Pop(c).(*search.DocumentMatch)
|
||||
}
|
||||
|
||||
func (c *collectStoreHeap) Final(skip int, fixup collectorFixup) (search.DocumentMatchCollection, error) {
|
||||
count := c.Len()
|
||||
size := count - skip
|
||||
rv := make(search.DocumentMatchCollection, size)
|
||||
for count > 0 {
|
||||
count--
|
||||
|
||||
if count >= skip {
|
||||
size--
|
||||
doc := heap.Pop(c).(*search.DocumentMatch)
|
||||
rv[size] = doc
|
||||
err := fixup(doc)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
}
|
||||
return rv, nil
|
||||
}
|
||||
|
||||
// heap interface implementation
|
||||
|
||||
func (c *collectStoreHeap) Len() int {
|
||||
return len(c.heap)
|
||||
}
|
||||
|
||||
func (c *collectStoreHeap) Less(i, j int) bool {
|
||||
so := c.compare(c.heap[i], c.heap[j])
|
||||
return -so < 0
|
||||
}
|
||||
|
||||
func (c *collectStoreHeap) Swap(i, j int) {
|
||||
c.heap[i], c.heap[j] = c.heap[j], c.heap[i]
|
||||
}
|
||||
|
||||
func (c *collectStoreHeap) Push(x interface{}) {
|
||||
c.heap = append(c.heap, x.(*search.DocumentMatch))
|
||||
}
|
||||
|
||||
func (c *collectStoreHeap) Pop() interface{} {
|
||||
var rv *search.DocumentMatch
|
||||
rv, c.heap = c.heap[len(c.heap)-1], c.heap[:len(c.heap)-1]
|
||||
return rv
|
||||
}
|
|
@ -0,0 +1,73 @@
|
|||
// Copyright (c) 2014 Couchbase, Inc.
|
||||
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
|
||||
// except in compliance with the License. You may obtain a copy of the License at
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
// Unless required by applicable law or agreed to in writing, software distributed under the
|
||||
// License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
|
||||
// either express or implied. See the License for the specific language governing permissions
|
||||
// and limitations under the License.
|
||||
|
||||
package collectors
|
||||
|
||||
import (
|
||||
"container/list"
|
||||
|
||||
"github.com/blevesearch/bleve/search"
|
||||
)
|
||||
|
||||
type collectStoreList struct {
|
||||
results *list.List
|
||||
compare collectorCompare
|
||||
}
|
||||
|
||||
func newStoreList(cap int, compare collectorCompare) *collectStoreList {
|
||||
rv := &collectStoreList{
|
||||
results: list.New(),
|
||||
compare: compare,
|
||||
}
|
||||
|
||||
return rv
|
||||
}
|
||||
|
||||
func (c *collectStoreList) Add(doc *search.DocumentMatch) {
|
||||
for e := c.results.Front(); e != nil; e = e.Next() {
|
||||
curr := e.Value.(*search.DocumentMatch)
|
||||
if c.compare(doc, curr) >= 0 {
|
||||
c.results.InsertBefore(doc, e)
|
||||
return
|
||||
}
|
||||
}
|
||||
// if we got to the end, we still have to add it
|
||||
c.results.PushBack(doc)
|
||||
}
|
||||
|
||||
func (c *collectStoreList) RemoveLast() *search.DocumentMatch {
|
||||
return c.results.Remove(c.results.Front()).(*search.DocumentMatch)
|
||||
}
|
||||
|
||||
func (c *collectStoreList) Final(skip int, fixup collectorFixup) (search.DocumentMatchCollection, error) {
|
||||
if c.results.Len()-skip > 0 {
|
||||
rv := make(search.DocumentMatchCollection, c.results.Len()-skip)
|
||||
i := 0
|
||||
skipped := 0
|
||||
for e := c.results.Back(); e != nil; e = e.Prev() {
|
||||
if skipped < skip {
|
||||
skipped++
|
||||
continue
|
||||
}
|
||||
|
||||
rv[i] = e.Value.(*search.DocumentMatch)
|
||||
err := fixup(rv[i])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
i++
|
||||
}
|
||||
return rv, nil
|
||||
}
|
||||
return search.DocumentMatchCollection{}, nil
|
||||
}
|
||||
|
||||
func (c *collectStoreList) Len() int {
|
||||
return c.results.Len()
|
||||
}
|
|
@ -6,11 +6,10 @@
|
|||
// License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
|
||||
// either express or implied. See the License for the specific language governing permissions
|
||||
// and limitations under the License.
|
||||
//
|
||||
|
||||
package collectors
|
||||
|
||||
import (
|
||||
"container/heap"
|
||||
"time"
|
||||
|
||||
"github.com/blevesearch/bleve/index"
|
||||
|
@ -18,7 +17,12 @@ import (
|
|||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
type HeapCollector struct {
|
||||
type collectorCompare func(i, j *search.DocumentMatch) int
|
||||
|
||||
type collectorFixup func(d *search.DocumentMatch) error
|
||||
|
||||
// TopNCollector collects the top N hits, optionally skipping some results
|
||||
type TopNCollector struct {
|
||||
size int
|
||||
skip int
|
||||
total uint64
|
||||
|
@ -28,6 +32,8 @@ type HeapCollector struct {
|
|||
results search.DocumentMatchCollection
|
||||
facetsBuilder *search.FacetsBuilder
|
||||
|
||||
store *collectStoreList
|
||||
|
||||
needDocIds bool
|
||||
neededFields []string
|
||||
cachedScoring []bool
|
||||
|
@ -36,14 +42,19 @@ type HeapCollector struct {
|
|||
lowestMatchOutsideResults *search.DocumentMatch
|
||||
}
|
||||
|
||||
var COLLECT_CHECK_DONE_EVERY = uint64(1024)
|
||||
// CheckDoneEvery controls how frequently we check the context deadline
|
||||
const CheckDoneEvery = uint64(1024)
|
||||
|
||||
func NewHeapCollector(size int, skip int, sort search.SortOrder) *HeapCollector {
|
||||
hc := &HeapCollector{size: size, skip: skip, sort: sort}
|
||||
// NewTopNCollector builds a collector to find the top 'size' hits
|
||||
// skipping over the first 'skip' hits
|
||||
// ordering hits by the provided sort order
|
||||
func NewTopNCollector(size int, skip int, sort search.SortOrder) *TopNCollector {
|
||||
hc := &TopNCollector{size: size, skip: skip, sort: sort}
|
||||
// pre-allocate space on the heap, we need size+skip results
|
||||
// +1 additional while figuring out which to evict
|
||||
hc.results = make(search.DocumentMatchCollection, 0, size+skip+1)
|
||||
heap.Init(hc)
|
||||
hc.store = newStoreList(size+skip+1, func(i, j *search.DocumentMatch) int {
|
||||
return hc.sort.Compare(hc.cachedScoring, hc.cachedDesc, i, j)
|
||||
})
|
||||
|
||||
// these lookups traverse an interface, so do once up-front
|
||||
if sort.RequiresDocID() {
|
||||
|
@ -56,7 +67,8 @@ func NewHeapCollector(size int, skip int, sort search.SortOrder) *HeapCollector
|
|||
return hc
|
||||
}
|
||||
|
||||
func (hc *HeapCollector) Collect(ctx context.Context, searcher search.Searcher, reader index.IndexReader) error {
|
||||
// Collect goes to the index to find the matching documents
|
||||
func (hc *TopNCollector) Collect(ctx context.Context, searcher search.Searcher, reader index.IndexReader) error {
|
||||
startTime := time.Now()
|
||||
var err error
|
||||
var next *search.DocumentMatch
|
||||
|
@ -76,7 +88,7 @@ func (hc *HeapCollector) Collect(ctx context.Context, searcher search.Searcher,
|
|||
next, err = searcher.Next(searchContext)
|
||||
}
|
||||
for err == nil && next != nil {
|
||||
if hc.total%COLLECT_CHECK_DONE_EVERY == 0 {
|
||||
if hc.total%CheckDoneEvery == 0 {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
|
@ -112,7 +124,7 @@ func (hc *HeapCollector) Collect(ctx context.Context, searcher search.Searcher,
|
|||
|
||||
var sortByScoreOpt = []string{"_score"}
|
||||
|
||||
func (hc *HeapCollector) collectSingle(ctx *search.SearchContext, reader index.IndexReader, d *search.DocumentMatch) error {
|
||||
func (hc *TopNCollector) collectSingle(ctx *search.SearchContext, reader index.IndexReader, d *search.DocumentMatch) error {
|
||||
// increment total hits
|
||||
hc.total++
|
||||
d.HitNumber = hc.total
|
||||
|
@ -166,9 +178,9 @@ func (hc *HeapCollector) collectSingle(ctx *search.SearchContext, reader index.I
|
|||
}
|
||||
}
|
||||
|
||||
heap.Push(hc, d)
|
||||
if hc.Len() > hc.size+hc.skip {
|
||||
removed := heap.Pop(hc).(*search.DocumentMatch)
|
||||
hc.store.Add(d)
|
||||
if hc.store.Len() > hc.size+hc.skip {
|
||||
removed := hc.store.RemoveLast()
|
||||
if hc.lowestMatchOutsideResults == nil {
|
||||
hc.lowestMatchOutsideResults = removed
|
||||
} else {
|
||||
|
@ -184,85 +196,55 @@ func (hc *HeapCollector) collectSingle(ctx *search.SearchContext, reader index.I
|
|||
return nil
|
||||
}
|
||||
|
||||
func (hc *HeapCollector) SetFacetsBuilder(facetsBuilder *search.FacetsBuilder) {
|
||||
// SetFacetsBuilder registers a facet builder for this collector
|
||||
func (hc *TopNCollector) SetFacetsBuilder(facetsBuilder *search.FacetsBuilder) {
|
||||
hc.facetsBuilder = facetsBuilder
|
||||
}
|
||||
|
||||
// finalizeResults starts with the heap containing the final top size+skip
|
||||
// it now throws away the results to be skipped
|
||||
// and does final doc id lookup (if necessary)
|
||||
func (hc *HeapCollector) finalizeResults(r index.IndexReader) error {
|
||||
count := hc.Len()
|
||||
size := count - hc.skip
|
||||
rv := make(search.DocumentMatchCollection, size)
|
||||
for count > 0 {
|
||||
count--
|
||||
|
||||
if count >= hc.skip {
|
||||
size--
|
||||
doc := heap.Pop(hc).(*search.DocumentMatch)
|
||||
rv[size] = doc
|
||||
if doc.ID == "" {
|
||||
// look up the id since we need it for lookup
|
||||
var err error
|
||||
doc.ID, err = r.FinalizeDocID(doc.IndexInternalID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
func (hc *TopNCollector) finalizeResults(r index.IndexReader) error {
|
||||
var err error
|
||||
hc.results, err = hc.store.Final(hc.skip, func(doc *search.DocumentMatch) error {
|
||||
if doc.ID == "" {
|
||||
// look up the id since we need it for lookup
|
||||
var err error
|
||||
doc.ID, err = r.FinalizeDocID(doc.IndexInternalID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
// no longer a heap
|
||||
hc.results = rv
|
||||
|
||||
return nil
|
||||
return err
|
||||
}
|
||||
|
||||
func (hc *HeapCollector) Results() search.DocumentMatchCollection {
|
||||
// Results returns the collected hits
|
||||
func (hc *TopNCollector) Results() search.DocumentMatchCollection {
|
||||
return hc.results
|
||||
}
|
||||
|
||||
func (hc *HeapCollector) Total() uint64 {
|
||||
// Total returns the total number of hits
|
||||
func (hc *TopNCollector) Total() uint64 {
|
||||
return hc.total
|
||||
}
|
||||
|
||||
func (hc *HeapCollector) MaxScore() float64 {
|
||||
// MaxScore returns the maximum score seen across all the hits
|
||||
func (hc *TopNCollector) MaxScore() float64 {
|
||||
return hc.maxScore
|
||||
}
|
||||
|
||||
func (hc *HeapCollector) Took() time.Duration {
|
||||
// Took returns the time spent collecting hits
|
||||
func (hc *TopNCollector) Took() time.Duration {
|
||||
return hc.took
|
||||
}
|
||||
|
||||
func (hc *HeapCollector) FacetResults() search.FacetResults {
|
||||
// FacetResults returns the computed facets results
|
||||
func (hc *TopNCollector) FacetResults() search.FacetResults {
|
||||
if hc.facetsBuilder != nil {
|
||||
return hc.facetsBuilder.Results()
|
||||
}
|
||||
return search.FacetResults{}
|
||||
}
|
||||
|
||||
// heap interface implementation
|
||||
|
||||
func (hc *HeapCollector) Len() int {
|
||||
return len(hc.results)
|
||||
}
|
||||
|
||||
func (hc *HeapCollector) Less(i, j int) bool {
|
||||
so := hc.sort.Compare(hc.cachedScoring, hc.cachedDesc, hc.results[i], hc.results[j])
|
||||
return -so < 0
|
||||
}
|
||||
|
||||
func (hc *HeapCollector) Swap(i, j int) {
|
||||
hc.results[i], hc.results[j] = hc.results[j], hc.results[i]
|
||||
}
|
||||
|
||||
func (hc *HeapCollector) Push(x interface{}) {
|
||||
hc.results = append(hc.results, x.(*search.DocumentMatch))
|
||||
}
|
||||
|
||||
func (hc *HeapCollector) Pop() interface{} {
|
||||
var rv *search.DocumentMatch
|
||||
rv, hc.results = hc.results[len(hc.results)-1], hc.results[:len(hc.results)-1]
|
||||
return rv
|
||||
}
|
|
@ -84,7 +84,7 @@ func TestTop10Scores(t *testing.T) {
|
|||
},
|
||||
}
|
||||
|
||||
collector := NewHeapCollector(10, 0, search.SortOrder{&search.SortScore{Desc: true}})
|
||||
collector := NewTopNCollector(10, 0, search.SortOrder{&search.SortScore{Desc: true}})
|
||||
err := collector.Collect(context.Background(), searcher, &stubReader{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
|
@ -103,6 +103,7 @@ func TestTop10Scores(t *testing.T) {
|
|||
results := collector.Results()
|
||||
|
||||
if len(results) != 10 {
|
||||
t.Logf("results: %v", results)
|
||||
t.Fatalf("expected 10 results, got %d", len(results))
|
||||
}
|
||||
|
||||
|
@ -192,7 +193,7 @@ func TestTop10ScoresSkip10(t *testing.T) {
|
|||
},
|
||||
}
|
||||
|
||||
collector := NewHeapCollector(10, 10, search.SortOrder{&search.SortScore{Desc: true}})
|
||||
collector := NewTopNCollector(10, 10, search.SortOrder{&search.SortScore{Desc: true}})
|
||||
err := collector.Collect(context.Background(), searcher, &stubReader{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
|
@ -289,7 +290,7 @@ func TestPaginationSameScores(t *testing.T) {
|
|||
}
|
||||
|
||||
// first get first 5 hits
|
||||
collector := NewHeapCollector(5, 0, search.SortOrder{&search.SortScore{Desc: true}})
|
||||
collector := NewTopNCollector(5, 0, search.SortOrder{&search.SortScore{Desc: true}})
|
||||
err := collector.Collect(context.Background(), searcher, &stubReader{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
|
@ -375,7 +376,7 @@ func TestPaginationSameScores(t *testing.T) {
|
|||
}
|
||||
|
||||
// now get next 5 hits
|
||||
collector = NewHeapCollector(5, 5, search.SortOrder{&search.SortScore{Desc: true}})
|
||||
collector = NewTopNCollector(5, 5, search.SortOrder{&search.SortScore{Desc: true}})
|
||||
err = collector.Collect(context.Background(), searcher, &stubReader{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
|
@ -402,24 +403,24 @@ func TestPaginationSameScores(t *testing.T) {
|
|||
|
||||
func BenchmarkTop10of100000Scores(b *testing.B) {
|
||||
benchHelper(10000, func() search.Collector {
|
||||
return NewHeapCollector(10, 0, search.SortOrder{&search.SortScore{Desc: true}})
|
||||
return NewTopNCollector(10, 0, search.SortOrder{&search.SortScore{Desc: true}})
|
||||
}, b)
|
||||
}
|
||||
|
||||
func BenchmarkTop100of100000Scores(b *testing.B) {
|
||||
benchHelper(10000, func() search.Collector {
|
||||
return NewHeapCollector(100, 0, search.SortOrder{&search.SortScore{Desc: true}})
|
||||
return NewTopNCollector(100, 0, search.SortOrder{&search.SortScore{Desc: true}})
|
||||
}, b)
|
||||
}
|
||||
|
||||
func BenchmarkTop10of1000000Scores(b *testing.B) {
|
||||
benchHelper(100000, func() search.Collector {
|
||||
return NewHeapCollector(10, 0, search.SortOrder{&search.SortScore{Desc: true}})
|
||||
return NewTopNCollector(10, 0, search.SortOrder{&search.SortScore{Desc: true}})
|
||||
}, b)
|
||||
}
|
||||
|
||||
func BenchmarkTop100of1000000Scores(b *testing.B) {
|
||||
benchHelper(100000, func() search.Collector {
|
||||
return NewHeapCollector(100, 0, search.SortOrder{&search.SortScore{Desc: true}})
|
||||
return NewTopNCollector(100, 0, search.SortOrder{&search.SortScore{Desc: true}})
|
||||
}, b)
|
||||
}
|
Loading…
Reference in New Issue