diff --git a/index/upside_down/upside_down.go b/index/upside_down/upside_down.go index dc20d48b..4731e7ed 100644 --- a/index/upside_down/upside_down.go +++ b/index/upside_down/upside_down.go @@ -710,6 +710,7 @@ func (udc *UpsideDownCouch) termFieldVectorsFromTermVectors(in []*TermVector) [] func (udc *UpsideDownCouch) Batch(batch *index.Batch) (err error) { analysisStart := time.Now() + resultChan := make(chan *index.AnalysisResult, len(batch.IndexOps)) var numUpdates uint64 @@ -740,12 +741,13 @@ func (udc *UpsideDownCouch) Batch(batch *index.Batch) (err error) { } }() - udc.writeMutex.Lock() - defer udc.writeMutex.Unlock() - + // retrieve back index rows concurrent with analysis var backIndexRows map[string]*BackIndexRow backindexReaderCh := make(chan error) + udc.writeMutex.Lock() + defer udc.writeMutex.Unlock() + go func() { defer close(backindexReaderCh) @@ -757,7 +759,6 @@ func (udc *UpsideDownCouch) Batch(batch *index.Batch) (err error) { return } - // first lookup all the back index rows backIndexRows, err = udc.backIndexRowsForBatch(kvreader, batch) if err != nil { _ = kvreader.Close() @@ -772,7 +773,7 @@ func (udc *UpsideDownCouch) Batch(batch *index.Batch) (err error) { } }() - // wait for the result + // wait for analysis result newRowsMap := make(map[string][]index.IndexRow) var itemsDeQueued uint64 for itemsDeQueued < numUpdates { @@ -782,6 +783,9 @@ func (udc *UpsideDownCouch) Batch(batch *index.Batch) (err error) { } close(resultChan) + atomic.AddUint64(&udc.stats.analysisTime, uint64(time.Since(analysisStart))) + + // wait for back index rows backindexReaderErr := <-backindexReaderCh if backindexReaderErr != nil { return backindexReaderErr @@ -793,8 +797,6 @@ func (udc *UpsideDownCouch) Batch(batch *index.Batch) (err error) { return UnsafeBatchUseDetected } - atomic.AddUint64(&udc.stats.analysisTime, uint64(time.Since(analysisStart))) - indexStart := time.Now() // prepare a list of rows @@ -845,6 +847,7 @@ func (udc *UpsideDownCouch) Batch(batch *index.Batch) (err error) { } err = kvwriter.Close() + atomic.AddUint64(&udc.stats.indexTime, uint64(time.Since(indexStart))) if err == nil {