diff --git a/index/upside_down/upside_down.go b/index/upside_down/upside_down.go index 4731e7ed..440d62e6 100644 --- a/index/upside_down/upside_down.go +++ b/index/upside_down/upside_down.go @@ -60,6 +60,12 @@ type UpsideDownCouch struct { writeMutex sync.Mutex } +type docBackIndexRow struct { + docID string + doc *document.Document // If deletion, doc will be nil. + backIndexRow *BackIndexRow +} + func NewUpsideDownCouch(storeName string, storeConfig map[string]interface{}, analysisQueue *index.AnalysisQueue) (index.Index, error) { return &UpsideDownCouch{ version: Version, @@ -635,20 +641,6 @@ func (udc *UpsideDownCouch) backIndexRowForDoc(kvreader store.KVReader, docID st return backIndexRow, nil } -func (udc *UpsideDownCouch) backIndexRowsForBatch(kvreader store.KVReader, batch *index.Batch) (map[string]*BackIndexRow, error) { - // FIXME faster to order the ids and scan sequentially - // for now just get it working - rv := make(map[string]*BackIndexRow, 0) - for docID := range batch.IndexOps { - backIndexRow, err := udc.backIndexRowForDoc(kvreader, docID) - if err != nil { - return nil, err - } - rv[docID] = backIndexRow - } - return rv, nil -} - func decodeFieldType(typ byte, name string, pos []uint64, value []byte) document.Field { switch typ { case 't': @@ -742,33 +734,36 @@ func (udc *UpsideDownCouch) Batch(batch *index.Batch) (err error) { }() // retrieve back index rows concurrent with analysis - var backIndexRows map[string]*BackIndexRow - backindexReaderCh := make(chan error) + docBackIndexRowErr := error(nil) + docBackIndexRowCh := make(chan *docBackIndexRow, len(batch.IndexOps)) udc.writeMutex.Lock() defer udc.writeMutex.Unlock() go func() { - defer close(backindexReaderCh) + defer close(docBackIndexRowCh) // open a reader for backindex lookup var kvreader store.KVReader kvreader, err = udc.store.Reader() if err != nil { - backindexReaderCh <- err + docBackIndexRowErr = err return } - backIndexRows, err = udc.backIndexRowsForBatch(kvreader, batch) - if err != nil { - _ = kvreader.Close() - backindexReaderCh <- err - return + for docID, doc := range batch.IndexOps { + backIndexRow, err := udc.backIndexRowForDoc(kvreader, docID) + if err != nil { + docBackIndexRowErr = err + return + } + + docBackIndexRowCh <- &docBackIndexRow{docID, doc, backIndexRow} } err = kvreader.Close() if err != nil { - backindexReaderCh <- err + docBackIndexRowErr = err return } }() @@ -785,20 +780,6 @@ func (udc *UpsideDownCouch) Batch(batch *index.Batch) (err error) { atomic.AddUint64(&udc.stats.analysisTime, uint64(time.Since(analysisStart))) - // wait for back index rows - backindexReaderErr := <-backindexReaderCh - if backindexReaderErr != nil { - return backindexReaderErr - } - - detectedUnsafeMutex.RLock() - defer detectedUnsafeMutex.RUnlock() - if detectedUnsafe { - return UnsafeBatchUseDetected - } - - indexStart := time.Now() - // prepare a list of rows addRows := make([]UpsideDownCouchRow, 0) updateRows := make([]UpsideDownCouchRow, 0) @@ -806,20 +787,32 @@ func (udc *UpsideDownCouch) Batch(batch *index.Batch) (err error) { docsAdded := uint64(0) docsDeleted := uint64(0) - for docID, doc := range batch.IndexOps { - backIndexRow := backIndexRows[docID] - if doc == nil && backIndexRow != nil { + + indexStart := time.Now() + + for dbir := range docBackIndexRowCh { + if dbir.doc == nil && dbir.backIndexRow != nil { // delete - deleteRows = udc.deleteSingle(docID, backIndexRow, deleteRows) + deleteRows = udc.deleteSingle(dbir.docID, dbir.backIndexRow, deleteRows) docsDeleted++ - } else if doc != nil { - addRows, updateRows, deleteRows = udc.mergeOldAndNew(backIndexRow, newRowsMap[docID], addRows, updateRows, deleteRows) - if backIndexRow == nil { + } else if dbir.doc != nil { + addRows, updateRows, deleteRows = udc.mergeOldAndNew(dbir.backIndexRow, newRowsMap[dbir.docID], addRows, updateRows, deleteRows) + if dbir.backIndexRow == nil { docsAdded++ } } } + if docBackIndexRowErr != nil { + return docBackIndexRowErr + } + + detectedUnsafeMutex.RLock() + defer detectedUnsafeMutex.RUnlock() + if detectedUnsafe { + return UnsafeBatchUseDetected + } + // add the internal ops for internalKey, internalValue := range batch.InternalOps { if internalValue == nil {