From 7ae696d66129f7c362143c6327483c3029a9adf9 Mon Sep 17 00:00:00 2001 From: Steve Yen Date: Wed, 30 Dec 2015 20:43:31 -0800 Subject: [PATCH] firestorm lookuper notified via batch Previously, the firestorm.Batch() would notify the lookuper goroutine on a document by document basis. If the lookuper input channel became full, then that would block the firestorm.Batch() operation. With this change, lookuper is notified once, with a "batch" that is an []*InFlightItem. This change also reuses that same []*InFlightItem to invoke the compensator.MutateBatch(). This also has the advantage of only converting the docID's from string to []byte just once, outside of the lock that's used by the compensator. Micro-benchmark of this change with null-firestorm bleve-blast does not show large impact, neither degradation or improvement. --- index/firestorm/comp.go | 13 ++++-------- index/firestorm/firestorm.go | 25 +++++++++++++--------- index/firestorm/lookup.go | 39 +++++++++++++++++----------------- index/firestorm/lookup_test.go | 2 +- 4 files changed, 40 insertions(+), 39 deletions(-) diff --git a/index/firestorm/comp.go b/index/firestorm/comp.go index bd0bfb01..a4138823 100644 --- a/index/firestorm/comp.go +++ b/index/firestorm/comp.go @@ -15,7 +15,6 @@ import ( "sort" "sync" - "github.com/blevesearch/bleve/document" "github.com/steveyen/gtreap" "github.com/willf/bitset" ) @@ -80,17 +79,13 @@ func (c *Compensator) Mutate(docID []byte, docNum uint64) { } } -func (c *Compensator) MutateBatch(docs map[string]*document.Document, docNum uint64) { +func (c *Compensator) MutateBatch(inflightItems []*InFlightItem, lastDocNum uint64) { c.inFlightMutex.Lock() defer c.inFlightMutex.Unlock() - for docID, doc := range docs { - if doc != nil { - c.inFlight = c.inFlight.Upsert(&InFlightItem{docID: []byte(docID), docNum: doc.Number}, rand.Int()) - } else { - c.inFlight = c.inFlight.Upsert(&InFlightItem{docID: []byte(docID), docNum: 0}, rand.Int()) - } + for _, item := range inflightItems { + c.inFlight = c.inFlight.Upsert(item, rand.Int()) } - c.maxRead = docNum + c.maxRead = lastDocNum } func (c *Compensator) Migrate(docID []byte, docNum uint64, oldDocNums []uint64) { diff --git a/index/firestorm/firestorm.go b/index/firestorm/firestorm.go index b73d0fe0..662646ca 100644 --- a/index/firestorm/firestorm.go +++ b/index/firestorm/firestorm.go @@ -175,7 +175,7 @@ func (f *Firestorm) Update(doc *document.Document) (err error) { } f.compensator.Mutate([]byte(doc.ID), doc.Number) - f.lookuper.Notify(doc.Number, []byte(doc.ID)) + f.lookuper.NotifyBatch([]*InFlightItem{&InFlightItem{[]byte(doc.ID), doc.Number}}) f.dictUpdater.NotifyBatch(dictionaryDeltas) atomic.AddUint64(&f.stats.indexTime, uint64(time.Since(indexStart))) @@ -185,7 +185,7 @@ func (f *Firestorm) Update(doc *document.Document) (err error) { func (f *Firestorm) Delete(id string) error { indexStart := time.Now() f.compensator.Mutate([]byte(id), 0) - f.lookuper.Notify(0, []byte(id)) + f.lookuper.NotifyBatch([]*InFlightItem{&InFlightItem{[]byte(id), 0}}) atomic.AddUint64(&f.stats.indexTime, uint64(time.Since(indexStart))) return nil } @@ -322,6 +322,17 @@ func (f *Firestorm) Batch(batch *index.Batch) (err error) { } } + inflightItems := make([]*InFlightItem, 0, len(batch.IndexOps)) + for docID, doc := range batch.IndexOps { + if doc != nil { + inflightItems = append(inflightItems, + &InFlightItem{[]byte(docID), doc.Number}) + } else { + inflightItems = append(inflightItems, + &InFlightItem{[]byte(docID), 0}) + } + } + indexStart := time.Now() // start a writer for this batch var kvwriter store.KVWriter @@ -338,14 +349,8 @@ func (f *Firestorm) Batch(batch *index.Batch) (err error) { return } - f.compensator.MutateBatch(batch.IndexOps, lastDocNumber) - for docID, doc := range batch.IndexOps { - if doc != nil { - f.lookuper.Notify(doc.Number, []byte(doc.ID)) - } else { - f.lookuper.Notify(0, []byte(docID)) - } - } + f.compensator.MutateBatch(inflightItems, lastDocNumber) + f.lookuper.NotifyBatch(inflightItems) f.dictUpdater.NotifyBatch(dictionaryDeltas) err = kvwriter.Close() diff --git a/index/firestorm/lookup.go b/index/firestorm/lookup.go index c07ea525..0964f29d 100644 --- a/index/firestorm/lookup.go +++ b/index/firestorm/lookup.go @@ -18,14 +18,9 @@ import ( const channelBufferSize = 1000 -type lookupTask struct { - docID []byte - docNum uint64 -} - type Lookuper struct { f *Firestorm - workChan chan *lookupTask + workChan chan []*InFlightItem quit chan struct{} closeWait sync.WaitGroup @@ -36,15 +31,15 @@ type Lookuper struct { func NewLookuper(f *Firestorm) *Lookuper { rv := Lookuper{ f: f, - workChan: make(chan *lookupTask, channelBufferSize), + workChan: make(chan []*InFlightItem, channelBufferSize), quit: make(chan struct{}), } return &rv } -func (l *Lookuper) Notify(docNum uint64, docID []byte) { +func (l *Lookuper) NotifyBatch(items []*InFlightItem) { atomic.AddUint64(&l.tasksQueued, 1) - l.workChan <- &lookupTask{docID: docID, docNum: docNum} + l.workChan <- items } func (l *Lookuper) Start() { @@ -65,17 +60,24 @@ func (l *Lookuper) run() { logger.Printf("lookuper asked to quit") l.closeWait.Done() return - case task, ok := <-l.workChan: + case items, ok := <-l.workChan: if !ok { logger.Printf("lookuper work channel closed unexpectedly, stopping") return } - l.lookup(task) + l.lookupItems(items) } } } -func (l *Lookuper) lookup(task *lookupTask) { +func (l *Lookuper) lookupItems(items []*InFlightItem) { + for _, item := range items { + l.lookup(item) + } + atomic.AddUint64(&l.tasksDone, 1) +} + +func (l *Lookuper) lookup(item *InFlightItem) { reader, err := l.f.store.Reader() if err != nil { logger.Printf("lookuper fatal: %v", err) @@ -87,7 +89,7 @@ func (l *Lookuper) lookup(task *lookupTask) { } }() - prefix := TermFreqPrefixFieldTermDocId(0, nil, task.docID) + prefix := TermFreqPrefixFieldTermDocId(0, nil, item.docID) logger.Printf("lookuper prefix - % x", prefix) docNums := make(DocNumberList, 0) err = visitPrefix(reader, prefix, func(key, val []byte) (bool, error) { @@ -106,20 +108,19 @@ func (l *Lookuper) lookup(task *lookupTask) { } oldDocNums := make(DocNumberList, 0, len(docNums)) for _, docNum := range docNums { - if task.docNum == 0 || docNum < task.docNum { + if item.docNum == 0 || docNum < item.docNum { oldDocNums = append(oldDocNums, docNum) } } - logger.Printf("lookup migrating '%s' - %d - oldDocNums: %v", task.docID, task.docNum, oldDocNums) - l.f.compensator.Migrate(task.docID, task.docNum, oldDocNums) - if len(oldDocNums) == 0 && task.docNum != 0 { + logger.Printf("lookup migrating '%s' - %d - oldDocNums: %v", item.docID, item.docNum, oldDocNums) + l.f.compensator.Migrate(item.docID, item.docNum, oldDocNums) + if len(oldDocNums) == 0 && item.docNum != 0 { // this was an add, not an update atomic.AddUint64(l.f.docCount, 1) - } else if len(oldDocNums) > 0 && task.docNum == 0 { + } else if len(oldDocNums) > 0 && item.docNum == 0 { // this was a delete (and it previously existed) atomic.AddUint64(l.f.docCount, ^uint64(0)) } - atomic.AddUint64(&l.tasksDone, 1) } // this is not intended to be used publicly, only for unit tests diff --git a/index/firestorm/lookup_test.go b/index/firestorm/lookup_test.go index 8097ba8a..bc33828f 100644 --- a/index/firestorm/lookup_test.go +++ b/index/firestorm/lookup_test.go @@ -62,7 +62,7 @@ func TestLookups(t *testing.T) { if val == nil { t.Errorf("expected key: % x to be in the inflight list", tfr.DocID()) } - f.(*Firestorm).lookuper.lookup(&lookupTask{docID: tfr.DocID(), docNum: tfr.DocNum()}) + f.(*Firestorm).lookuper.lookup(&InFlightItem{docID: tfr.DocID(), docNum: tfr.DocNum()}) // now expect this mutation to NOT be in the in-flight list val = f.(*Firestorm).compensator.inFlight.Get(&InFlightItem{docID: tfr.DocID()}) if val != nil {