From b605224106aab338a91f8c33465c10968fba2c95 Mon Sep 17 00:00:00 2001 From: Steve Yen Date: Tue, 29 Dec 2015 22:14:45 -0800 Subject: [PATCH 01/17] use shorter go idiom --- index/firestorm/firestorm.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/index/firestorm/firestorm.go b/index/firestorm/firestorm.go index 839665e2..00257296 100644 --- a/index/firestorm/firestorm.go +++ b/index/firestorm/firestorm.go @@ -144,10 +144,9 @@ func (f *Firestorm) Update(doc *document.Document) (err error) { analysisStart := time.Now() resultChan := make(chan *index.AnalysisResult) aw := index.NewAnalysisWork(f, doc, resultChan) + // put the work on the queue - go func() { - f.analysisQueue.Queue(aw) - }() + go f.analysisQueue.Queue(aw) // wait for the result result := <-resultChan From fd287bdfa461de8570ef424966bb6e3781903911 Mon Sep 17 00:00:00 2001 From: Steve Yen Date: Wed, 30 Dec 2015 10:41:56 -0800 Subject: [PATCH 02/17] firestorm.md markdown fixes --- index/firestorm/firestorm.md | 76 +++++++++++++++++------------------- 1 file changed, 36 insertions(+), 40 deletions(-) diff --git a/index/firestorm/firestorm.md b/index/firestorm/firestorm.md index 2b9d6b86..7e25383c 100644 --- a/index/firestorm/firestorm.md +++ b/index/firestorm/firestorm.md @@ -4,7 +4,7 @@ A new indexing scheme for Bleve. ## Background -### Goals +### Goals - Avoid a single writer that must pause writing to perform computation - either by allowing multiple writers, if computation cannot be avoided @@ -58,7 +58,6 @@ Once the Delete operation returns, the document should no longer be returned by ## Details - ### Terminology Document ID (`doc_id`) @@ -74,15 +73,15 @@ DocIdNumber By including a new unique identifier as a part of every row generated, the index operation no longer concerns itself with updating existing values or deleting previous values. -Removal of old rows is handled indepenently by a separate thread. +Removal of old rows is handled indepenently by separate threads. -Ensuring of correct semantics with respect to added/updated/deleted documents is maintained through synchronized in memory data structures, to compensate for the decoupling of these other operations. +Ensuring of correct semantics with respect to added/updated/deleted documents is maintained through synchronized in-memory data structures, to compensate for the decoupling of these other operations. The Dictionary becomes a best effort data element. In kill-9 scenarios it could become incorrect, but it is believed that this will generally only affect scoring not correctness, and we can pursue read-repair operations. ### Index State -The following pseudo-structure will be used to explain changes to the internal state. Keep in mind the datatypes shows represent the logical strcture required for correct behavior. The actual implementation may be different to achieve performance goals. +The following pseudo-structure will be used to explain changes to the internal state. Keep in mind the datatypes shown represent the logical structure required for correct behavior. The actual implementation may be different to achieve performance goals. 
indexState { docCount uint64 @@ -114,6 +113,7 @@ The following pseudo-structure will be used to explain changes to the internal s inFlightDocIds = {} deletedDocIdNumbers = {} } + - Garbage Collector Thread is started - Old Doc Number Lookup Thread is started - Index marked open @@ -124,7 +124,7 @@ The following pseudo-structure will be used to explain changes to the internal s - ITERATE all FieldRows{} - ITERATE all TermFrequencyRow{ where field_id = 0 } - Identify consecutive rows with same doc_id but different doc_number - - Lower document numbers get added to the deleted doc numbers list + - Lower document numbers are added to the deletedDocIdNumbers list - Count all non-duplicate rows, seed the docCount - Observe highest document number seen, seed nextDocNumber @@ -148,7 +148,7 @@ The following pseudo-structure will be used to explain changes to the internal s #### Garbage Collector Thread -The role of the Garbage Collector thread is to clean up rows referring to document numbers that are no longer relevant (document was deleted or updated). +The role of the Garbage Collector thread is to clean up rows referring to document numbers that are no longer relevant (document was deleted or updated). Currently, only two types of rows include document numbers: - Term Frequency Rows @@ -156,12 +156,13 @@ Currently, only two types of rows include document numbers: The current thought is that the garbage collector thread will use a single iterator to iterate the following key spaces: -TermFrequencyRow { where field_id > 0} -StoredRow {all} +- TermFrequencyRow { where field_id > 0} +- StoredRow {all} For any row refering to a document number on the deletedDocNumbers list, that key will be DELETED. The garbage collector will track loop iterations or start key for each deletedDocNumber so that it knows when it has walked a full circle for a given doc number. At point the following happen in order: + - docNumber is removed from the deletecDocNumbers list - DELETE is issued on TermFreqRow{ field_id=0, term=doc_id, doc_id=doc_id_number } @@ -201,35 +202,29 @@ It is notified via a channel of increased term usage (by index ops) and of decre #### Indexing a Document -Perform all analysis on the document. - -new_doc_number = indexState.nextDocNumber++ - -Create New Batch -Batch will contain SET operations for: -- any new Fields -- Term Frequency Rows for indexed fields terms -- Stored Rows for stored fields -Execute Batch - -Acquire indexState.docIdNumberMutex for writing: -set maxReadDocNumber new_doc_number -set inFlightDocIds{ docId = new_doc_number } -Release indexState.docIdNumberMutex - -Notify Term Frequency Updater thread of increased term usage. - -Notify Old Doc Number Lookup Thread of doc_id. +- Perform all analysis on the document. +- new_doc_number = indexState.nextDocNumber++ +- Create New Batch +- Batch will contain SET operations for: + - any new Fields + - Term Frequency Rows for indexed fields terms + - Stored Rows for stored fields +- Execute Batch +- Acquire indexState.docIdNumberMutex for writing: +- set maxReadDocNumber new_doc_number +- set inFlightDocIds{ docId = new_doc_number } +- Release indexState.docIdNumberMutex +- Notify Term Frequency Updater thread of increased term usage. +- Notify Old Doc Number Lookup Thread of doc_id. The key property is that a search matching the updated document *SHOULD* return the document once this method returns. If the document was an update, it should return the previous document until this method returns. 
There should be no period of time where neither document matches. #### Deleting a Document -Acquire indexState.docIdNumberMutex for writing: -set inFlightDocIds{ docId = 0 } // 0 is a doc number we never use, indicates pending deltion of docId -Release indexState.docIdNumberMutex - -Notify Old Doc Number Lookup Thread of doc_id. +- Acquire indexState.docIdNumberMutex for writing: +- set inFlightDocIds{ docId = 0 } // 0 is a doc number we never use, indicates pending deltion of docId +- Release indexState.docIdNumberMutex +- Notify Old Doc Number Lookup Thread of doc_id. #### Batch Operations @@ -241,13 +236,12 @@ Batch operations look largely just like the indexing/deleting operations. Two o #### Term Field Iteration - Acquire indexState.docIdNumberMutex for reading: -- Get copy of: (it is assumed some COW datastructure is used, or MVCC is accomodated in some way by the impl) +- Get copy of: (it is assumed some COW data structure is used, or MVCC is accomodated in some way by the impl) - maxReadDocNumber - inFlightDocIds - deletedDocIdNumbers - Release indexState.docIdNumberMutex - Term Field Iteration is used by the basic term search. It produces the set of documents (and related info like term vectors) which used the specified term in the specified field. Iterator starts at key: @@ -256,16 +250,18 @@ Iterator starts at key: Iterator ends when the term does not match. -Any row with doc_number > maxReadDocNumber MUST be ignored. -Any row with doc_id_number on the deletedDocIdNumber list MUST be ignored. -Any row with the same doc_id as an entry in the inFlightDocIds map, MUST have the same number. +- Any row with doc_number > maxReadDocNumber MUST be ignored. +- Any row with doc_id_number on the deletedDocIdNumber list MUST be ignored. +- Any row with the same doc_id as an entry in the inFlightDocIds map, MUST have the same number. Any row satisfying the above conditions is a candidate document. ### Row Encoding All keys are manually encoded to ensure a precise row ordering. + Internal Row values are opaque byte arrays. + All other values are encoded using protobuf for a balance of efficiency and flexibility. Dictionary and TermFrequency rows are the most likely to take advantage of this flexibility, but other rows are read/written infrequently enough that the flexibility outweighs any overhead. #### Version @@ -282,7 +278,7 @@ There is a single version row which records which version of the firestorm index #### Field -Field rows map field names to numeric values +Field rows map field names to numeric values | Key | Value | |---------|------------| @@ -375,7 +371,7 @@ In state d, we have completed the lookup for the old document numbers of X, and In state e, the garbage collector has removed all record of X#1. -The Index method returns after it has transitioned to state C, which maintains the semantics we desire. +The Index method returns after it has transitioned to state c, which maintains the semantics we desire. 2\. Wait, what happens if I kill -9 the process, won't you forget about the deleted documents? From ee5ccda1125818ebb6b351d3c2ff9e2999f3d01b Mon Sep 17 00:00:00 2001 From: Steve Yen Date: Wed, 30 Dec 2015 11:18:16 -0800 Subject: [PATCH 03/17] use KeyTo/ValueTo in firestorm.batchRows After this change, with null kvstore micro-benchmark... 
GOMAXPROCS=8 ./bleve-blast -source=../../tmp/enwiki.txt \ -count=100000 -numAnalyzers=8 -numIndexers=8 \ -config=../../configs/null-firestorm.json -batch=100 Then TermFreqRow key and value methods dissapear as large boxes from the cpu profile graphs. --- index/firestorm/firestorm.go | 36 +++++++++++++++++++++++++++++++++--- 1 file changed, 33 insertions(+), 3 deletions(-) diff --git a/index/firestorm/firestorm.go b/index/firestorm/firestorm.go index 00257296..959b3d47 100644 --- a/index/firestorm/firestorm.go +++ b/index/firestorm/firestorm.go @@ -195,16 +195,46 @@ func (f *Firestorm) batchRows(writer store.KVWriter, rows []index.IndexRow, dele // prepare batch wb := writer.NewBatch() + var kbuf []byte + var vbuf []byte + + prepareBuf := func(buf []byte, sizeNeeded int) []byte { + if cap(buf) < sizeNeeded { + return make([]byte, sizeNeeded, sizeNeeded+128) + } + return buf[0:sizeNeeded] + } + dictionaryDeltas := make(map[string]int64) for _, row := range rows { tfr, ok := row.(*TermFreqRow) if ok { if tfr.Field() != 0 { - drk := tfr.DictionaryRowKey() - dictionaryDeltas[string(drk)] += 1 + kbuf = prepareBuf(kbuf, tfr.DictionaryRowKeySize()) + klen, err := tfr.DictionaryRowKeyTo(kbuf) + if err != nil { + return nil, err + } + + dictionaryDeltas[string(kbuf[0:klen])] += 1 } + + kbuf = prepareBuf(kbuf, tfr.KeySize()) + klen, err := tfr.KeyTo(kbuf) + if err != nil { + return nil, err + } + + vbuf = prepareBuf(vbuf, tfr.ValueSize()) + vlen, err := tfr.ValueTo(vbuf) + if err != nil { + return nil, err + } + + wb.Set(kbuf[0:klen], vbuf[0:vlen]) + } else { + wb.Set(row.Key(), row.Value()) } - wb.Set(row.Key(), row.Value()) } for _, dk := range deleteKeys { From a345e7951e59fab9f12438d60b7fd4c34418706f Mon Sep 17 00:00:00 2001 From: Steve Yen Date: Wed, 30 Dec 2015 13:13:38 -0800 Subject: [PATCH 04/17] TokenFrequency() alloc's all TokenLocations up front --- analysis/freq.go | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/analysis/freq.go b/analysis/freq.go index 434dc85c..a67e0afa 100644 --- a/analysis/freq.go +++ b/analysis/freq.go @@ -55,28 +55,28 @@ func (tfs TokenFrequencies) MergeAll(remoteField string, other TokenFrequencies) func TokenFrequency(tokens TokenStream, arrayPositions []uint64) TokenFrequencies { rv := make(map[string]*TokenFreq, len(tokens)) + tls := make([]TokenLocation, len(tokens)) + tlNext := 0 + for _, token := range tokens { + tls[tlNext] = TokenLocation{ + ArrayPositions: arrayPositions, + Start: token.Start, + End: token.End, + Position: token.Position, + } + curr, ok := rv[string(token.Term)] if ok { - curr.Locations = append(curr.Locations, &TokenLocation{ - ArrayPositions: arrayPositions, - Start: token.Start, - End: token.End, - Position: token.Position, - }) + curr.Locations = append(curr.Locations, &tls[tlNext]) } else { rv[string(token.Term)] = &TokenFreq{ - Term: token.Term, - Locations: []*TokenLocation{ - &TokenLocation{ - ArrayPositions: arrayPositions, - Start: token.Start, - End: token.End, - Position: token.Position, - }, - }, + Term: token.Term, + Locations: []*TokenLocation{&tls[tlNext]}, } } + + tlNext++ } return rv From fd81d0364cd5b9d9d11b0f4d5e665f3f35b98fff Mon Sep 17 00:00:00 2001 From: Steve Yen Date: Wed, 30 Dec 2015 13:55:08 -0800 Subject: [PATCH 05/17] firestorm.indexField() uses capacity of len(tokenFreqs) --- index/firestorm/analysis.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/index/firestorm/analysis.go b/index/firestorm/analysis.go index c2118ff3..9e9626f6 100644 --- 
a/index/firestorm/analysis.go +++ b/index/firestorm/analysis.go @@ -94,7 +94,7 @@ func (f *Firestorm) Analyze(d *document.Document) *index.AnalysisResult { func (f *Firestorm) indexField(docID string, docNum uint64, includeTermVectors bool, fieldIndex uint16, fieldLength int, tokenFreqs analysis.TokenFrequencies) []index.IndexRow { - rows := make([]index.IndexRow, 0, 100) + rows := make([]index.IndexRow, 0, len(tokenFreqs)) fieldNorm := float32(1.0 / math.Sqrt(float64(fieldLength))) for _, tf := range tokenFreqs { From 0a7f7e3df8700a727302de5e937a41064908b28c Mon Sep 17 00:00:00 2001 From: Steve Yen Date: Wed, 30 Dec 2015 14:03:32 -0800 Subject: [PATCH 06/17] firestorm.Analyze() converts docID to bytes only once --- index/firestorm/analysis.go | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/index/firestorm/analysis.go b/index/firestorm/analysis.go index 9e9626f6..4e5b71dc 100644 --- a/index/firestorm/analysis.go +++ b/index/firestorm/analysis.go @@ -24,6 +24,8 @@ func (f *Firestorm) Analyze(d *document.Document) *index.AnalysisResult { Rows: make([]index.IndexRow, 0, 100), } + docIDBytes := []byte(d.ID) + // information we collate as we merge fields with same name fieldTermFreqs := make(map[uint16]analysis.TokenFrequencies) fieldLengths := make(map[uint16]int) @@ -38,7 +40,7 @@ func (f *Firestorm) Analyze(d *document.Document) *index.AnalysisResult { fieldNames[fieldIndex] = field.Name() // add the _id row - rv.Rows = append(rv.Rows, NewTermFreqRow(0, nil, []byte(d.ID), d.Number, 0, 0, nil)) + rv.Rows = append(rv.Rows, NewTermFreqRow(0, nil, docIDBytes, d.Number, 0, 0, nil)) if field.Options().IsIndexed() { fieldLength, tokenFreqs := field.Analyze() @@ -54,7 +56,7 @@ func (f *Firestorm) Analyze(d *document.Document) *index.AnalysisResult { } if field.Options().IsStored() { - storeRow := f.storeField(d.ID, d.Number, field, fieldIndex) + storeRow := f.storeField(docIDBytes, d.Number, field, fieldIndex) rv.Rows = append(rv.Rows, storeRow) } } @@ -71,7 +73,7 @@ func (f *Firestorm) Analyze(d *document.Document) *index.AnalysisResult { } // encode this field - indexRows := f.indexField(d.ID, d.Number, includeTermVectors, fieldIndex, fieldLength, tokenFreqs) + indexRows := f.indexField(docIDBytes, d.Number, includeTermVectors, fieldIndex, fieldLength, tokenFreqs) rv.Rows = append(rv.Rows, indexRows...) } @@ -84,7 +86,7 @@ func (f *Firestorm) Analyze(d *document.Document) *index.AnalysisResult { if compositeField.Options().IsIndexed() { fieldLength, tokenFreqs := compositeField.Analyze() // encode this field - indexRows := f.indexField(d.ID, d.Number, compositeField.Options().IncludeTermVectors(), fieldIndex, fieldLength, tokenFreqs) + indexRows := f.indexField(docIDBytes, d.Number, compositeField.Options().IncludeTermVectors(), fieldIndex, fieldLength, tokenFreqs) rv.Rows = append(rv.Rows, indexRows...) 
} } @@ -92,7 +94,7 @@ func (f *Firestorm) Analyze(d *document.Document) *index.AnalysisResult { return rv } -func (f *Firestorm) indexField(docID string, docNum uint64, includeTermVectors bool, fieldIndex uint16, fieldLength int, tokenFreqs analysis.TokenFrequencies) []index.IndexRow { +func (f *Firestorm) indexField(docID []byte, docNum uint64, includeTermVectors bool, fieldIndex uint16, fieldLength int, tokenFreqs analysis.TokenFrequencies) []index.IndexRow { rows := make([]index.IndexRow, 0, len(tokenFreqs)) fieldNorm := float32(1.0 / math.Sqrt(float64(fieldLength))) @@ -102,9 +104,9 @@ func (f *Firestorm) indexField(docID string, docNum uint64, includeTermVectors b if includeTermVectors { tv, newFieldRows := f.termVectorsFromTokenFreq(fieldIndex, tf) rows = append(rows, newFieldRows...) - termFreqRow = NewTermFreqRow(fieldIndex, tf.Term, []byte(docID), docNum, uint64(tf.Frequency()), fieldNorm, tv) + termFreqRow = NewTermFreqRow(fieldIndex, tf.Term, docID, docNum, uint64(tf.Frequency()), fieldNorm, tv) } else { - termFreqRow = NewTermFreqRow(fieldIndex, tf.Term, []byte(docID), docNum, uint64(tf.Frequency()), fieldNorm, nil) + termFreqRow = NewTermFreqRow(fieldIndex, tf.Term, docID, docNum, uint64(tf.Frequency()), fieldNorm, nil) } rows = append(rows, termFreqRow) @@ -134,11 +136,11 @@ func (f *Firestorm) termVectorsFromTokenFreq(field uint16, tf *analysis.TokenFre return rv, newFieldRows } -func (f *Firestorm) storeField(docID string, docNum uint64, field document.Field, fieldIndex uint16) index.IndexRow { +func (f *Firestorm) storeField(docID []byte, docNum uint64, field document.Field, fieldIndex uint16) index.IndexRow { fieldValue := make([]byte, 1+len(field.Value())) fieldValue[0] = encodeFieldType(field) copy(fieldValue[1:], field.Value()) - storedRow := NewStoredRow([]byte(docID), docNum, fieldIndex, field.ArrayPositions(), fieldValue) + storedRow := NewStoredRow(docID, docNum, fieldIndex, field.ArrayPositions(), fieldValue) return storedRow } From 3feeb14b7d2fdb110cdeea2f437d092a4c4185ca Mon Sep 17 00:00:00 2001 From: Steve Yen Date: Wed, 30 Dec 2015 16:05:51 -0800 Subject: [PATCH 07/17] firestorm.batchRows reuses buf for all IndexRows --- index/firestorm/firestorm.go | 30 ++++++++++++++---------------- 1 file changed, 14 insertions(+), 16 deletions(-) diff --git a/index/firestorm/firestorm.go b/index/firestorm/firestorm.go index 959b3d47..83eb8a2c 100644 --- a/index/firestorm/firestorm.go +++ b/index/firestorm/firestorm.go @@ -218,23 +218,21 @@ func (f *Firestorm) batchRows(writer store.KVWriter, rows []index.IndexRow, dele dictionaryDeltas[string(kbuf[0:klen])] += 1 } - - kbuf = prepareBuf(kbuf, tfr.KeySize()) - klen, err := tfr.KeyTo(kbuf) - if err != nil { - return nil, err - } - - vbuf = prepareBuf(vbuf, tfr.ValueSize()) - vlen, err := tfr.ValueTo(vbuf) - if err != nil { - return nil, err - } - - wb.Set(kbuf[0:klen], vbuf[0:vlen]) - } else { - wb.Set(row.Key(), row.Value()) } + + kbuf = prepareBuf(kbuf, row.KeySize()) + klen, err := row.KeyTo(kbuf) + if err != nil { + return nil, err + } + + vbuf = prepareBuf(vbuf, row.ValueSize()) + vlen, err := row.ValueTo(vbuf) + if err != nil { + return nil, err + } + + wb.Set(kbuf[0:klen], vbuf[0:vlen]) } for _, dk := range deleteKeys { From 38d50ed8b53c1354070c47481cb2adfe73c1b523 Mon Sep 17 00:00:00 2001 From: Steve Yen Date: Wed, 30 Dec 2015 19:23:56 -0800 Subject: [PATCH 08/17] renamed var to docsUpdated to match docsDeleted naming --- index/firestorm/firestorm.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff 
--git a/index/firestorm/firestorm.go b/index/firestorm/firestorm.go index 83eb8a2c..b73d0fe0 100644 --- a/index/firestorm/firestorm.go +++ b/index/firestorm/firestorm.go @@ -258,13 +258,13 @@ func (f *Firestorm) Batch(batch *index.Batch) (err error) { analysisStart := time.Now() resultChan := make(chan *index.AnalysisResult) - var numUpdates uint64 + var docsUpdated uint64 var docsDeleted uint64 for _, doc := range batch.IndexOps { if doc != nil { doc.Number = firstDocNumber // actually assign doc numbers here firstDocNumber++ - numUpdates++ + docsUpdated++ } else { docsDeleted++ } @@ -278,7 +278,7 @@ func (f *Firestorm) Batch(batch *index.Batch) (err error) { for _, doc := range batch.IndexOps { if doc != nil { sofar++ - if sofar > numUpdates { + if sofar > docsUpdated { detectedUnsafeMutex.Lock() detectedUnsafe = true detectedUnsafeMutex.Unlock() @@ -294,7 +294,7 @@ func (f *Firestorm) Batch(batch *index.Batch) (err error) { allRows := make([]index.IndexRow, 0, 1000) // wait for the result var itemsDeQueued uint64 - for itemsDeQueued < numUpdates { + for itemsDeQueued < docsUpdated { result := <-resultChan allRows = append(allRows, result.Rows...) itemsDeQueued++ @@ -352,7 +352,7 @@ func (f *Firestorm) Batch(batch *index.Batch) (err error) { atomic.AddUint64(&f.stats.indexTime, uint64(time.Since(indexStart))) if err == nil { - atomic.AddUint64(&f.stats.updates, numUpdates) + atomic.AddUint64(&f.stats.updates, docsUpdated) atomic.AddUint64(&f.stats.deletes, docsDeleted) atomic.AddUint64(&f.stats.batches, 1) } else { From 7ae696d66129f7c362143c6327483c3029a9adf9 Mon Sep 17 00:00:00 2001 From: Steve Yen Date: Wed, 30 Dec 2015 20:43:31 -0800 Subject: [PATCH 09/17] firestorm lookuper notified via batch Previously, the firestorm.Batch() would notify the lookuper goroutine on a document by document basis. If the lookuper input channel became full, then that would block the firestorm.Batch() operation. With this change, lookuper is notified once, with a "batch" that is an []*InFlightItem. This change also reuses that same []*InFlightItem to invoke the compensator.MutateBatch(). This also has the advantage of only converting the docID's from string to []byte just once, outside of the lock that's used by the compensator. Micro-benchmark of this change with null-firestorm bleve-blast does not show large impact, neither degradation or improvement. 
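The shape of the change, in miniature: a self-contained sketch with simplified, hypothetical types (not the actual bleve structs, channel sizes, or error handling), showing a producer that pays at most one potentially blocking channel send per batch.

```go
package main

import "fmt"

// InFlightItem is a simplified stand-in for the real struct; only the
// fields needed to show the batching shape are included.
type InFlightItem struct {
	docID  []byte
	docNum uint64
}

// Lookuper sketches the post-change shape: one channel send per batch
// instead of one send per document, so a full work channel can block
// the producer at most once per batch.
type Lookuper struct {
	workChan chan []*InFlightItem
}

func NewLookuper(buffer int) *Lookuper {
	return &Lookuper{workChan: make(chan []*InFlightItem, buffer)}
}

// NotifyBatch hands the whole slice over in a single channel operation.
func (l *Lookuper) NotifyBatch(items []*InFlightItem) {
	l.workChan <- items
}

func (l *Lookuper) run(done chan<- struct{}) {
	for items := range l.workChan {
		for _, item := range items {
			fmt.Printf("lookup docID=%s docNum=%d\n", item.docID, item.docNum)
		}
	}
	close(done)
}

func main() {
	l := NewLookuper(8)
	done := make(chan struct{})
	go l.run(done)

	// docIDs are converted to []byte once, up front; the same slice
	// could also be reused for a MutateBatch-style call.
	l.NotifyBatch([]*InFlightItem{
		{docID: []byte("doc-1"), docNum: 42},
		{docID: []byte("doc-2"), docNum: 0}, // docNum 0 marks a pending delete
	})

	close(l.workChan)
	<-done
}
```

The same pre-built slice of items is then reused for the compensator.MutateBatch() call, so each docID goes from string to []byte only once, outside the compensator's lock.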
--- index/firestorm/comp.go | 13 ++++-------- index/firestorm/firestorm.go | 25 +++++++++++++--------- index/firestorm/lookup.go | 39 +++++++++++++++++----------------- index/firestorm/lookup_test.go | 2 +- 4 files changed, 40 insertions(+), 39 deletions(-) diff --git a/index/firestorm/comp.go b/index/firestorm/comp.go index bd0bfb01..a4138823 100644 --- a/index/firestorm/comp.go +++ b/index/firestorm/comp.go @@ -15,7 +15,6 @@ import ( "sort" "sync" - "github.com/blevesearch/bleve/document" "github.com/steveyen/gtreap" "github.com/willf/bitset" ) @@ -80,17 +79,13 @@ func (c *Compensator) Mutate(docID []byte, docNum uint64) { } } -func (c *Compensator) MutateBatch(docs map[string]*document.Document, docNum uint64) { +func (c *Compensator) MutateBatch(inflightItems []*InFlightItem, lastDocNum uint64) { c.inFlightMutex.Lock() defer c.inFlightMutex.Unlock() - for docID, doc := range docs { - if doc != nil { - c.inFlight = c.inFlight.Upsert(&InFlightItem{docID: []byte(docID), docNum: doc.Number}, rand.Int()) - } else { - c.inFlight = c.inFlight.Upsert(&InFlightItem{docID: []byte(docID), docNum: 0}, rand.Int()) - } + for _, item := range inflightItems { + c.inFlight = c.inFlight.Upsert(item, rand.Int()) } - c.maxRead = docNum + c.maxRead = lastDocNum } func (c *Compensator) Migrate(docID []byte, docNum uint64, oldDocNums []uint64) { diff --git a/index/firestorm/firestorm.go b/index/firestorm/firestorm.go index b73d0fe0..662646ca 100644 --- a/index/firestorm/firestorm.go +++ b/index/firestorm/firestorm.go @@ -175,7 +175,7 @@ func (f *Firestorm) Update(doc *document.Document) (err error) { } f.compensator.Mutate([]byte(doc.ID), doc.Number) - f.lookuper.Notify(doc.Number, []byte(doc.ID)) + f.lookuper.NotifyBatch([]*InFlightItem{&InFlightItem{[]byte(doc.ID), doc.Number}}) f.dictUpdater.NotifyBatch(dictionaryDeltas) atomic.AddUint64(&f.stats.indexTime, uint64(time.Since(indexStart))) @@ -185,7 +185,7 @@ func (f *Firestorm) Update(doc *document.Document) (err error) { func (f *Firestorm) Delete(id string) error { indexStart := time.Now() f.compensator.Mutate([]byte(id), 0) - f.lookuper.Notify(0, []byte(id)) + f.lookuper.NotifyBatch([]*InFlightItem{&InFlightItem{[]byte(id), 0}}) atomic.AddUint64(&f.stats.indexTime, uint64(time.Since(indexStart))) return nil } @@ -322,6 +322,17 @@ func (f *Firestorm) Batch(batch *index.Batch) (err error) { } } + inflightItems := make([]*InFlightItem, 0, len(batch.IndexOps)) + for docID, doc := range batch.IndexOps { + if doc != nil { + inflightItems = append(inflightItems, + &InFlightItem{[]byte(docID), doc.Number}) + } else { + inflightItems = append(inflightItems, + &InFlightItem{[]byte(docID), 0}) + } + } + indexStart := time.Now() // start a writer for this batch var kvwriter store.KVWriter @@ -338,14 +349,8 @@ func (f *Firestorm) Batch(batch *index.Batch) (err error) { return } - f.compensator.MutateBatch(batch.IndexOps, lastDocNumber) - for docID, doc := range batch.IndexOps { - if doc != nil { - f.lookuper.Notify(doc.Number, []byte(doc.ID)) - } else { - f.lookuper.Notify(0, []byte(docID)) - } - } + f.compensator.MutateBatch(inflightItems, lastDocNumber) + f.lookuper.NotifyBatch(inflightItems) f.dictUpdater.NotifyBatch(dictionaryDeltas) err = kvwriter.Close() diff --git a/index/firestorm/lookup.go b/index/firestorm/lookup.go index c07ea525..0964f29d 100644 --- a/index/firestorm/lookup.go +++ b/index/firestorm/lookup.go @@ -18,14 +18,9 @@ import ( const channelBufferSize = 1000 -type lookupTask struct { - docID []byte - docNum uint64 -} - type Lookuper struct { f 
*Firestorm - workChan chan *lookupTask + workChan chan []*InFlightItem quit chan struct{} closeWait sync.WaitGroup @@ -36,15 +31,15 @@ type Lookuper struct { func NewLookuper(f *Firestorm) *Lookuper { rv := Lookuper{ f: f, - workChan: make(chan *lookupTask, channelBufferSize), + workChan: make(chan []*InFlightItem, channelBufferSize), quit: make(chan struct{}), } return &rv } -func (l *Lookuper) Notify(docNum uint64, docID []byte) { +func (l *Lookuper) NotifyBatch(items []*InFlightItem) { atomic.AddUint64(&l.tasksQueued, 1) - l.workChan <- &lookupTask{docID: docID, docNum: docNum} + l.workChan <- items } func (l *Lookuper) Start() { @@ -65,17 +60,24 @@ func (l *Lookuper) run() { logger.Printf("lookuper asked to quit") l.closeWait.Done() return - case task, ok := <-l.workChan: + case items, ok := <-l.workChan: if !ok { logger.Printf("lookuper work channel closed unexpectedly, stopping") return } - l.lookup(task) + l.lookupItems(items) } } } -func (l *Lookuper) lookup(task *lookupTask) { +func (l *Lookuper) lookupItems(items []*InFlightItem) { + for _, item := range items { + l.lookup(item) + } + atomic.AddUint64(&l.tasksDone, 1) +} + +func (l *Lookuper) lookup(item *InFlightItem) { reader, err := l.f.store.Reader() if err != nil { logger.Printf("lookuper fatal: %v", err) @@ -87,7 +89,7 @@ func (l *Lookuper) lookup(task *lookupTask) { } }() - prefix := TermFreqPrefixFieldTermDocId(0, nil, task.docID) + prefix := TermFreqPrefixFieldTermDocId(0, nil, item.docID) logger.Printf("lookuper prefix - % x", prefix) docNums := make(DocNumberList, 0) err = visitPrefix(reader, prefix, func(key, val []byte) (bool, error) { @@ -106,20 +108,19 @@ func (l *Lookuper) lookup(task *lookupTask) { } oldDocNums := make(DocNumberList, 0, len(docNums)) for _, docNum := range docNums { - if task.docNum == 0 || docNum < task.docNum { + if item.docNum == 0 || docNum < item.docNum { oldDocNums = append(oldDocNums, docNum) } } - logger.Printf("lookup migrating '%s' - %d - oldDocNums: %v", task.docID, task.docNum, oldDocNums) - l.f.compensator.Migrate(task.docID, task.docNum, oldDocNums) - if len(oldDocNums) == 0 && task.docNum != 0 { + logger.Printf("lookup migrating '%s' - %d - oldDocNums: %v", item.docID, item.docNum, oldDocNums) + l.f.compensator.Migrate(item.docID, item.docNum, oldDocNums) + if len(oldDocNums) == 0 && item.docNum != 0 { // this was an add, not an update atomic.AddUint64(l.f.docCount, 1) - } else if len(oldDocNums) > 0 && task.docNum == 0 { + } else if len(oldDocNums) > 0 && item.docNum == 0 { // this was a delete (and it previously existed) atomic.AddUint64(l.f.docCount, ^uint64(0)) } - atomic.AddUint64(&l.tasksDone, 1) } // this is not intended to be used publicly, only for unit tests diff --git a/index/firestorm/lookup_test.go b/index/firestorm/lookup_test.go index 8097ba8a..bc33828f 100644 --- a/index/firestorm/lookup_test.go +++ b/index/firestorm/lookup_test.go @@ -62,7 +62,7 @@ func TestLookups(t *testing.T) { if val == nil { t.Errorf("expected key: % x to be in the inflight list", tfr.DocID()) } - f.(*Firestorm).lookuper.lookup(&lookupTask{docID: tfr.DocID(), docNum: tfr.DocNum()}) + f.(*Firestorm).lookuper.lookup(&InFlightItem{docID: tfr.DocID(), docNum: tfr.DocNum()}) // now expect this mutation to NOT be in the in-flight list val = f.(*Firestorm).compensator.inFlight.Get(&InFlightItem{docID: tfr.DocID()}) if val != nil { From 45e9eaaacbdc885efee9c9efefc41ebc92099a92 Mon Sep 17 00:00:00 2001 From: Steve Yen Date: Wed, 30 Dec 2015 21:15:24 -0800 Subject: [PATCH 10/17] firestorm.indexField() 
allocs up-front array of TermFreqRow's This uses the "backing array" technique to allocate many TermFreqRow's at the front of firestorm.indexField(), instead of the previous one-by-one, as-needed TermFreqRow allocation approach. Results from micro-benchmark, null-firestorm, bleve-blast has this change producing a ~half MB/sec improvement. --- index/firestorm/analysis.go | 8 ++++++-- index/firestorm/termfreq.go | 22 +++++++++++----------- 2 files changed, 17 insertions(+), 13 deletions(-) diff --git a/index/firestorm/analysis.go b/index/firestorm/analysis.go index 4e5b71dc..32c6cf66 100644 --- a/index/firestorm/analysis.go +++ b/index/firestorm/analysis.go @@ -97,19 +97,23 @@ func (f *Firestorm) Analyze(d *document.Document) *index.AnalysisResult { func (f *Firestorm) indexField(docID []byte, docNum uint64, includeTermVectors bool, fieldIndex uint16, fieldLength int, tokenFreqs analysis.TokenFrequencies) []index.IndexRow { rows := make([]index.IndexRow, 0, len(tokenFreqs)) + tfrs := make([]TermFreqRow, len(tokenFreqs)) + fieldNorm := float32(1.0 / math.Sqrt(float64(fieldLength))) + i := 0 for _, tf := range tokenFreqs { var termFreqRow *TermFreqRow if includeTermVectors { tv, newFieldRows := f.termVectorsFromTokenFreq(fieldIndex, tf) rows = append(rows, newFieldRows...) - termFreqRow = NewTermFreqRow(fieldIndex, tf.Term, docID, docNum, uint64(tf.Frequency()), fieldNorm, tv) + termFreqRow = InitTermFreqRow(&tfrs[i], fieldIndex, tf.Term, docID, docNum, uint64(tf.Frequency()), fieldNorm, tv) } else { - termFreqRow = NewTermFreqRow(fieldIndex, tf.Term, docID, docNum, uint64(tf.Frequency()), fieldNorm, nil) + termFreqRow = InitTermFreqRow(&tfrs[i], fieldIndex, tf.Term, docID, docNum, uint64(tf.Frequency()), fieldNorm, nil) } rows = append(rows, termFreqRow) + i++ } return rows diff --git a/index/firestorm/termfreq.go b/index/firestorm/termfreq.go index 0d5cc9e8..1d36ec26 100644 --- a/index/firestorm/termfreq.go +++ b/index/firestorm/termfreq.go @@ -46,18 +46,18 @@ func NewTermVector(field uint16, pos uint64, start uint64, end uint64, arrayPos } func NewTermFreqRow(field uint16, term []byte, docID []byte, docNum uint64, freq uint64, norm float32, termVectors []*TermVector) *TermFreqRow { - rv := TermFreqRow{ - field: field, - term: term, - docID: docID, - docNum: docNum, - } + return InitTermFreqRow(&TermFreqRow{}, field, term, docID, docNum, freq, norm, termVectors) +} - rv.value.Freq = proto.Uint64(freq) - rv.value.Norm = proto.Float32(norm) - rv.value.Vectors = termVectors - - return &rv +func InitTermFreqRow(tfr *TermFreqRow, field uint16, term []byte, docID []byte, docNum uint64, freq uint64, norm float32, termVectors []*TermVector) *TermFreqRow { + tfr.field = field + tfr.term = term + tfr.docID = docID + tfr.docNum = docNum + tfr.value.Freq = proto.Uint64(freq) + tfr.value.Norm = proto.Float32(norm) + tfr.value.Vectors = termVectors + return tfr } func NewTermFreqRowKV(key, value []byte) (*TermFreqRow, error) { From 5b2bc1c20f7fde8cf4768fca67573677fad266ae Mon Sep 17 00:00:00 2001 From: Steve Yen Date: Wed, 30 Dec 2015 21:32:06 -0800 Subject: [PATCH 11/17] firestorm.indexField() check for includeTermVectors moved out of loop --- index/firestorm/analysis.go | 28 +++++++++++++++------------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/index/firestorm/analysis.go b/index/firestorm/analysis.go index 32c6cf66..58c6e2c5 100644 --- a/index/firestorm/analysis.go +++ b/index/firestorm/analysis.go @@ -96,26 +96,28 @@ func (f *Firestorm) Analyze(d *document.Document) 
*index.AnalysisResult { func (f *Firestorm) indexField(docID []byte, docNum uint64, includeTermVectors bool, fieldIndex uint16, fieldLength int, tokenFreqs analysis.TokenFrequencies) []index.IndexRow { - rows := make([]index.IndexRow, 0, len(tokenFreqs)) tfrs := make([]TermFreqRow, len(tokenFreqs)) fieldNorm := float32(1.0 / math.Sqrt(float64(fieldLength))) - i := 0 - for _, tf := range tokenFreqs { - var termFreqRow *TermFreqRow - if includeTermVectors { - tv, newFieldRows := f.termVectorsFromTokenFreq(fieldIndex, tf) - rows = append(rows, newFieldRows...) - termFreqRow = InitTermFreqRow(&tfrs[i], fieldIndex, tf.Term, docID, docNum, uint64(tf.Frequency()), fieldNorm, tv) - } else { - termFreqRow = InitTermFreqRow(&tfrs[i], fieldIndex, tf.Term, docID, docNum, uint64(tf.Frequency()), fieldNorm, nil) + if !includeTermVectors { + rows := make([]index.IndexRow, len(tokenFreqs)) + i := 0 + for _, tf := range tokenFreqs { + rows[i] = InitTermFreqRow(&tfrs[i], fieldIndex, tf.Term, docID, docNum, uint64(tf.Frequency()), fieldNorm, nil) + i++ } - - rows = append(rows, termFreqRow) - i++ + return rows } + rows := make([]index.IndexRow, 0, len(tokenFreqs)) + i := 0 + for _, tf := range tokenFreqs { + tv, newFieldRows := f.termVectorsFromTokenFreq(fieldIndex, tf) + rows = append(rows, newFieldRows...) + rows = append(rows, InitTermFreqRow(&tfrs[i], fieldIndex, tf.Term, docID, docNum, uint64(tf.Frequency()), fieldNorm, tv)) + i++ + } return rows } From 918732f3d85deace75cd560a7a19f2898f078f20 Mon Sep 17 00:00:00 2001 From: Steve Yen Date: Wed, 30 Dec 2015 22:18:15 -0800 Subject: [PATCH 12/17] unicode.Tokenize() allocs backing array of Tokens Previously, unicode.Tokenize() would allocate a Token one-by-one, on an as-needed basis. This change allocates a "backing array" of Tokens, so that it goes to the runtime object allocator much less often. It takes a heuristic guess as to the backing array size by using the average token (segment) length seen so far. Results from micro-benchmark (null-firestorm, bleve-blast) seem to give perhaps less than ~0.5 MB/second throughput improvement. 
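The backing-array idea, reduced to a toy example with hypothetical types. Sizing by word count below is only a stand-in for the real heuristic, which guesses from the average segment length seen so far and caps the guess at 1000.

```go
package main

import (
	"fmt"
	"strings"
)

// Token is a stand-in for analysis.Token with just enough fields to show
// the backing-array idea; the real type also carries Start/End/Type.
type Token struct {
	Term     string
	Position int
}

// tokenizePerToken allocates each Token separately: one heap object per token.
func tokenizePerToken(input string) []*Token {
	var rv []*Token
	for i, w := range strings.Fields(input) {
		rv = append(rv, &Token{Term: w, Position: i + 1})
	}
	return rv
}

// tokenizeBacked allocates one backing slice up front and hands out
// pointers into it, hitting the allocator once per chunk instead of once
// per token.
func tokenizeBacked(input string) []*Token {
	words := strings.Fields(input)
	backing := make([]Token, len(words)) // single allocation for all tokens
	rv := make([]*Token, 0, len(words))
	for i, w := range words {
		backing[i] = Token{Term: w, Position: i + 1}
		rv = append(rv, &backing[i]) // pointer into the backing array
	}
	return rv
}

func main() {
	for _, t := range tokenizeBacked("the quick brown fox") {
		fmt.Println(t.Position, t.Term)
	}
	_ = tokenizePerToken // kept only for comparison with the old approach
}
```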
--- analysis/tokenizers/unicode/unicode.go | 38 ++++++++++++++++++++------ 1 file changed, 30 insertions(+), 8 deletions(-) diff --git a/analysis/tokenizers/unicode/unicode.go b/analysis/tokenizers/unicode/unicode.go index b957231d..291893d3 100644 --- a/analysis/tokenizers/unicode/unicode.go +++ b/analysis/tokenizers/unicode/unicode.go @@ -26,6 +26,8 @@ func NewUnicodeTokenizer() *UnicodeTokenizer { } func (rt *UnicodeTokenizer) Tokenize(input []byte) analysis.TokenStream { + ta := []analysis.Token(nil) + taNext := 0 rv := make(analysis.TokenStream, 0) @@ -36,18 +38,38 @@ func (rt *UnicodeTokenizer) Tokenize(input []byte) analysis.TokenStream { segmentBytes := segmenter.Bytes() end := start + len(segmentBytes) if segmenter.Type() != segment.None { - token := analysis.Token{ - Term: segmentBytes, - Start: start, - End: end, - Position: pos, - Type: convertType(segmenter.Type()), + if taNext >= len(ta) { + avgSegmentLen := end / (len(rv) + 1) + if avgSegmentLen < 1 { + avgSegmentLen = 1 + } + + remainingLen := len(input) - end + remainingSegments := remainingLen / avgSegmentLen + if remainingSegments > 1000 { + remainingSegments = 1000 + } + if remainingSegments < 1 { + remainingSegments = 1 + } + + ta = make([]analysis.Token, remainingSegments) + taNext = 0 } - rv = append(rv, &token) + + token := &ta[taNext] + taNext++ + + token.Term = segmentBytes + token.Start = start + token.End = end + token.Position = pos + token.Type = convertType(segmenter.Type()) + + rv = append(rv, token) pos++ } start = end - } return rv } From 325a6169937f6fd093d423d5bb09eaba0c70a72d Mon Sep 17 00:00:00 2001 From: Steve Yen Date: Thu, 31 Dec 2015 19:43:55 -0800 Subject: [PATCH 13/17] unicode.Tokenize() avoids array growth via array of arrays --- analysis/tokenizers/unicode/unicode.go | 49 +++++++++++++++++++++----- 1 file changed, 40 insertions(+), 9 deletions(-) diff --git a/analysis/tokenizers/unicode/unicode.go b/analysis/tokenizers/unicode/unicode.go index 291893d3..6240417e 100644 --- a/analysis/tokenizers/unicode/unicode.go +++ b/analysis/tokenizers/unicode/unicode.go @@ -26,26 +26,33 @@ func NewUnicodeTokenizer() *UnicodeTokenizer { } func (rt *UnicodeTokenizer) Tokenize(input []byte) analysis.TokenStream { + rvx := make([]analysis.TokenStream, 0, 10) // When rv gets full, append to rvx. + rv := make(analysis.TokenStream, 0, 1) + ta := []analysis.Token(nil) taNext := 0 - rv := make(analysis.TokenStream, 0) - segmenter := segment.NewWordSegmenterDirect(input) start := 0 pos := 1 + + guessRemaining := func(end int) int { + avgSegmentLen := end / (len(rv) + 1) + if avgSegmentLen < 1 { + avgSegmentLen = 1 + } + + remainingLen := len(input) - end + + return remainingLen / avgSegmentLen + } + for segmenter.Segment() { segmentBytes := segmenter.Bytes() end := start + len(segmentBytes) if segmenter.Type() != segment.None { if taNext >= len(ta) { - avgSegmentLen := end / (len(rv) + 1) - if avgSegmentLen < 1 { - avgSegmentLen = 1 - } - - remainingLen := len(input) - end - remainingSegments := remainingLen / avgSegmentLen + remainingSegments := guessRemaining(end) if remainingSegments > 1000 { remainingSegments = 1000 } @@ -66,11 +73,35 @@ func (rt *UnicodeTokenizer) Tokenize(input []byte) analysis.TokenStream { token.Position = pos token.Type = convertType(segmenter.Type()) + if len(rv) >= cap(rv) { // When rv is full, save it into rvx. + rvx = append(rvx, rv) + + rvCap := cap(rv) * 2 + if rvCap > 256 { + rvCap = 256 + } + + rv = make(analysis.TokenStream, 0, rvCap) // Next rv cap is bigger. 
+ } + rv = append(rv, token) pos++ } start = end } + + if len(rvx) > 0 { + n := len(rv) + for _, r := range rvx { + n += len(r) + } + rall := make(analysis.TokenStream, 0, n) + for _, r := range rvx { + rall = append(rall, r...) + } + return append(rall, rv...) + } + return rv } From b2412424653ff873b5896dc9c5e11d2135720864 Mon Sep 17 00:00:00 2001 From: Steve Yen Date: Thu, 31 Dec 2015 18:13:54 -0800 Subject: [PATCH 14/17] firestorm.Analyze() preallocs rows, with analyzeField() func The new analyzeField() helper func is used for both regular fields and for composite fields. With this change, all analysis is done up front, for both regular fields and composite fields. After analysis, this change counts up all the row capacity needed and extends the AnalysisResult.Rows in one shot, as opposed to the previous approach of dynamically growing the array as needed during append()'s. Also, in this change, the TermFreqRow for _id is added first, which seems more correct. --- index/firestorm/analysis.go | 73 ++++++++++++++++---------------- index/firestorm/analysis_test.go | 2 +- 2 files changed, 37 insertions(+), 38 deletions(-) diff --git a/index/firestorm/analysis.go b/index/firestorm/analysis.go index 58c6e2c5..58f105e3 100644 --- a/index/firestorm/analysis.go +++ b/index/firestorm/analysis.go @@ -26,22 +26,22 @@ func (f *Firestorm) Analyze(d *document.Document) *index.AnalysisResult { docIDBytes := []byte(d.ID) + // add the _id row + rv.Rows = append(rv.Rows, NewTermFreqRow(0, nil, docIDBytes, d.Number, 0, 0, nil)) + // information we collate as we merge fields with same name fieldTermFreqs := make(map[uint16]analysis.TokenFrequencies) fieldLengths := make(map[uint16]int) fieldIncludeTermVectors := make(map[uint16]bool) fieldNames := make(map[uint16]string) - for _, field := range d.Fields { + analyzeField := func(field document.Field, storable bool) { fieldIndex, newFieldRow := f.fieldIndexOrNewRow(field.Name()) if newFieldRow != nil { rv.Rows = append(rv.Rows, newFieldRow) } fieldNames[fieldIndex] = field.Name() - // add the _id row - rv.Rows = append(rv.Rows, NewTermFreqRow(0, nil, docIDBytes, d.Number, 0, 0, nil)) - if field.Options().IsIndexed() { fieldLength, tokenFreqs := field.Analyze() existingFreqs := fieldTermFreqs[fieldIndex] @@ -55,75 +55,74 @@ func (f *Firestorm) Analyze(d *document.Document) *index.AnalysisResult { fieldIncludeTermVectors[fieldIndex] = field.Options().IncludeTermVectors() } - if field.Options().IsStored() { + if storable && field.Options().IsStored() { storeRow := f.storeField(docIDBytes, d.Number, field, fieldIndex) rv.Rows = append(rv.Rows, storeRow) } } + for _, field := range d.Fields { + analyzeField(field, true) + } + + for fieldIndex, tokenFreqs := range fieldTermFreqs { + // see if any of the composite fields need this + for _, compositeField := range d.CompositeFields { + compositeField.Compose(fieldNames[fieldIndex], fieldLengths[fieldIndex], tokenFreqs) + } + } + + for _, compositeField := range d.CompositeFields { + analyzeField(compositeField, false) + } + + rowsCapNeeded := len(rv.Rows) + for _, tokenFreqs := range fieldTermFreqs { + rowsCapNeeded += len(tokenFreqs) + } + + rows := make([]index.IndexRow, 0, rowsCapNeeded) + rv.Rows = append(rows, rv.Rows...) 
+ // walk through the collated information and proccess // once for each indexed field (unique name) for fieldIndex, tokenFreqs := range fieldTermFreqs { fieldLength := fieldLengths[fieldIndex] includeTermVectors := fieldIncludeTermVectors[fieldIndex] - // see if any of the composite fields need this - for _, compositeField := range d.CompositeFields { - compositeField.Compose(fieldNames[fieldIndex], fieldLength, tokenFreqs) - } - - // encode this field - indexRows := f.indexField(docIDBytes, d.Number, includeTermVectors, fieldIndex, fieldLength, tokenFreqs) - rv.Rows = append(rv.Rows, indexRows...) - } - - // now index the composite fields - for _, compositeField := range d.CompositeFields { - fieldIndex, newFieldRow := f.fieldIndexOrNewRow(compositeField.Name()) - if newFieldRow != nil { - rv.Rows = append(rv.Rows, newFieldRow) - } - if compositeField.Options().IsIndexed() { - fieldLength, tokenFreqs := compositeField.Analyze() - // encode this field - indexRows := f.indexField(docIDBytes, d.Number, compositeField.Options().IncludeTermVectors(), fieldIndex, fieldLength, tokenFreqs) - rv.Rows = append(rv.Rows, indexRows...) - } + rv.Rows = f.indexField(docIDBytes, d.Number, includeTermVectors, fieldIndex, fieldLength, tokenFreqs, rv.Rows) } return rv } -func (f *Firestorm) indexField(docID []byte, docNum uint64, includeTermVectors bool, fieldIndex uint16, fieldLength int, tokenFreqs analysis.TokenFrequencies) []index.IndexRow { +func (f *Firestorm) indexField(docID []byte, docNum uint64, includeTermVectors bool, fieldIndex uint16, fieldLength int, tokenFreqs analysis.TokenFrequencies, rows []index.IndexRow) []index.IndexRow { tfrs := make([]TermFreqRow, len(tokenFreqs)) fieldNorm := float32(1.0 / math.Sqrt(float64(fieldLength))) if !includeTermVectors { - rows := make([]index.IndexRow, len(tokenFreqs)) i := 0 for _, tf := range tokenFreqs { - rows[i] = InitTermFreqRow(&tfrs[i], fieldIndex, tf.Term, docID, docNum, uint64(tf.Frequency()), fieldNorm, nil) + rows = append(rows, InitTermFreqRow(&tfrs[i], fieldIndex, tf.Term, docID, docNum, uint64(tf.Frequency()), fieldNorm, nil)) i++ } return rows } - rows := make([]index.IndexRow, 0, len(tokenFreqs)) i := 0 for _, tf := range tokenFreqs { - tv, newFieldRows := f.termVectorsFromTokenFreq(fieldIndex, tf) - rows = append(rows, newFieldRows...) 
+ var tv []*TermVector + tv, rows = f.termVectorsFromTokenFreq(fieldIndex, tf, rows) rows = append(rows, InitTermFreqRow(&tfrs[i], fieldIndex, tf.Term, docID, docNum, uint64(tf.Frequency()), fieldNorm, tv)) i++ } return rows } -func (f *Firestorm) termVectorsFromTokenFreq(field uint16, tf *analysis.TokenFreq) ([]*TermVector, []index.IndexRow) { +func (f *Firestorm) termVectorsFromTokenFreq(field uint16, tf *analysis.TokenFreq, rows []index.IndexRow) ([]*TermVector, []index.IndexRow) { rv := make([]*TermVector, len(tf.Locations)) - newFieldRows := make([]index.IndexRow, 0) for i, l := range tf.Locations { var newFieldRow *FieldRow @@ -132,14 +131,14 @@ func (f *Firestorm) termVectorsFromTokenFreq(field uint16, tf *analysis.TokenFre // lookup correct field fieldIndex, newFieldRow = f.fieldIndexOrNewRow(l.Field) if newFieldRow != nil { - newFieldRows = append(newFieldRows, newFieldRow) + rows = append(rows, newFieldRow) } } tv := NewTermVector(fieldIndex, uint64(l.Position), uint64(l.Start), uint64(l.End), l.ArrayPositions) rv[i] = tv } - return rv, newFieldRows + return rv, rows } func (f *Firestorm) storeField(docID []byte, docNum uint64, field document.Field, fieldIndex uint16) index.IndexRow { diff --git a/index/firestorm/analysis_test.go b/index/firestorm/analysis_test.go index 8d975ee3..4fe0c775 100644 --- a/index/firestorm/analysis_test.go +++ b/index/firestorm/analysis_test.go @@ -78,8 +78,8 @@ func TestAnalysis(t *testing.T) { r: &index.AnalysisResult{ DocID: "a", Rows: []index.IndexRow{ - NewFieldRow(1, "name"), NewTermFreqRow(0, nil, []byte("a"), 1, 0, 0.0, nil), + NewFieldRow(1, "name"), NewStoredRow([]byte("a"), 1, 1, nil, []byte("ttest")), NewTermFreqRow(1, []byte("test"), []byte("a"), 1, 1, 1.0, []*TermVector{NewTermVector(1, 1, 0, 4, nil)}), }, From 1c5b84911de28ea8b5dfa1bdab5cf49175bcffd8 Mon Sep 17 00:00:00 2001 From: Steve Yen Date: Thu, 31 Dec 2015 11:07:59 -0800 Subject: [PATCH 15/17] firestorm DictUpdater NotifyBatch is more async --- index/firestorm/dict_updater.go | 27 ++++++++++++++++++++++----- index/firestorm/dict_updater_test.go | 10 ++++++++++ 2 files changed, 32 insertions(+), 5 deletions(-) diff --git a/index/firestorm/dict_updater.go b/index/firestorm/dict_updater.go index 38a5e59a..74c66506 100644 --- a/index/firestorm/dict_updater.go +++ b/index/firestorm/dict_updater.go @@ -25,6 +25,7 @@ type DictUpdater struct { f *Firestorm dictUpdateSleep time.Duration quit chan struct{} + incoming chan map[string]int64 mutex sync.RWMutex workingSet map[string]int64 @@ -41,6 +42,7 @@ func NewDictUpdater(f *Firestorm) *DictUpdater { workingSet: make(map[string]int64), batchesStarted: 1, quit: make(chan struct{}), + incoming: make(chan map[string]int64, 8), } return &rv } @@ -52,15 +54,12 @@ func (d *DictUpdater) Notify(term string, usage int64) { } func (d *DictUpdater) NotifyBatch(termUsages map[string]int64) { - d.mutex.Lock() - defer d.mutex.Unlock() - for term, usage := range termUsages { - d.workingSet[term] += usage - } + d.incoming <- termUsages } func (d *DictUpdater) Start() { d.closeWait.Add(1) + go d.runIncoming() go d.run() } @@ -69,6 +68,24 @@ func (d *DictUpdater) Stop() { d.closeWait.Wait() } +func (d *DictUpdater) runIncoming() { + for { + select { + case <-d.quit: + return + case termUsages, ok := <-d.incoming: + if !ok { + return + } + d.mutex.Lock() + for term, usage := range termUsages { + d.workingSet[term] += usage + } + d.mutex.Unlock() + } + } +} + func (d *DictUpdater) run() { tick := time.Tick(d.dictUpdateSleep) for { diff --git 
a/index/firestorm/dict_updater_test.go b/index/firestorm/dict_updater_test.go index 9ad5c303..659f8096 100644 --- a/index/firestorm/dict_updater_test.go +++ b/index/firestorm/dict_updater_test.go @@ -10,6 +10,7 @@ package firestorm import ( + "runtime" "testing" "github.com/blevesearch/bleve/index" @@ -38,6 +39,9 @@ func TestDictUpdater(t *testing.T) { f.(*Firestorm).dictUpdater.NotifyBatch(dictBatch) // invoke updater manually + for len(f.(*Firestorm).dictUpdater.incoming) > 0 { + runtime.Gosched() + } f.(*Firestorm).dictUpdater.update() // assert that dictionary rows are correct @@ -77,6 +81,9 @@ func TestDictUpdater(t *testing.T) { f.(*Firestorm).dictUpdater.NotifyBatch(dictBatch) // invoke updater manually + for len(f.(*Firestorm).dictUpdater.incoming) > 0 { + runtime.Gosched() + } f.(*Firestorm).dictUpdater.update() // assert that dictionary rows are correct @@ -116,6 +123,9 @@ func TestDictUpdater(t *testing.T) { f.(*Firestorm).dictUpdater.NotifyBatch(dictBatch) // invoke updater manually + for len(f.(*Firestorm).dictUpdater.incoming) > 0 { + runtime.Gosched() + } f.(*Firestorm).dictUpdater.update() // assert that dictionary rows are correct From fb8c9a7475ee465625c3553886e240a4c90d9c4d Mon Sep 17 00:00:00 2001 From: Steve Yen Date: Fri, 1 Jan 2016 22:25:34 -0800 Subject: [PATCH 16/17] firestorm.Batch() collects [][]IndexRows instead of []IndexRow Rather than append() all received rows into a flat []IndexRow during the result gathering loop, this change instead collects the analysis result rows into a [][]IndexRow, which avoids extra copying. As part of this, firestorm batchRows() now takes the [][]IndexRow as its input. --- index/firestorm/firestorm.go | 83 ++++++++++++++++++++---------------- 1 file changed, 46 insertions(+), 37 deletions(-) diff --git a/index/firestorm/firestorm.go b/index/firestorm/firestorm.go index 662646ca..c76b445f 100644 --- a/index/firestorm/firestorm.go +++ b/index/firestorm/firestorm.go @@ -167,7 +167,7 @@ func (f *Firestorm) Update(doc *document.Document) (err error) { }() var dictionaryDeltas map[string]int64 - dictionaryDeltas, err = f.batchRows(kvwriter, result.Rows, nil) + dictionaryDeltas, err = f.batchRows(kvwriter, [][]index.IndexRow{result.Rows}, nil) if err != nil { _ = kvwriter.Close() atomic.AddUint64(&f.stats.errors, 1) @@ -190,7 +190,7 @@ func (f *Firestorm) Delete(id string) error { return nil } -func (f *Firestorm) batchRows(writer store.KVWriter, rows []index.IndexRow, deleteKeys [][]byte) (map[string]int64, error) { +func (f *Firestorm) batchRows(writer store.KVWriter, rowsOfRows [][]index.IndexRow, deleteKeys [][]byte) (map[string]int64, error) { // prepare batch wb := writer.NewBatch() @@ -206,33 +206,36 @@ func (f *Firestorm) batchRows(writer store.KVWriter, rows []index.IndexRow, dele } dictionaryDeltas := make(map[string]int64) - for _, row := range rows { - tfr, ok := row.(*TermFreqRow) - if ok { - if tfr.Field() != 0 { - kbuf = prepareBuf(kbuf, tfr.DictionaryRowKeySize()) - klen, err := tfr.DictionaryRowKeyTo(kbuf) - if err != nil { - return nil, err + + for _, rows := range rowsOfRows { + for _, row := range rows { + tfr, ok := row.(*TermFreqRow) + if ok { + if tfr.Field() != 0 { + kbuf = prepareBuf(kbuf, tfr.DictionaryRowKeySize()) + klen, err := tfr.DictionaryRowKeyTo(kbuf) + if err != nil { + return nil, err + } + + dictionaryDeltas[string(kbuf[0:klen])] += 1 } - - dictionaryDeltas[string(kbuf[0:klen])] += 1 } - } - kbuf = prepareBuf(kbuf, row.KeySize()) - klen, err := row.KeyTo(kbuf) - if err != nil { - return nil, err - } + 
kbuf = prepareBuf(kbuf, row.KeySize()) + klen, err := row.KeyTo(kbuf) + if err != nil { + return nil, err + } - vbuf = prepareBuf(vbuf, row.ValueSize()) - vlen, err := row.ValueTo(vbuf) - if err != nil { - return nil, err - } + vbuf = prepareBuf(vbuf, row.ValueSize()) + vlen, err := row.ValueTo(vbuf) + if err != nil { + return nil, err + } - wb.Set(kbuf[0:klen], vbuf[0:vlen]) + wb.Set(kbuf[0:klen], vbuf[0:vlen]) + } } for _, dk := range deleteKeys { @@ -291,12 +294,14 @@ func (f *Firestorm) Batch(batch *index.Batch) (err error) { } }() - allRows := make([]index.IndexRow, 0, 1000) + // extra 1 capacity for internal updates. + collectRows := make([][]index.IndexRow, 0, docsUpdated+1) + // wait for the result var itemsDeQueued uint64 for itemsDeQueued < docsUpdated { result := <-resultChan - allRows = append(allRows, result.Rows...) + collectRows = append(collectRows, result.Rows) itemsDeQueued++ } close(resultChan) @@ -309,17 +314,21 @@ func (f *Firestorm) Batch(batch *index.Batch) (err error) { atomic.AddUint64(&f.stats.analysisTime, uint64(time.Since(analysisStart))) - deleteKeys := make([][]byte, 0) - // add the internal ops - for internalKey, internalValue := range batch.InternalOps { - if internalValue == nil { - // delete - deleteInternalRow := NewInternalRow([]byte(internalKey), nil) - deleteKeys = append(deleteKeys, deleteInternalRow.Key()) - } else { - updateInternalRow := NewInternalRow([]byte(internalKey), internalValue) - allRows = append(allRows, updateInternalRow) + var deleteKeys [][]byte + if len(batch.InternalOps) > 0 { + // add the internal ops + updateInternalRows := make([]index.IndexRow, 0, len(batch.InternalOps)) + for internalKey, internalValue := range batch.InternalOps { + if internalValue == nil { + // delete + deleteInternalRow := NewInternalRow([]byte(internalKey), nil) + deleteKeys = append(deleteKeys, deleteInternalRow.Key()) + } else { + updateInternalRow := NewInternalRow([]byte(internalKey), internalValue) + updateInternalRows = append(updateInternalRows, updateInternalRow) + } } + collectRows = append(collectRows, updateInternalRows) } inflightItems := make([]*InFlightItem, 0, len(batch.IndexOps)) @@ -342,7 +351,7 @@ func (f *Firestorm) Batch(batch *index.Batch) (err error) { } var dictionaryDeltas map[string]int64 - dictionaryDeltas, err = f.batchRows(kvwriter, allRows, deleteKeys) + dictionaryDeltas, err = f.batchRows(kvwriter, collectRows, deleteKeys) if err != nil { _ = kvwriter.Close() atomic.AddUint64(&f.stats.errors, 1) From 70b7e73c826c59064b54be322b7bf66e660e0f5e Mon Sep 17 00:00:00 2001 From: Steve Yen Date: Sun, 3 Jan 2016 10:20:56 -0800 Subject: [PATCH 17/17] firestorm compensator inFlight.Get() might return nil --- index/firestorm/comp.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/index/firestorm/comp.go b/index/firestorm/comp.go index a4138823..5fad468d 100644 --- a/index/firestorm/comp.go +++ b/index/firestorm/comp.go @@ -106,7 +106,7 @@ func (c *Compensator) Migrate(docID []byte, docNum uint64, oldDocNums []uint64) // remove entry from in-flight if it still has same doc num val := c.inFlight.Get(&InFlightItem{docID: docID}) - if val.(*InFlightItem).docNum == docNum { + if val != nil && val.(*InFlightItem).docNum == docNum { c.inFlight = c.inFlight.Delete(&InFlightItem{docID: docID}) } }
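To close out the series, the failure mode that last one-line fix guards against, in standalone form. The map-backed lookup below is only a stand-in for the gtreap-based in-flight set; it shows why the nil check has to come before the type assertion.

```go
package main

import "fmt"

type InFlightItem struct {
	docID  []byte
	docNum uint64
}

// get mimics a store whose Get returns an untyped nil interface when the
// key is absent, which is what the compensator sees for a docID that is
// no longer in flight.
func get(items map[string]*InFlightItem, docID string) interface{} {
	if item, ok := items[docID]; ok {
		return item
	}
	return nil
}

func main() {
	items := map[string]*InFlightItem{
		"a": {docID: []byte("a"), docNum: 7},
	}

	for _, id := range []string{"a", "missing"} {
		val := get(items, id)
		// Without the val != nil guard, val.(*InFlightItem) on the nil
		// interface returned for "missing" would panic; with it, the
		// assertion only runs when there really is an item.
		if val != nil && val.(*InFlightItem).docNum == 7 {
			fmt.Printf("%s: still in flight with the same doc number\n", id)
		} else {
			fmt.Printf("%s: absent or already superseded\n", id)
		}
	}
}
```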