From bb5cd8f3d616ebb4038d54677db02eadf06e4529 Mon Sep 17 00:00:00 2001 From: Steve Yen Date: Sun, 10 Jan 2016 18:47:09 -0800 Subject: [PATCH] upside_down merge backIndexRow concurrently Previously, the code would gather all the backIndexRows before processing them. This change instead merges the backIndexRows concurrently on the theory that we might as well make progress on compute & processing tasks while waiting for the rest of the back index rows to be fetched from the KVStore. --- index/upside_down/upside_down.go | 83 +++++++++++++++----------------- 1 file changed, 38 insertions(+), 45 deletions(-) diff --git a/index/upside_down/upside_down.go b/index/upside_down/upside_down.go index 4731e7ed..440d62e6 100644 --- a/index/upside_down/upside_down.go +++ b/index/upside_down/upside_down.go @@ -60,6 +60,12 @@ type UpsideDownCouch struct { writeMutex sync.Mutex } +type docBackIndexRow struct { + docID string + doc *document.Document // If deletion, doc will be nil. + backIndexRow *BackIndexRow +} + func NewUpsideDownCouch(storeName string, storeConfig map[string]interface{}, analysisQueue *index.AnalysisQueue) (index.Index, error) { return &UpsideDownCouch{ version: Version, @@ -635,20 +641,6 @@ func (udc *UpsideDownCouch) backIndexRowForDoc(kvreader store.KVReader, docID st return backIndexRow, nil } -func (udc *UpsideDownCouch) backIndexRowsForBatch(kvreader store.KVReader, batch *index.Batch) (map[string]*BackIndexRow, error) { - // FIXME faster to order the ids and scan sequentially - // for now just get it working - rv := make(map[string]*BackIndexRow, 0) - for docID := range batch.IndexOps { - backIndexRow, err := udc.backIndexRowForDoc(kvreader, docID) - if err != nil { - return nil, err - } - rv[docID] = backIndexRow - } - return rv, nil -} - func decodeFieldType(typ byte, name string, pos []uint64, value []byte) document.Field { switch typ { case 't': @@ -742,33 +734,36 @@ func (udc *UpsideDownCouch) Batch(batch *index.Batch) (err error) { }() // retrieve back index rows concurrent with analysis - var backIndexRows map[string]*BackIndexRow - backindexReaderCh := make(chan error) + docBackIndexRowErr := error(nil) + docBackIndexRowCh := make(chan *docBackIndexRow, len(batch.IndexOps)) udc.writeMutex.Lock() defer udc.writeMutex.Unlock() go func() { - defer close(backindexReaderCh) + defer close(docBackIndexRowCh) // open a reader for backindex lookup var kvreader store.KVReader kvreader, err = udc.store.Reader() if err != nil { - backindexReaderCh <- err + docBackIndexRowErr = err return } - backIndexRows, err = udc.backIndexRowsForBatch(kvreader, batch) - if err != nil { - _ = kvreader.Close() - backindexReaderCh <- err - return + for docID, doc := range batch.IndexOps { + backIndexRow, err := udc.backIndexRowForDoc(kvreader, docID) + if err != nil { + docBackIndexRowErr = err + return + } + + docBackIndexRowCh <- &docBackIndexRow{docID, doc, backIndexRow} } err = kvreader.Close() if err != nil { - backindexReaderCh <- err + docBackIndexRowErr = err return } }() @@ -785,20 +780,6 @@ func (udc *UpsideDownCouch) Batch(batch *index.Batch) (err error) { atomic.AddUint64(&udc.stats.analysisTime, uint64(time.Since(analysisStart))) - // wait for back index rows - backindexReaderErr := <-backindexReaderCh - if backindexReaderErr != nil { - return backindexReaderErr - } - - detectedUnsafeMutex.RLock() - defer detectedUnsafeMutex.RUnlock() - if detectedUnsafe { - return UnsafeBatchUseDetected - } - - indexStart := time.Now() - // prepare a list of rows addRows := make([]UpsideDownCouchRow, 0) updateRows := make([]UpsideDownCouchRow, 0) @@ -806,20 +787,32 @@ func (udc *UpsideDownCouch) Batch(batch *index.Batch) (err error) { docsAdded := uint64(0) docsDeleted := uint64(0) - for docID, doc := range batch.IndexOps { - backIndexRow := backIndexRows[docID] - if doc == nil && backIndexRow != nil { + + indexStart := time.Now() + + for dbir := range docBackIndexRowCh { + if dbir.doc == nil && dbir.backIndexRow != nil { // delete - deleteRows = udc.deleteSingle(docID, backIndexRow, deleteRows) + deleteRows = udc.deleteSingle(dbir.docID, dbir.backIndexRow, deleteRows) docsDeleted++ - } else if doc != nil { - addRows, updateRows, deleteRows = udc.mergeOldAndNew(backIndexRow, newRowsMap[docID], addRows, updateRows, deleteRows) - if backIndexRow == nil { + } else if dbir.doc != nil { + addRows, updateRows, deleteRows = udc.mergeOldAndNew(dbir.backIndexRow, newRowsMap[dbir.docID], addRows, updateRows, deleteRows) + if dbir.backIndexRow == nil { docsAdded++ } } } + if docBackIndexRowErr != nil { + return docBackIndexRowErr + } + + detectedUnsafeMutex.RLock() + defer detectedUnsafeMutex.RUnlock() + if detectedUnsafe { + return UnsafeBatchUseDetected + } + // add the internal ops for internalKey, internalValue := range batch.InternalOps { if internalValue == nil {