From bff95eef70c14c94ca952d9ff708bb90680cb2de Mon Sep 17 00:00:00 2001 From: Steve Yen Date: Sun, 10 Jan 2016 09:58:24 -0800 Subject: [PATCH 1/7] firestorm close kvwriter sooner --- index/firestorm/firestorm.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/index/firestorm/firestorm.go b/index/firestorm/firestorm.go index 7726f1ad..73633f0b 100644 --- a/index/firestorm/firestorm.go +++ b/index/firestorm/firestorm.go @@ -346,6 +346,7 @@ func (f *Firestorm) Batch(batch *index.Batch) (err error) { } indexStart := time.Now() + // start a writer for this batch var kvwriter store.KVWriter kvwriter, err = f.store.Writer() @@ -362,10 +363,12 @@ func (f *Firestorm) Batch(batch *index.Batch) (err error) { } f.compensator.MutateBatch(inflightItems, lastDocNumber) + + err = kvwriter.Close() + f.lookuper.NotifyBatch(inflightItems) f.dictUpdater.NotifyBatch(dictionaryDeltas) - err = kvwriter.Close() atomic.AddUint64(&f.stats.indexTime, uint64(time.Since(indexStart))) if err == nil { From d3dd40d334bf9a1b24f26d5a2badfa96de0fcba5 Mon Sep 17 00:00:00 2001 From: Steve Yen Date: Sun, 10 Jan 2016 09:41:12 -0800 Subject: [PATCH 2/7] upside_down retrieves backindex concurrently with analysis Start backindex reading concurrently with analysis to try to utilize more I/O bandwidth. The analysis-time vs indexing-time stats tracking is also now "off", since there's now concurrency between those activities. One tradeoff is that the lock area in upside_down Batch() is increased as part of this change. --- index/upside_down/upside_down.go | 78 +++++++++++++++++++------------- 1 file changed, 46 insertions(+), 32 deletions(-) diff --git a/index/upside_down/upside_down.go b/index/upside_down/upside_down.go index 549da868..dc20d48b 100644 --- a/index/upside_down/upside_down.go +++ b/index/upside_down/upside_down.go @@ -710,7 +710,7 @@ func (udc *UpsideDownCouch) termFieldVectorsFromTermVectors(in []*TermVector) [] func (udc *UpsideDownCouch) Batch(batch *index.Batch) (err error) { analysisStart := time.Now() - resultChan := make(chan *index.AnalysisResult) + resultChan := make(chan *index.AnalysisResult, len(batch.IndexOps)) var numUpdates uint64 for _, doc := range batch.IndexOps { @@ -740,8 +740,40 @@ func (udc *UpsideDownCouch) Batch(batch *index.Batch) (err error) { } }() - newRowsMap := make(map[string][]index.IndexRow) + udc.writeMutex.Lock() + defer udc.writeMutex.Unlock() + + var backIndexRows map[string]*BackIndexRow + backindexReaderCh := make(chan error) + + go func() { + defer close(backindexReaderCh) + + // open a reader for backindex lookup + var kvreader store.KVReader + kvreader, err = udc.store.Reader() + if err != nil { + backindexReaderCh <- err + return + } + + // first lookup all the back index rows + backIndexRows, err = udc.backIndexRowsForBatch(kvreader, batch) + if err != nil { + _ = kvreader.Close() + backindexReaderCh <- err + return + } + + err = kvreader.Close() + if err != nil { + backindexReaderCh <- err + return + } + }() + // wait for the result + newRowsMap := make(map[string][]index.IndexRow) var itemsDeQueued uint64 for itemsDeQueued < numUpdates { result := <-resultChan @@ -750,6 +782,11 @@ func (udc *UpsideDownCouch) Batch(batch *index.Batch) (err error) { } close(resultChan) + backindexReaderErr := <-backindexReaderCh + if backindexReaderErr != nil { + return backindexReaderErr + } + detectedUnsafeMutex.RLock() defer detectedUnsafeMutex.RUnlock() if detectedUnsafe { return UnsafeBatchUseDetected } @@ -760,36 +797,6 @@ func (udc *UpsideDownCouch) Batch(batch *index.Batch) (err error) {
indexStart := time.Now() - udc.writeMutex.Lock() - defer udc.writeMutex.Unlock() - - // open a reader for backindex lookup - var kvreader store.KVReader - kvreader, err = udc.store.Reader() - if err != nil { - return - } - - // first lookup all the back index rows - var backIndexRows map[string]*BackIndexRow - backIndexRows, err = udc.backIndexRowsForBatch(kvreader, batch) - if err != nil { - _ = kvreader.Close() - return - } - - err = kvreader.Close() - if err != nil { - return - } - - // start a writer for this batch - var kvwriter store.KVWriter - kvwriter, err = udc.store.Writer() - if err != nil { - return - } - // prepare a list of rows addRows := make([]UpsideDownCouchRow, 0) updateRows := make([]UpsideDownCouchRow, 0) @@ -823,6 +830,13 @@ func (udc *UpsideDownCouch) Batch(batch *index.Batch) (err error) { } } + // start a writer for this batch + var kvwriter store.KVWriter + kvwriter, err = udc.store.Writer() + if err != nil { + return + } + err = udc.batchRows(kvwriter, addRows, updateRows, deleteRows) if err != nil { _ = kvwriter.Close() From c3b5246b0c09ecfe05bf7db683da677c6bd5ed30 Mon Sep 17 00:00:00 2001 From: Steve Yen Date: Sun, 10 Jan 2016 15:36:54 -0800 Subject: [PATCH 3/7] upside_down track analysis time tighter; and comments --- index/upside_down/upside_down.go | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/index/upside_down/upside_down.go b/index/upside_down/upside_down.go index dc20d48b..4731e7ed 100644 --- a/index/upside_down/upside_down.go +++ b/index/upside_down/upside_down.go @@ -710,6 +710,7 @@ func (udc *UpsideDownCouch) termFieldVectorsFromTermVectors(in []*TermVector) [] func (udc *UpsideDownCouch) Batch(batch *index.Batch) (err error) { analysisStart := time.Now() + resultChan := make(chan *index.AnalysisResult, len(batch.IndexOps)) var numUpdates uint64 @@ -740,12 +741,13 @@ func (udc *UpsideDownCouch) Batch(batch *index.Batch) (err error) { } }() - udc.writeMutex.Lock() - defer udc.writeMutex.Unlock() - + // retrieve back index rows concurrent with analysis var backIndexRows map[string]*BackIndexRow backindexReaderCh := make(chan error) + udc.writeMutex.Lock() + defer udc.writeMutex.Unlock() + go func() { defer close(backindexReaderCh) @@ -757,7 +759,6 @@ func (udc *UpsideDownCouch) Batch(batch *index.Batch) (err error) { return } - // first lookup all the back index rows backIndexRows, err = udc.backIndexRowsForBatch(kvreader, batch) if err != nil { _ = kvreader.Close() @@ -772,7 +773,7 @@ func (udc *UpsideDownCouch) Batch(batch *index.Batch) (err error) { } }() - // wait for the result + // wait for analysis result newRowsMap := make(map[string][]index.IndexRow) var itemsDeQueued uint64 for itemsDeQueued < numUpdates { @@ -782,6 +783,9 @@ func (udc *UpsideDownCouch) Batch(batch *index.Batch) (err error) { } close(resultChan) + atomic.AddUint64(&udc.stats.analysisTime, uint64(time.Since(analysisStart))) + + // wait for back index rows backindexReaderErr := <-backindexReaderCh if backindexReaderErr != nil { return backindexReaderErr @@ -793,8 +797,6 @@ func (udc *UpsideDownCouch) Batch(batch *index.Batch) (err error) { return UnsafeBatchUseDetected } - atomic.AddUint64(&udc.stats.analysisTime, uint64(time.Since(analysisStart))) - indexStart := time.Now() // prepare a list of rows @@ -845,6 +847,7 @@ func (udc *UpsideDownCouch) Batch(batch *index.Batch) (err error) { } err = kvwriter.Close() + atomic.AddUint64(&udc.stats.indexTime, uint64(time.Since(indexStart))) if err == nil { From 
bb5cd8f3d616ebb4038d54677db02eadf06e4529 Mon Sep 17 00:00:00 2001 From: Steve Yen Date: Sun, 10 Jan 2016 18:47:09 -0800 Subject: [PATCH 4/7] upside_down merge backIndexRow concurrently Previously, the code would gather all the backIndexRows before processing them. This change instead merges the backIndexRows concurrently on the theory that we might as well make progress on compute & processing tasks while waiting for the rest of the back index rows to be fetched from the KVStore. --- index/upside_down/upside_down.go | 83 +++++++++++++++----------------- 1 file changed, 38 insertions(+), 45 deletions(-) diff --git a/index/upside_down/upside_down.go b/index/upside_down/upside_down.go index 4731e7ed..440d62e6 100644 --- a/index/upside_down/upside_down.go +++ b/index/upside_down/upside_down.go @@ -60,6 +60,12 @@ type UpsideDownCouch struct { writeMutex sync.Mutex } +type docBackIndexRow struct { + docID string + doc *document.Document // If deletion, doc will be nil. + backIndexRow *BackIndexRow +} + func NewUpsideDownCouch(storeName string, storeConfig map[string]interface{}, analysisQueue *index.AnalysisQueue) (index.Index, error) { return &UpsideDownCouch{ version: Version, @@ -635,20 +641,6 @@ func (udc *UpsideDownCouch) backIndexRowForDoc(kvreader store.KVReader, docID st return backIndexRow, nil } -func (udc *UpsideDownCouch) backIndexRowsForBatch(kvreader store.KVReader, batch *index.Batch) (map[string]*BackIndexRow, error) { - // FIXME faster to order the ids and scan sequentially - // for now just get it working - rv := make(map[string]*BackIndexRow, 0) - for docID := range batch.IndexOps { - backIndexRow, err := udc.backIndexRowForDoc(kvreader, docID) - if err != nil { - return nil, err - } - rv[docID] = backIndexRow - } - return rv, nil -} - func decodeFieldType(typ byte, name string, pos []uint64, value []byte) document.Field { switch typ { case 't': @@ -742,33 +734,36 @@ func (udc *UpsideDownCouch) Batch(batch *index.Batch) (err error) { }() // retrieve back index rows concurrent with analysis - var backIndexRows map[string]*BackIndexRow - backindexReaderCh := make(chan error) + docBackIndexRowErr := error(nil) + docBackIndexRowCh := make(chan *docBackIndexRow, len(batch.IndexOps)) udc.writeMutex.Lock() defer udc.writeMutex.Unlock() go func() { - defer close(backindexReaderCh) + defer close(docBackIndexRowCh) // open a reader for backindex lookup var kvreader store.KVReader kvreader, err = udc.store.Reader() if err != nil { - backindexReaderCh <- err + docBackIndexRowErr = err return } - backIndexRows, err = udc.backIndexRowsForBatch(kvreader, batch) - if err != nil { - _ = kvreader.Close() - backindexReaderCh <- err - return + for docID, doc := range batch.IndexOps { + backIndexRow, err := udc.backIndexRowForDoc(kvreader, docID) + if err != nil { + docBackIndexRowErr = err + return + } + + docBackIndexRowCh <- &docBackIndexRow{docID, doc, backIndexRow} } err = kvreader.Close() if err != nil { - backindexReaderCh <- err + docBackIndexRowErr = err return } }() @@ -785,20 +780,6 @@ func (udc *UpsideDownCouch) Batch(batch *index.Batch) (err error) { atomic.AddUint64(&udc.stats.analysisTime, uint64(time.Since(analysisStart))) - // wait for back index rows - backindexReaderErr := <-backindexReaderCh - if backindexReaderErr != nil { - return backindexReaderErr - } - - detectedUnsafeMutex.RLock() - defer detectedUnsafeMutex.RUnlock() - if detectedUnsafe { - return UnsafeBatchUseDetected - } - - indexStart := time.Now() - // prepare a list of rows addRows := make([]UpsideDownCouchRow, 0) 
updateRows := make([]UpsideDownCouchRow, 0) @@ -806,20 +787,32 @@ func (udc *UpsideDownCouch) Batch(batch *index.Batch) (err error) { docsAdded := uint64(0) docsDeleted := uint64(0) - for docID, doc := range batch.IndexOps { - backIndexRow := backIndexRows[docID] - if doc == nil && backIndexRow != nil { + + indexStart := time.Now() + + for dbir := range docBackIndexRowCh { + if dbir.doc == nil && dbir.backIndexRow != nil { // delete - deleteRows = udc.deleteSingle(docID, backIndexRow, deleteRows) + deleteRows = udc.deleteSingle(dbir.docID, dbir.backIndexRow, deleteRows) docsDeleted++ - } else if doc != nil { - addRows, updateRows, deleteRows = udc.mergeOldAndNew(backIndexRow, newRowsMap[docID], addRows, updateRows, deleteRows) - if backIndexRow == nil { + } else if dbir.doc != nil { + addRows, updateRows, deleteRows = udc.mergeOldAndNew(dbir.backIndexRow, newRowsMap[dbir.docID], addRows, updateRows, deleteRows) + if dbir.backIndexRow == nil { docsAdded++ } } } + if docBackIndexRowErr != nil { + return docBackIndexRowErr + } + + detectedUnsafeMutex.RLock() + defer detectedUnsafeMutex.RUnlock() + if detectedUnsafe { + return UnsafeBatchUseDetected + } + // add the internal ops for internalKey, internalValue := range batch.InternalOps { if internalValue == nil { From 94273d5fa9230ed2d24c038574358c353b91dd69 Mon Sep 17 00:00:00 2001 From: Steve Yen Date: Mon, 11 Jan 2016 16:25:35 -0800 Subject: [PATCH 5/7] upside_down process internal rows earlier With this change, internal rows are processed while we're waiting for backIndex rows to be retrieved. --- index/upside_down/upside_down.go | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/index/upside_down/upside_down.go b/index/upside_down/upside_down.go index 440d62e6..e195d676 100644 --- a/index/upside_down/upside_down.go +++ b/index/upside_down/upside_down.go @@ -790,6 +790,18 @@ func (udc *UpsideDownCouch) Batch(batch *index.Batch) (err error) { indexStart := time.Now() + // add the internal ops + for internalKey, internalValue := range batch.InternalOps { + if internalValue == nil { + // delete + deleteInternalRow := NewInternalRow([]byte(internalKey), nil) + deleteRows = append(deleteRows, deleteInternalRow) + } else { + updateInternalRow := NewInternalRow([]byte(internalKey), internalValue) + updateRows = append(updateRows, updateInternalRow) + } + } + for dbir := range docBackIndexRowCh { if dbir.doc == nil && dbir.backIndexRow != nil { // delete @@ -813,18 +825,6 @@ func (udc *UpsideDownCouch) Batch(batch *index.Batch) (err error) { return UnsafeBatchUseDetected } - // add the internal ops - for internalKey, internalValue := range batch.InternalOps { - if internalValue == nil { - // delete - deleteInternalRow := NewInternalRow([]byte(internalKey), nil) - deleteRows = append(deleteRows, deleteInternalRow) - } else { - updateInternalRow := NewInternalRow([]byte(internalKey), internalValue) - updateRows = append(updateRows, updateInternalRow) - } - } - // start a writer for this batch var kvwriter store.KVWriter kvwriter, err = udc.store.Writer() From 7ce7d98cbadeeba22b8ae805a3c0c427a0733d34 Mon Sep 17 00:00:00 2001 From: Steve Yen Date: Mon, 11 Jan 2016 16:52:07 -0800 Subject: [PATCH 6/7] upside_down merge dictionary deltas before using batch.Merge() This change performs more dictionary delta incr/decr math in batchRows() instead of in the KVStore ExecuteBatch() machinery. 
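The coalescing idea in isolation, as a rough sketch (the fakeBatch type below is an invented stand-in for illustration, not bleve's actual store.KVBatch): accumulate signed per-key deltas in a plain map while walking the add/delete rows, then emit a single 8-byte little-endian Merge per distinct dictionary key, rather than one Merge op per row.

package main

import (
	"encoding/binary"
	"fmt"
)

// toy stand-in for a KV write batch whose merge operator
// interprets the value as a signed 8-byte delta
type fakeBatch struct{ merged map[string]int64 }

func (b *fakeBatch) Merge(key, val []byte) {
	b.merged[string(key)] += int64(binary.LittleEndian.Uint64(val))
}

func main() {
	// add rows increment, delete rows decrement, all in memory first
	deltas := map[string]int64{}
	deltas["d/f1/term"] += 1
	deltas["d/f1/term"] += 1
	deltas["d/f2/term"] -= 1

	// then one Merge per distinct key, as batchRows() now does
	wb := &fakeBatch{merged: map[string]int64{}}
	buf := make([]byte, 8)
	for key, delta := range deltas {
		binary.LittleEndian.PutUint64(buf, uint64(delta))
		wb.Merge([]byte(key), buf[:8])
	}
	fmt.Println(wb.merged) // map[d/f1/term:2 d/f2/term:-1]
}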
--- index/upside_down/upside_down.go | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/index/upside_down/upside_down.go b/index/upside_down/upside_down.go index e195d676..5b66f2be 100644 --- a/index/upside_down/upside_down.go +++ b/index/upside_down/upside_down.go @@ -10,6 +10,7 @@ package upside_down import ( + "encoding/binary" "encoding/json" "fmt" "math" @@ -152,6 +153,8 @@ func (udc *UpsideDownCouch) batchRows(writer store.KVWriter, addRows []UpsideDow // buffer to work with rowBuf := GetRowBuffer() + dictionaryDeltas := make(map[string]int64) + // add for _, row := range addRows { tfr, ok := row.(*TermFrequencyRow) @@ -163,7 +166,7 @@ func (udc *UpsideDownCouch) batchRows(writer store.KVWriter, addRows []UpsideDow if err != nil { return err } - wb.Merge(rowBuf[:dictKeySize], dictionaryTermIncr) + dictionaryDeltas[string(rowBuf[:dictKeySize])] += 1 } if row.KeySize()+row.ValueSize() > len(rowBuf) { rowBuf = make([]byte, row.KeySize()+row.ValueSize()) @@ -204,7 +207,7 @@ func (udc *UpsideDownCouch) batchRows(writer store.KVWriter, addRows []UpsideDow if err != nil { return err } - wb.Merge(rowBuf[:dictKeySize], dictionaryTermDecr) + dictionaryDeltas[string(rowBuf[:dictKeySize])] -= 1 } if row.KeySize()+row.ValueSize() > len(rowBuf) { rowBuf = make([]byte, row.KeySize()+row.ValueSize()) @@ -216,6 +219,15 @@ func (udc *UpsideDownCouch) batchRows(writer store.KVWriter, addRows []UpsideDow wb.Delete(rowBuf[:keySize]) } + if 8 > len(rowBuf) { + rowBuf = make([]byte, 8) + } + + for dictRowKey, delta := range dictionaryDeltas { + binary.LittleEndian.PutUint64(rowBuf, uint64(delta)) + wb.Merge([]byte(dictRowKey), rowBuf[0:8]) + } + PutRowBuffer(rowBuf) // write out the batch From 0e72b949b39475c17e1f93af5710c184a0ca8c54 Mon Sep 17 00:00:00 2001 From: Steve Yen Date: Mon, 11 Jan 2016 18:11:21 -0800 Subject: [PATCH 7/7] upside_down batchRows() takes array of arrays In order to spend less time in append(), this change in upside_down (similar to another recent performance change in firestorm) builds up an array of arrays as the eventual input to batchRows(). 
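The append-avoidance idea on its own, as a small sketch (the row type and rowsForDoc below are invented for illustration): each producer hands over its whole per-document slice once, so the outer collection grows by one element per document instead of re-copying every row the way a single flat slice does whenever it outgrows its capacity.

package main

import "fmt"

type row struct{ key string }

// stand-in for a mergeOldAndNew()-style producer that returns a
// freshly built slice of rows for one document
func rowsForDoc(docID string) []row {
	return []row{{key: docID + "/a"}, {key: docID + "/b"}}
}

func main() {
	var updateRowsAll [][]row // one entry appended per document
	for _, docID := range []string{"doc1", "doc2", "doc3"} {
		if rows := rowsForDoc(docID); len(rows) > 0 {
			updateRowsAll = append(updateRowsAll, rows)
		}
	}

	// the consumer ranges over the groups, as batchRows() now does
	total := 0
	for _, rows := range updateRowsAll {
		total += len(rows)
	}
	fmt.Println(len(updateRowsAll), "groups,", total, "rows")
}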
--- index/upside_down/upside_down.go | 186 +++++++++++++++++++------------ 1 file changed, 115 insertions(+), 71 deletions(-) diff --git a/index/upside_down/upside_down.go b/index/upside_down/upside_down.go index 5b66f2be..ed5a3d6c 100644 --- a/index/upside_down/upside_down.go +++ b/index/upside_down/upside_down.go @@ -79,13 +79,12 @@ func NewUpsideDownCouch(storeName string, storeConfig map[string]interface{}, an } func (udc *UpsideDownCouch) init(kvwriter store.KVWriter) (err error) { - // prepare a list of rows - rows := make([]UpsideDownCouchRow, 0) - // version marker - rows = append(rows, NewVersionRow(udc.version)) + rowsAll := [][]UpsideDownCouchRow{ + []UpsideDownCouchRow{NewVersionRow(udc.version)}, + } - err = udc.batchRows(kvwriter, nil, rows, nil) + err = udc.batchRows(kvwriter, nil, rowsAll, nil) return } @@ -142,7 +141,7 @@ func PutRowBuffer(buf []byte) { rowBufferPool.Put(buf) } -func (udc *UpsideDownCouch) batchRows(writer store.KVWriter, addRows []UpsideDownCouchRow, updateRows []UpsideDownCouchRow, deleteRows []UpsideDownCouchRow) (err error) { +func (udc *UpsideDownCouch) batchRows(writer store.KVWriter, addRowsAll [][]UpsideDownCouchRow, updateRowsAll [][]UpsideDownCouchRow, deleteRowsAll [][]UpsideDownCouchRow) (err error) { // prepare batch wb := writer.NewBatch() @@ -156,73 +155,78 @@ func (udc *UpsideDownCouch) batchRows(writer store.KVWriter, addRows []UpsideDow dictionaryDeltas := make(map[string]int64) // add - for _, row := range addRows { - tfr, ok := row.(*TermFrequencyRow) - if ok { - if tfr.DictionaryRowKeySize() > len(rowBuf) { - rowBuf = make([]byte, tfr.DictionaryRowKeySize()) + for _, addRows := range addRowsAll { + for _, row := range addRows { + tfr, ok := row.(*TermFrequencyRow) + if ok { + if tfr.DictionaryRowKeySize() > len(rowBuf) { + rowBuf = make([]byte, tfr.DictionaryRowKeySize()) + } + dictKeySize, err := tfr.DictionaryRowKeyTo(rowBuf) + if err != nil { + return err + } + dictionaryDeltas[string(rowBuf[:dictKeySize])] += 1 } - dictKeySize, err := tfr.DictionaryRowKeyTo(rowBuf) + if row.KeySize()+row.ValueSize() > len(rowBuf) { + rowBuf = make([]byte, row.KeySize()+row.ValueSize()) + } + keySize, err := row.KeyTo(rowBuf) if err != nil { return err } - dictionaryDeltas[string(rowBuf[:dictKeySize])] += 1 + valSize, err := row.ValueTo(rowBuf[keySize:]) + wb.Set(rowBuf[:keySize], rowBuf[keySize:keySize+valSize]) } - if row.KeySize()+row.ValueSize() > len(rowBuf) { - rowBuf = make([]byte, row.KeySize()+row.ValueSize()) - } - keySize, err := row.KeyTo(rowBuf) - if err != nil { - return err - } - valSize, err := row.ValueTo(rowBuf[keySize:]) - wb.Set(rowBuf[:keySize], rowBuf[keySize:keySize+valSize]) } // update - for _, row := range updateRows { - if row.KeySize()+row.ValueSize() > len(rowBuf) { - rowBuf = make([]byte, row.KeySize()+row.ValueSize()) - } - keySize, err := row.KeyTo(rowBuf) - if err != nil { - return err - } - valSize, err := row.ValueTo(rowBuf[keySize:]) - if err != nil { - return err - } - wb.Set(rowBuf[:keySize], rowBuf[keySize:keySize+valSize]) - } - - // delete - for _, row := range deleteRows { - tfr, ok := row.(*TermFrequencyRow) - if ok { - // need to decrement counter - if tfr.DictionaryRowKeySize() > len(rowBuf) { - rowBuf = make([]byte, tfr.DictionaryRowKeySize()) + for _, updateRows := range updateRowsAll { + for _, row := range updateRows { + if row.KeySize()+row.ValueSize() > len(rowBuf) { + rowBuf = make([]byte, row.KeySize()+row.ValueSize()) } - dictKeySize, err := tfr.DictionaryRowKeyTo(rowBuf) + keySize, err := 
row.KeyTo(rowBuf) if err != nil { return err } - dictionaryDeltas[string(rowBuf[:dictKeySize])] -= 1 + valSize, err := row.ValueTo(rowBuf[keySize:]) + if err != nil { + return err + } + wb.Set(rowBuf[:keySize], rowBuf[keySize:keySize+valSize]) } - if row.KeySize()+row.ValueSize() > len(rowBuf) { - rowBuf = make([]byte, row.KeySize()+row.ValueSize()) + } + + // delete + for _, deleteRows := range deleteRowsAll { + for _, row := range deleteRows { + tfr, ok := row.(*TermFrequencyRow) + if ok { + // need to decrement counter + if tfr.DictionaryRowKeySize() > len(rowBuf) { + rowBuf = make([]byte, tfr.DictionaryRowKeySize()) + } + dictKeySize, err := tfr.DictionaryRowKeyTo(rowBuf) + if err != nil { + return err + } + dictionaryDeltas[string(rowBuf[:dictKeySize])] -= 1 + } + if row.KeySize()+row.ValueSize() > len(rowBuf) { + rowBuf = make([]byte, row.KeySize()+row.ValueSize()) + } + keySize, err := row.KeyTo(rowBuf) + if err != nil { + return err + } + wb.Delete(rowBuf[:keySize]) } - keySize, err := row.KeyTo(rowBuf) - if err != nil { - return err - } - wb.Delete(rowBuf[:keySize]) } if 8 > len(rowBuf) { rowBuf = make([]byte, 8) } - for dictRowKey, delta := range dictionaryDeltas { binary.LittleEndian.PutUint64(rowBuf, uint64(delta)) wb.Merge([]byte(dictRowKey), rowBuf[0:8]) @@ -413,13 +417,22 @@ func (udc *UpsideDownCouch) Update(doc *document.Document) (err error) { }() // prepare a list of rows - addRows := make([]UpsideDownCouchRow, 0) - updateRows := make([]UpsideDownCouchRow, 0) - deleteRows := make([]UpsideDownCouchRow, 0) + var addRowsAll [][]UpsideDownCouchRow + var updateRowsAll [][]UpsideDownCouchRow + var deleteRowsAll [][]UpsideDownCouchRow - addRows, updateRows, deleteRows = udc.mergeOldAndNew(backIndexRow, result.Rows, addRows, updateRows, deleteRows) + addRows, updateRows, deleteRows := udc.mergeOldAndNew(backIndexRow, result.Rows) + if len(addRows) > 0 { + addRowsAll = append(addRowsAll, addRows) + } + if len(updateRows) > 0 { + updateRowsAll = append(updateRowsAll, updateRows) + } + if len(deleteRows) > 0 { + deleteRowsAll = append(deleteRowsAll, deleteRows) + } - err = udc.batchRows(kvwriter, addRows, updateRows, deleteRows) + err = udc.batchRows(kvwriter, addRowsAll, updateRowsAll, deleteRowsAll) if err == nil && backIndexRow == nil { udc.m.Lock() udc.docCount++ @@ -434,7 +447,11 @@ func (udc *UpsideDownCouch) Update(doc *document.Document) (err error) { return } -func (udc *UpsideDownCouch) mergeOldAndNew(backIndexRow *BackIndexRow, rows []index.IndexRow, addRows, updateRows, deleteRows []UpsideDownCouchRow) ([]UpsideDownCouchRow, []UpsideDownCouchRow, []UpsideDownCouchRow) { +func (udc *UpsideDownCouch) mergeOldAndNew(backIndexRow *BackIndexRow, rows []index.IndexRow) (addRows []UpsideDownCouchRow, updateRows []UpsideDownCouchRow, deleteRows []UpsideDownCouchRow) { + addRows = make([]UpsideDownCouchRow, 0, len(rows)) + updateRows = make([]UpsideDownCouchRow, 0, len(rows)) + deleteRows = make([]UpsideDownCouchRow, 0, len(rows)) + existingTermKeys := make(map[string]bool) for _, key := range backIndexRow.AllTermKeys() { existingTermKeys[string(key)] = true @@ -588,10 +605,14 @@ func (udc *UpsideDownCouch) Delete(id string) (err error) { } }() - deleteRows := make([]UpsideDownCouchRow, 0) - deleteRows = udc.deleteSingle(id, backIndexRow, deleteRows) + var deleteRowsAll [][]UpsideDownCouchRow - err = udc.batchRows(kvwriter, nil, nil, deleteRows) + deleteRows := udc.deleteSingle(id, backIndexRow, nil) + if len(deleteRows) > 0 { + deleteRowsAll = append(deleteRowsAll, deleteRows) 
+ } + + err = udc.batchRows(kvwriter, nil, nil, deleteRowsAll) if err == nil { udc.m.Lock() udc.docCount-- @@ -792,17 +813,20 @@ func (udc *UpsideDownCouch) Batch(batch *index.Batch) (err error) { atomic.AddUint64(&udc.stats.analysisTime, uint64(time.Since(analysisStart))) - // prepare a list of rows - addRows := make([]UpsideDownCouchRow, 0) - updateRows := make([]UpsideDownCouchRow, 0) - deleteRows := make([]UpsideDownCouchRow, 0) - docsAdded := uint64(0) docsDeleted := uint64(0) indexStart := time.Now() + // prepare a list of rows + var addRowsAll [][]UpsideDownCouchRow + var updateRowsAll [][]UpsideDownCouchRow + var deleteRowsAll [][]UpsideDownCouchRow + // add the internal ops + var updateRows []UpsideDownCouchRow + var deleteRows []UpsideDownCouchRow + for internalKey, internalValue := range batch.InternalOps { if internalValue == nil { // delete @@ -814,13 +838,33 @@ func (udc *UpsideDownCouch) Batch(batch *index.Batch) (err error) { } } + if len(updateRows) > 0 { + updateRowsAll = append(updateRowsAll, updateRows) + } + if len(deleteRows) > 0 { + deleteRowsAll = append(deleteRowsAll, deleteRows) + } + + // process back index rows as they arrive for dbir := range docBackIndexRowCh { if dbir.doc == nil && dbir.backIndexRow != nil { // delete - deleteRows = udc.deleteSingle(dbir.docID, dbir.backIndexRow, deleteRows) + deleteRows := udc.deleteSingle(dbir.docID, dbir.backIndexRow, nil) + if len(deleteRows) > 0 { + deleteRowsAll = append(deleteRowsAll, deleteRows) + } docsDeleted++ } else if dbir.doc != nil { - addRows, updateRows, deleteRows = udc.mergeOldAndNew(dbir.backIndexRow, newRowsMap[dbir.docID], addRows, updateRows, deleteRows) + addRows, updateRows, deleteRows := udc.mergeOldAndNew(dbir.backIndexRow, newRowsMap[dbir.docID]) + if len(addRows) > 0 { + addRowsAll = append(addRowsAll, addRows) + } + if len(updateRows) > 0 { + updateRowsAll = append(updateRowsAll, updateRows) + } + if len(deleteRows) > 0 { + deleteRowsAll = append(deleteRowsAll, deleteRows) + } if dbir.backIndexRow == nil { docsAdded++ } @@ -844,7 +888,7 @@ func (udc *UpsideDownCouch) Batch(batch *index.Batch) (err error) { return } - err = udc.batchRows(kvwriter, addRows, updateRows, deleteRows) + err = udc.batchRows(kvwriter, addRowsAll, updateRowsAll, deleteRowsAll) if err != nil { _ = kvwriter.Close() atomic.AddUint64(&udc.stats.errors, 1)
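Taken together, patches 2, 4, and 5 converge on one pipeline shape inside Batch(): back-index lookups run in a goroutine feeding a buffered channel, internal ops are handled while that I/O is in flight, and rows are merged as they arrive. A minimal standalone sketch of that shape (the lookup function and row type below are invented stand-ins, not bleve's KVReader API):

package main

import "fmt"

type docBackIndexRow struct {
	docID string
	back  string // stand-in for *BackIndexRow; empty means none
}

// stand-in for backIndexRowForDoc() against a KVReader snapshot
func lookup(docID string) (docBackIndexRow, error) {
	return docBackIndexRow{docID: docID, back: "back/" + docID}, nil
}

func main() {
	docIDs := []string{"doc1", "doc2", "doc3"}

	// reader goroutine: buffered so it never blocks on the consumer
	var readErr error
	rowCh := make(chan docBackIndexRow, len(docIDs))
	go func() {
		defer close(rowCh)
		for _, id := range docIDs {
			r, err := lookup(id)
			if err != nil {
				readErr = err
				return
			}
			rowCh <- r
		}
	}()

	// ...analysis results and internal ops are processed here,
	// overlapping with the lookups above...

	// merge back-index rows as they arrive instead of gathering
	// them all first
	for r := range rowCh {
		fmt.Println("merge", r.docID, "with", r.back)
	}

	// safe to read only after the channel is closed: the close
	// happens after any write to readErr in the goroutine
	if readErr != nil {
		fmt.Println("backindex lookup failed:", readErr)
	}
}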