upside_down retrieves backindex concurrently with analysis
Start backindex reading concurrently with analysi to try to utilize more I/O bandwidth. The analysis time vs indexing time stats tracking are also now "off", since there's now concurrency between those actiivties. One tradeoff is that the lock area in upside_down Batch() is increased as part of this change.
This commit is contained in:
parent
bff95eef70
commit
d3dd40d334
|
@ -710,7 +710,7 @@ func (udc *UpsideDownCouch) termFieldVectorsFromTermVectors(in []*TermVector) []
|
||||||
|
|
||||||
func (udc *UpsideDownCouch) Batch(batch *index.Batch) (err error) {
|
func (udc *UpsideDownCouch) Batch(batch *index.Batch) (err error) {
|
||||||
analysisStart := time.Now()
|
analysisStart := time.Now()
|
||||||
resultChan := make(chan *index.AnalysisResult)
|
resultChan := make(chan *index.AnalysisResult, len(batch.IndexOps))
|
||||||
|
|
||||||
var numUpdates uint64
|
var numUpdates uint64
|
||||||
for _, doc := range batch.IndexOps {
|
for _, doc := range batch.IndexOps {
|
||||||
|
@ -740,8 +740,40 @@ func (udc *UpsideDownCouch) Batch(batch *index.Batch) (err error) {
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
newRowsMap := make(map[string][]index.IndexRow)
|
udc.writeMutex.Lock()
|
||||||
|
defer udc.writeMutex.Unlock()
|
||||||
|
|
||||||
|
var backIndexRows map[string]*BackIndexRow
|
||||||
|
backindexReaderCh := make(chan error)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
defer close(backindexReaderCh)
|
||||||
|
|
||||||
|
// open a reader for backindex lookup
|
||||||
|
var kvreader store.KVReader
|
||||||
|
kvreader, err = udc.store.Reader()
|
||||||
|
if err != nil {
|
||||||
|
backindexReaderCh <- err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// first lookup all the back index rows
|
||||||
|
backIndexRows, err = udc.backIndexRowsForBatch(kvreader, batch)
|
||||||
|
if err != nil {
|
||||||
|
_ = kvreader.Close()
|
||||||
|
backindexReaderCh <- err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
err = kvreader.Close()
|
||||||
|
if err != nil {
|
||||||
|
backindexReaderCh <- err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
// wait for the result
|
// wait for the result
|
||||||
|
newRowsMap := make(map[string][]index.IndexRow)
|
||||||
var itemsDeQueued uint64
|
var itemsDeQueued uint64
|
||||||
for itemsDeQueued < numUpdates {
|
for itemsDeQueued < numUpdates {
|
||||||
result := <-resultChan
|
result := <-resultChan
|
||||||
|
@ -750,6 +782,11 @@ func (udc *UpsideDownCouch) Batch(batch *index.Batch) (err error) {
|
||||||
}
|
}
|
||||||
close(resultChan)
|
close(resultChan)
|
||||||
|
|
||||||
|
backindexReaderErr := <-backindexReaderCh
|
||||||
|
if backindexReaderErr != nil {
|
||||||
|
return backindexReaderErr
|
||||||
|
}
|
||||||
|
|
||||||
detectedUnsafeMutex.RLock()
|
detectedUnsafeMutex.RLock()
|
||||||
defer detectedUnsafeMutex.RUnlock()
|
defer detectedUnsafeMutex.RUnlock()
|
||||||
if detectedUnsafe {
|
if detectedUnsafe {
|
||||||
|
@ -760,36 +797,6 @@ func (udc *UpsideDownCouch) Batch(batch *index.Batch) (err error) {
|
||||||
|
|
||||||
indexStart := time.Now()
|
indexStart := time.Now()
|
||||||
|
|
||||||
udc.writeMutex.Lock()
|
|
||||||
defer udc.writeMutex.Unlock()
|
|
||||||
|
|
||||||
// open a reader for backindex lookup
|
|
||||||
var kvreader store.KVReader
|
|
||||||
kvreader, err = udc.store.Reader()
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// first lookup all the back index rows
|
|
||||||
var backIndexRows map[string]*BackIndexRow
|
|
||||||
backIndexRows, err = udc.backIndexRowsForBatch(kvreader, batch)
|
|
||||||
if err != nil {
|
|
||||||
_ = kvreader.Close()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
err = kvreader.Close()
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// start a writer for this batch
|
|
||||||
var kvwriter store.KVWriter
|
|
||||||
kvwriter, err = udc.store.Writer()
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// prepare a list of rows
|
// prepare a list of rows
|
||||||
addRows := make([]UpsideDownCouchRow, 0)
|
addRows := make([]UpsideDownCouchRow, 0)
|
||||||
updateRows := make([]UpsideDownCouchRow, 0)
|
updateRows := make([]UpsideDownCouchRow, 0)
|
||||||
|
@ -823,6 +830,13 @@ func (udc *UpsideDownCouch) Batch(batch *index.Batch) (err error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// start a writer for this batch
|
||||||
|
var kvwriter store.KVWriter
|
||||||
|
kvwriter, err = udc.store.Writer()
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
err = udc.batchRows(kvwriter, addRows, updateRows, deleteRows)
|
err = udc.batchRows(kvwriter, addRows, updateRows, deleteRows)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
_ = kvwriter.Close()
|
_ = kvwriter.Close()
|
||||||
|
|
Loading…
Reference in New Issue
Block a user