0
0

upside_down merge backIndexRow concurrently

Previously, the code would gather all the backIndexRows before
processing them.  This change instead merges the backIndexRows
concurrently on the theory that we might as well make progress on
compute & processing tasks while waiting for the rest of the back
index rows to be fetched from the KVStore.
This commit is contained in:
Steve Yen 2016-01-10 18:47:09 -08:00
parent c3b5246b0c
commit bb5cd8f3d6

View File

@ -60,6 +60,12 @@ type UpsideDownCouch struct {
writeMutex sync.Mutex
}
type docBackIndexRow struct {
docID string
doc *document.Document // If deletion, doc will be nil.
backIndexRow *BackIndexRow
}
func NewUpsideDownCouch(storeName string, storeConfig map[string]interface{}, analysisQueue *index.AnalysisQueue) (index.Index, error) {
return &UpsideDownCouch{
version: Version,
@ -635,20 +641,6 @@ func (udc *UpsideDownCouch) backIndexRowForDoc(kvreader store.KVReader, docID st
return backIndexRow, nil
}
func (udc *UpsideDownCouch) backIndexRowsForBatch(kvreader store.KVReader, batch *index.Batch) (map[string]*BackIndexRow, error) {
// FIXME faster to order the ids and scan sequentially
// for now just get it working
rv := make(map[string]*BackIndexRow, 0)
for docID := range batch.IndexOps {
backIndexRow, err := udc.backIndexRowForDoc(kvreader, docID)
if err != nil {
return nil, err
}
rv[docID] = backIndexRow
}
return rv, nil
}
func decodeFieldType(typ byte, name string, pos []uint64, value []byte) document.Field {
switch typ {
case 't':
@ -742,33 +734,36 @@ func (udc *UpsideDownCouch) Batch(batch *index.Batch) (err error) {
}()
// retrieve back index rows concurrent with analysis
var backIndexRows map[string]*BackIndexRow
backindexReaderCh := make(chan error)
docBackIndexRowErr := error(nil)
docBackIndexRowCh := make(chan *docBackIndexRow, len(batch.IndexOps))
udc.writeMutex.Lock()
defer udc.writeMutex.Unlock()
go func() {
defer close(backindexReaderCh)
defer close(docBackIndexRowCh)
// open a reader for backindex lookup
var kvreader store.KVReader
kvreader, err = udc.store.Reader()
if err != nil {
backindexReaderCh <- err
docBackIndexRowErr = err
return
}
backIndexRows, err = udc.backIndexRowsForBatch(kvreader, batch)
if err != nil {
_ = kvreader.Close()
backindexReaderCh <- err
return
for docID, doc := range batch.IndexOps {
backIndexRow, err := udc.backIndexRowForDoc(kvreader, docID)
if err != nil {
docBackIndexRowErr = err
return
}
docBackIndexRowCh <- &docBackIndexRow{docID, doc, backIndexRow}
}
err = kvreader.Close()
if err != nil {
backindexReaderCh <- err
docBackIndexRowErr = err
return
}
}()
@ -785,20 +780,6 @@ func (udc *UpsideDownCouch) Batch(batch *index.Batch) (err error) {
atomic.AddUint64(&udc.stats.analysisTime, uint64(time.Since(analysisStart)))
// wait for back index rows
backindexReaderErr := <-backindexReaderCh
if backindexReaderErr != nil {
return backindexReaderErr
}
detectedUnsafeMutex.RLock()
defer detectedUnsafeMutex.RUnlock()
if detectedUnsafe {
return UnsafeBatchUseDetected
}
indexStart := time.Now()
// prepare a list of rows
addRows := make([]UpsideDownCouchRow, 0)
updateRows := make([]UpsideDownCouchRow, 0)
@ -806,20 +787,32 @@ func (udc *UpsideDownCouch) Batch(batch *index.Batch) (err error) {
docsAdded := uint64(0)
docsDeleted := uint64(0)
for docID, doc := range batch.IndexOps {
backIndexRow := backIndexRows[docID]
if doc == nil && backIndexRow != nil {
indexStart := time.Now()
for dbir := range docBackIndexRowCh {
if dbir.doc == nil && dbir.backIndexRow != nil {
// delete
deleteRows = udc.deleteSingle(docID, backIndexRow, deleteRows)
deleteRows = udc.deleteSingle(dbir.docID, dbir.backIndexRow, deleteRows)
docsDeleted++
} else if doc != nil {
addRows, updateRows, deleteRows = udc.mergeOldAndNew(backIndexRow, newRowsMap[docID], addRows, updateRows, deleteRows)
if backIndexRow == nil {
} else if dbir.doc != nil {
addRows, updateRows, deleteRows = udc.mergeOldAndNew(dbir.backIndexRow, newRowsMap[dbir.docID], addRows, updateRows, deleteRows)
if dbir.backIndexRow == nil {
docsAdded++
}
}
}
if docBackIndexRowErr != nil {
return docBackIndexRowErr
}
detectedUnsafeMutex.RLock()
defer detectedUnsafeMutex.RUnlock()
if detectedUnsafe {
return UnsafeBatchUseDetected
}
// add the internal ops
for internalKey, internalValue := range batch.InternalOps {
if internalValue == nil {