0
0

firestorm lookuper notified via batch

Previously, firestorm.Batch() would notify the lookuper goroutine
on a document-by-document basis.  If the lookuper input channel became
full, that would block the firestorm.Batch() operation.

With this change, lookuper is notified once, with a "batch" that is an
[]*InFlightItem.

This change also reuses that same []*InFlightItem to invoke the
compensator.MutateBatch().

This also has the advantage of converting the docIDs from string
to []byte only once, outside of the lock that's used by the
compensator.

A micro-benchmark of this change with null-firestorm bleve-blast does
not show a large impact, neither degradation nor improvement.
This commit is contained in:
Steve Yen 2015-12-30 20:43:31 -08:00
parent 38d50ed8b5
commit 7ae696d661
4 changed files with 40 additions and 39 deletions

View File

@ -15,7 +15,6 @@ import (
"sort" "sort"
"sync" "sync"
"github.com/blevesearch/bleve/document"
"github.com/steveyen/gtreap" "github.com/steveyen/gtreap"
"github.com/willf/bitset" "github.com/willf/bitset"
) )
@ -80,17 +79,13 @@ func (c *Compensator) Mutate(docID []byte, docNum uint64) {
} }
} }
func (c *Compensator) MutateBatch(docs map[string]*document.Document, docNum uint64) { func (c *Compensator) MutateBatch(inflightItems []*InFlightItem, lastDocNum uint64) {
c.inFlightMutex.Lock() c.inFlightMutex.Lock()
defer c.inFlightMutex.Unlock() defer c.inFlightMutex.Unlock()
for docID, doc := range docs { for _, item := range inflightItems {
if doc != nil { c.inFlight = c.inFlight.Upsert(item, rand.Int())
c.inFlight = c.inFlight.Upsert(&InFlightItem{docID: []byte(docID), docNum: doc.Number}, rand.Int())
} else {
c.inFlight = c.inFlight.Upsert(&InFlightItem{docID: []byte(docID), docNum: 0}, rand.Int())
}
} }
c.maxRead = docNum c.maxRead = lastDocNum
} }
func (c *Compensator) Migrate(docID []byte, docNum uint64, oldDocNums []uint64) { func (c *Compensator) Migrate(docID []byte, docNum uint64, oldDocNums []uint64) {

View File

@ -175,7 +175,7 @@ func (f *Firestorm) Update(doc *document.Document) (err error) {
} }
f.compensator.Mutate([]byte(doc.ID), doc.Number) f.compensator.Mutate([]byte(doc.ID), doc.Number)
f.lookuper.Notify(doc.Number, []byte(doc.ID)) f.lookuper.NotifyBatch([]*InFlightItem{&InFlightItem{[]byte(doc.ID), doc.Number}})
f.dictUpdater.NotifyBatch(dictionaryDeltas) f.dictUpdater.NotifyBatch(dictionaryDeltas)
atomic.AddUint64(&f.stats.indexTime, uint64(time.Since(indexStart))) atomic.AddUint64(&f.stats.indexTime, uint64(time.Since(indexStart)))
@ -185,7 +185,7 @@ func (f *Firestorm) Update(doc *document.Document) (err error) {
func (f *Firestorm) Delete(id string) error { func (f *Firestorm) Delete(id string) error {
indexStart := time.Now() indexStart := time.Now()
f.compensator.Mutate([]byte(id), 0) f.compensator.Mutate([]byte(id), 0)
f.lookuper.Notify(0, []byte(id)) f.lookuper.NotifyBatch([]*InFlightItem{&InFlightItem{[]byte(id), 0}})
atomic.AddUint64(&f.stats.indexTime, uint64(time.Since(indexStart))) atomic.AddUint64(&f.stats.indexTime, uint64(time.Since(indexStart)))
return nil return nil
} }
@ -322,6 +322,17 @@ func (f *Firestorm) Batch(batch *index.Batch) (err error) {
} }
} }
inflightItems := make([]*InFlightItem, 0, len(batch.IndexOps))
for docID, doc := range batch.IndexOps {
if doc != nil {
inflightItems = append(inflightItems,
&InFlightItem{[]byte(docID), doc.Number})
} else {
inflightItems = append(inflightItems,
&InFlightItem{[]byte(docID), 0})
}
}
indexStart := time.Now() indexStart := time.Now()
// start a writer for this batch // start a writer for this batch
var kvwriter store.KVWriter var kvwriter store.KVWriter
@ -338,14 +349,8 @@ func (f *Firestorm) Batch(batch *index.Batch) (err error) {
return return
} }
f.compensator.MutateBatch(batch.IndexOps, lastDocNumber) f.compensator.MutateBatch(inflightItems, lastDocNumber)
for docID, doc := range batch.IndexOps { f.lookuper.NotifyBatch(inflightItems)
if doc != nil {
f.lookuper.Notify(doc.Number, []byte(doc.ID))
} else {
f.lookuper.Notify(0, []byte(docID))
}
}
f.dictUpdater.NotifyBatch(dictionaryDeltas) f.dictUpdater.NotifyBatch(dictionaryDeltas)
err = kvwriter.Close() err = kvwriter.Close()

View File

@ -18,14 +18,9 @@ import (
const channelBufferSize = 1000 const channelBufferSize = 1000
type lookupTask struct {
docID []byte
docNum uint64
}
type Lookuper struct { type Lookuper struct {
f *Firestorm f *Firestorm
workChan chan *lookupTask workChan chan []*InFlightItem
quit chan struct{} quit chan struct{}
closeWait sync.WaitGroup closeWait sync.WaitGroup
@ -36,15 +31,15 @@ type Lookuper struct {
func NewLookuper(f *Firestorm) *Lookuper { func NewLookuper(f *Firestorm) *Lookuper {
rv := Lookuper{ rv := Lookuper{
f: f, f: f,
workChan: make(chan *lookupTask, channelBufferSize), workChan: make(chan []*InFlightItem, channelBufferSize),
quit: make(chan struct{}), quit: make(chan struct{}),
} }
return &rv return &rv
} }
func (l *Lookuper) Notify(docNum uint64, docID []byte) { func (l *Lookuper) NotifyBatch(items []*InFlightItem) {
atomic.AddUint64(&l.tasksQueued, 1) atomic.AddUint64(&l.tasksQueued, 1)
l.workChan <- &lookupTask{docID: docID, docNum: docNum} l.workChan <- items
} }
func (l *Lookuper) Start() { func (l *Lookuper) Start() {
@ -65,17 +60,24 @@ func (l *Lookuper) run() {
logger.Printf("lookuper asked to quit") logger.Printf("lookuper asked to quit")
l.closeWait.Done() l.closeWait.Done()
return return
case task, ok := <-l.workChan: case items, ok := <-l.workChan:
if !ok { if !ok {
logger.Printf("lookuper work channel closed unexpectedly, stopping") logger.Printf("lookuper work channel closed unexpectedly, stopping")
return return
} }
l.lookup(task) l.lookupItems(items)
} }
} }
} }
func (l *Lookuper) lookup(task *lookupTask) { func (l *Lookuper) lookupItems(items []*InFlightItem) {
for _, item := range items {
l.lookup(item)
}
atomic.AddUint64(&l.tasksDone, 1)
}
func (l *Lookuper) lookup(item *InFlightItem) {
reader, err := l.f.store.Reader() reader, err := l.f.store.Reader()
if err != nil { if err != nil {
logger.Printf("lookuper fatal: %v", err) logger.Printf("lookuper fatal: %v", err)
@ -87,7 +89,7 @@ func (l *Lookuper) lookup(task *lookupTask) {
} }
}() }()
prefix := TermFreqPrefixFieldTermDocId(0, nil, task.docID) prefix := TermFreqPrefixFieldTermDocId(0, nil, item.docID)
logger.Printf("lookuper prefix - % x", prefix) logger.Printf("lookuper prefix - % x", prefix)
docNums := make(DocNumberList, 0) docNums := make(DocNumberList, 0)
err = visitPrefix(reader, prefix, func(key, val []byte) (bool, error) { err = visitPrefix(reader, prefix, func(key, val []byte) (bool, error) {
@ -106,20 +108,19 @@ func (l *Lookuper) lookup(task *lookupTask) {
} }
oldDocNums := make(DocNumberList, 0, len(docNums)) oldDocNums := make(DocNumberList, 0, len(docNums))
for _, docNum := range docNums { for _, docNum := range docNums {
if task.docNum == 0 || docNum < task.docNum { if item.docNum == 0 || docNum < item.docNum {
oldDocNums = append(oldDocNums, docNum) oldDocNums = append(oldDocNums, docNum)
} }
} }
logger.Printf("lookup migrating '%s' - %d - oldDocNums: %v", task.docID, task.docNum, oldDocNums) logger.Printf("lookup migrating '%s' - %d - oldDocNums: %v", item.docID, item.docNum, oldDocNums)
l.f.compensator.Migrate(task.docID, task.docNum, oldDocNums) l.f.compensator.Migrate(item.docID, item.docNum, oldDocNums)
if len(oldDocNums) == 0 && task.docNum != 0 { if len(oldDocNums) == 0 && item.docNum != 0 {
// this was an add, not an update // this was an add, not an update
atomic.AddUint64(l.f.docCount, 1) atomic.AddUint64(l.f.docCount, 1)
} else if len(oldDocNums) > 0 && task.docNum == 0 { } else if len(oldDocNums) > 0 && item.docNum == 0 {
// this was a delete (and it previously existed) // this was a delete (and it previously existed)
atomic.AddUint64(l.f.docCount, ^uint64(0)) atomic.AddUint64(l.f.docCount, ^uint64(0))
} }
atomic.AddUint64(&l.tasksDone, 1)
} }
// this is not intended to be used publicly, only for unit tests // this is not intended to be used publicly, only for unit tests

View File

@ -62,7 +62,7 @@ func TestLookups(t *testing.T) {
if val == nil { if val == nil {
t.Errorf("expected key: % x to be in the inflight list", tfr.DocID()) t.Errorf("expected key: % x to be in the inflight list", tfr.DocID())
} }
f.(*Firestorm).lookuper.lookup(&lookupTask{docID: tfr.DocID(), docNum: tfr.DocNum()}) f.(*Firestorm).lookuper.lookup(&InFlightItem{docID: tfr.DocID(), docNum: tfr.DocNum()})
// now expect this mutation to NOT be in the in-flight list // now expect this mutation to NOT be in the in-flight list
val = f.(*Firestorm).compensator.inFlight.Get(&InFlightItem{docID: tfr.DocID()}) val = f.(*Firestorm).compensator.inFlight.Get(&InFlightItem{docID: tfr.DocID()})
if val != nil { if val != nil {