Merge pull request #316 from steveyen/WIP-perf-20160110
upside-down backindex read concurrency
This commit is contained in:
commit
c3ddf038ab
|
@ -346,6 +346,7 @@ func (f *Firestorm) Batch(batch *index.Batch) (err error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
indexStart := time.Now()
|
indexStart := time.Now()
|
||||||
|
|
||||||
// start a writer for this batch
|
// start a writer for this batch
|
||||||
var kvwriter store.KVWriter
|
var kvwriter store.KVWriter
|
||||||
kvwriter, err = f.store.Writer()
|
kvwriter, err = f.store.Writer()
|
||||||
|
@ -362,10 +363,12 @@ func (f *Firestorm) Batch(batch *index.Batch) (err error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
f.compensator.MutateBatch(inflightItems, lastDocNumber)
|
f.compensator.MutateBatch(inflightItems, lastDocNumber)
|
||||||
|
|
||||||
|
err = kvwriter.Close()
|
||||||
|
|
||||||
f.lookuper.NotifyBatch(inflightItems)
|
f.lookuper.NotifyBatch(inflightItems)
|
||||||
f.dictUpdater.NotifyBatch(dictionaryDeltas)
|
f.dictUpdater.NotifyBatch(dictionaryDeltas)
|
||||||
|
|
||||||
err = kvwriter.Close()
|
|
||||||
atomic.AddUint64(&f.stats.indexTime, uint64(time.Since(indexStart)))
|
atomic.AddUint64(&f.stats.indexTime, uint64(time.Since(indexStart)))
|
||||||
|
|
||||||
if err == nil {
|
if err == nil {
|
||||||
|
|
|
@ -10,6 +10,7 @@
|
||||||
package upside_down
|
package upside_down
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"encoding/binary"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"math"
|
"math"
|
||||||
|
@ -60,6 +61,12 @@ type UpsideDownCouch struct {
|
||||||
writeMutex sync.Mutex
|
writeMutex sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type docBackIndexRow struct {
|
||||||
|
docID string
|
||||||
|
doc *document.Document // If deletion, doc will be nil.
|
||||||
|
backIndexRow *BackIndexRow
|
||||||
|
}
|
||||||
|
|
||||||
func NewUpsideDownCouch(storeName string, storeConfig map[string]interface{}, analysisQueue *index.AnalysisQueue) (index.Index, error) {
|
func NewUpsideDownCouch(storeName string, storeConfig map[string]interface{}, analysisQueue *index.AnalysisQueue) (index.Index, error) {
|
||||||
return &UpsideDownCouch{
|
return &UpsideDownCouch{
|
||||||
version: Version,
|
version: Version,
|
||||||
|
@ -72,13 +79,12 @@ func NewUpsideDownCouch(storeName string, storeConfig map[string]interface{}, an
|
||||||
}
|
}
|
||||||
|
|
||||||
func (udc *UpsideDownCouch) init(kvwriter store.KVWriter) (err error) {
|
func (udc *UpsideDownCouch) init(kvwriter store.KVWriter) (err error) {
|
||||||
// prepare a list of rows
|
|
||||||
rows := make([]UpsideDownCouchRow, 0)
|
|
||||||
|
|
||||||
// version marker
|
// version marker
|
||||||
rows = append(rows, NewVersionRow(udc.version))
|
rowsAll := [][]UpsideDownCouchRow{
|
||||||
|
[]UpsideDownCouchRow{NewVersionRow(udc.version)},
|
||||||
|
}
|
||||||
|
|
||||||
err = udc.batchRows(kvwriter, nil, rows, nil)
|
err = udc.batchRows(kvwriter, nil, rowsAll, nil)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -135,7 +141,7 @@ func PutRowBuffer(buf []byte) {
|
||||||
rowBufferPool.Put(buf)
|
rowBufferPool.Put(buf)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (udc *UpsideDownCouch) batchRows(writer store.KVWriter, addRows []UpsideDownCouchRow, updateRows []UpsideDownCouchRow, deleteRows []UpsideDownCouchRow) (err error) {
|
func (udc *UpsideDownCouch) batchRows(writer store.KVWriter, addRowsAll [][]UpsideDownCouchRow, updateRowsAll [][]UpsideDownCouchRow, deleteRowsAll [][]UpsideDownCouchRow) (err error) {
|
||||||
|
|
||||||
// prepare batch
|
// prepare batch
|
||||||
wb := writer.NewBatch()
|
wb := writer.NewBatch()
|
||||||
|
@ -146,68 +152,84 @@ func (udc *UpsideDownCouch) batchRows(writer store.KVWriter, addRows []UpsideDow
|
||||||
// buffer to work with
|
// buffer to work with
|
||||||
rowBuf := GetRowBuffer()
|
rowBuf := GetRowBuffer()
|
||||||
|
|
||||||
|
dictionaryDeltas := make(map[string]int64)
|
||||||
|
|
||||||
// add
|
// add
|
||||||
for _, row := range addRows {
|
for _, addRows := range addRowsAll {
|
||||||
tfr, ok := row.(*TermFrequencyRow)
|
for _, row := range addRows {
|
||||||
if ok {
|
tfr, ok := row.(*TermFrequencyRow)
|
||||||
if tfr.DictionaryRowKeySize() > len(rowBuf) {
|
if ok {
|
||||||
rowBuf = make([]byte, tfr.DictionaryRowKeySize())
|
if tfr.DictionaryRowKeySize() > len(rowBuf) {
|
||||||
|
rowBuf = make([]byte, tfr.DictionaryRowKeySize())
|
||||||
|
}
|
||||||
|
dictKeySize, err := tfr.DictionaryRowKeyTo(rowBuf)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
dictionaryDeltas[string(rowBuf[:dictKeySize])] += 1
|
||||||
}
|
}
|
||||||
dictKeySize, err := tfr.DictionaryRowKeyTo(rowBuf)
|
if row.KeySize()+row.ValueSize() > len(rowBuf) {
|
||||||
|
rowBuf = make([]byte, row.KeySize()+row.ValueSize())
|
||||||
|
}
|
||||||
|
keySize, err := row.KeyTo(rowBuf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
wb.Merge(rowBuf[:dictKeySize], dictionaryTermIncr)
|
valSize, err := row.ValueTo(rowBuf[keySize:])
|
||||||
|
wb.Set(rowBuf[:keySize], rowBuf[keySize:keySize+valSize])
|
||||||
}
|
}
|
||||||
if row.KeySize()+row.ValueSize() > len(rowBuf) {
|
|
||||||
rowBuf = make([]byte, row.KeySize()+row.ValueSize())
|
|
||||||
}
|
|
||||||
keySize, err := row.KeyTo(rowBuf)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
valSize, err := row.ValueTo(rowBuf[keySize:])
|
|
||||||
wb.Set(rowBuf[:keySize], rowBuf[keySize:keySize+valSize])
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// update
|
// update
|
||||||
for _, row := range updateRows {
|
for _, updateRows := range updateRowsAll {
|
||||||
if row.KeySize()+row.ValueSize() > len(rowBuf) {
|
for _, row := range updateRows {
|
||||||
rowBuf = make([]byte, row.KeySize()+row.ValueSize())
|
if row.KeySize()+row.ValueSize() > len(rowBuf) {
|
||||||
}
|
rowBuf = make([]byte, row.KeySize()+row.ValueSize())
|
||||||
keySize, err := row.KeyTo(rowBuf)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
valSize, err := row.ValueTo(rowBuf[keySize:])
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
wb.Set(rowBuf[:keySize], rowBuf[keySize:keySize+valSize])
|
|
||||||
}
|
|
||||||
|
|
||||||
// delete
|
|
||||||
for _, row := range deleteRows {
|
|
||||||
tfr, ok := row.(*TermFrequencyRow)
|
|
||||||
if ok {
|
|
||||||
// need to decrement counter
|
|
||||||
if tfr.DictionaryRowKeySize() > len(rowBuf) {
|
|
||||||
rowBuf = make([]byte, tfr.DictionaryRowKeySize())
|
|
||||||
}
|
}
|
||||||
dictKeySize, err := tfr.DictionaryRowKeyTo(rowBuf)
|
keySize, err := row.KeyTo(rowBuf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
wb.Merge(rowBuf[:dictKeySize], dictionaryTermDecr)
|
valSize, err := row.ValueTo(rowBuf[keySize:])
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
wb.Set(rowBuf[:keySize], rowBuf[keySize:keySize+valSize])
|
||||||
}
|
}
|
||||||
if row.KeySize()+row.ValueSize() > len(rowBuf) {
|
}
|
||||||
rowBuf = make([]byte, row.KeySize()+row.ValueSize())
|
|
||||||
|
// delete
|
||||||
|
for _, deleteRows := range deleteRowsAll {
|
||||||
|
for _, row := range deleteRows {
|
||||||
|
tfr, ok := row.(*TermFrequencyRow)
|
||||||
|
if ok {
|
||||||
|
// need to decrement counter
|
||||||
|
if tfr.DictionaryRowKeySize() > len(rowBuf) {
|
||||||
|
rowBuf = make([]byte, tfr.DictionaryRowKeySize())
|
||||||
|
}
|
||||||
|
dictKeySize, err := tfr.DictionaryRowKeyTo(rowBuf)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
dictionaryDeltas[string(rowBuf[:dictKeySize])] -= 1
|
||||||
|
}
|
||||||
|
if row.KeySize()+row.ValueSize() > len(rowBuf) {
|
||||||
|
rowBuf = make([]byte, row.KeySize()+row.ValueSize())
|
||||||
|
}
|
||||||
|
keySize, err := row.KeyTo(rowBuf)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
wb.Delete(rowBuf[:keySize])
|
||||||
}
|
}
|
||||||
keySize, err := row.KeyTo(rowBuf)
|
}
|
||||||
if err != nil {
|
|
||||||
return err
|
if 8 > len(rowBuf) {
|
||||||
}
|
rowBuf = make([]byte, 8)
|
||||||
wb.Delete(rowBuf[:keySize])
|
}
|
||||||
|
for dictRowKey, delta := range dictionaryDeltas {
|
||||||
|
binary.LittleEndian.PutUint64(rowBuf, uint64(delta))
|
||||||
|
wb.Merge([]byte(dictRowKey), rowBuf[0:8])
|
||||||
}
|
}
|
||||||
|
|
||||||
PutRowBuffer(rowBuf)
|
PutRowBuffer(rowBuf)
|
||||||
|
@ -395,13 +417,22 @@ func (udc *UpsideDownCouch) Update(doc *document.Document) (err error) {
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// prepare a list of rows
|
// prepare a list of rows
|
||||||
addRows := make([]UpsideDownCouchRow, 0)
|
var addRowsAll [][]UpsideDownCouchRow
|
||||||
updateRows := make([]UpsideDownCouchRow, 0)
|
var updateRowsAll [][]UpsideDownCouchRow
|
||||||
deleteRows := make([]UpsideDownCouchRow, 0)
|
var deleteRowsAll [][]UpsideDownCouchRow
|
||||||
|
|
||||||
addRows, updateRows, deleteRows = udc.mergeOldAndNew(backIndexRow, result.Rows, addRows, updateRows, deleteRows)
|
addRows, updateRows, deleteRows := udc.mergeOldAndNew(backIndexRow, result.Rows)
|
||||||
|
if len(addRows) > 0 {
|
||||||
|
addRowsAll = append(addRowsAll, addRows)
|
||||||
|
}
|
||||||
|
if len(updateRows) > 0 {
|
||||||
|
updateRowsAll = append(updateRowsAll, updateRows)
|
||||||
|
}
|
||||||
|
if len(deleteRows) > 0 {
|
||||||
|
deleteRowsAll = append(deleteRowsAll, deleteRows)
|
||||||
|
}
|
||||||
|
|
||||||
err = udc.batchRows(kvwriter, addRows, updateRows, deleteRows)
|
err = udc.batchRows(kvwriter, addRowsAll, updateRowsAll, deleteRowsAll)
|
||||||
if err == nil && backIndexRow == nil {
|
if err == nil && backIndexRow == nil {
|
||||||
udc.m.Lock()
|
udc.m.Lock()
|
||||||
udc.docCount++
|
udc.docCount++
|
||||||
|
@ -416,7 +447,11 @@ func (udc *UpsideDownCouch) Update(doc *document.Document) (err error) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (udc *UpsideDownCouch) mergeOldAndNew(backIndexRow *BackIndexRow, rows []index.IndexRow, addRows, updateRows, deleteRows []UpsideDownCouchRow) ([]UpsideDownCouchRow, []UpsideDownCouchRow, []UpsideDownCouchRow) {
|
func (udc *UpsideDownCouch) mergeOldAndNew(backIndexRow *BackIndexRow, rows []index.IndexRow) (addRows []UpsideDownCouchRow, updateRows []UpsideDownCouchRow, deleteRows []UpsideDownCouchRow) {
|
||||||
|
addRows = make([]UpsideDownCouchRow, 0, len(rows))
|
||||||
|
updateRows = make([]UpsideDownCouchRow, 0, len(rows))
|
||||||
|
deleteRows = make([]UpsideDownCouchRow, 0, len(rows))
|
||||||
|
|
||||||
existingTermKeys := make(map[string]bool)
|
existingTermKeys := make(map[string]bool)
|
||||||
for _, key := range backIndexRow.AllTermKeys() {
|
for _, key := range backIndexRow.AllTermKeys() {
|
||||||
existingTermKeys[string(key)] = true
|
existingTermKeys[string(key)] = true
|
||||||
|
@ -570,10 +605,14 @@ func (udc *UpsideDownCouch) Delete(id string) (err error) {
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
deleteRows := make([]UpsideDownCouchRow, 0)
|
var deleteRowsAll [][]UpsideDownCouchRow
|
||||||
deleteRows = udc.deleteSingle(id, backIndexRow, deleteRows)
|
|
||||||
|
|
||||||
err = udc.batchRows(kvwriter, nil, nil, deleteRows)
|
deleteRows := udc.deleteSingle(id, backIndexRow, nil)
|
||||||
|
if len(deleteRows) > 0 {
|
||||||
|
deleteRowsAll = append(deleteRowsAll, deleteRows)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = udc.batchRows(kvwriter, nil, nil, deleteRowsAll)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
udc.m.Lock()
|
udc.m.Lock()
|
||||||
udc.docCount--
|
udc.docCount--
|
||||||
|
@ -635,20 +674,6 @@ func (udc *UpsideDownCouch) backIndexRowForDoc(kvreader store.KVReader, docID st
|
||||||
return backIndexRow, nil
|
return backIndexRow, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (udc *UpsideDownCouch) backIndexRowsForBatch(kvreader store.KVReader, batch *index.Batch) (map[string]*BackIndexRow, error) {
|
|
||||||
// FIXME faster to order the ids and scan sequentially
|
|
||||||
// for now just get it working
|
|
||||||
rv := make(map[string]*BackIndexRow, 0)
|
|
||||||
for docID := range batch.IndexOps {
|
|
||||||
backIndexRow, err := udc.backIndexRowForDoc(kvreader, docID)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
rv[docID] = backIndexRow
|
|
||||||
}
|
|
||||||
return rv, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func decodeFieldType(typ byte, name string, pos []uint64, value []byte) document.Field {
|
func decodeFieldType(typ byte, name string, pos []uint64, value []byte) document.Field {
|
||||||
switch typ {
|
switch typ {
|
||||||
case 't':
|
case 't':
|
||||||
|
@ -710,7 +735,8 @@ func (udc *UpsideDownCouch) termFieldVectorsFromTermVectors(in []*TermVector) []
|
||||||
|
|
||||||
func (udc *UpsideDownCouch) Batch(batch *index.Batch) (err error) {
|
func (udc *UpsideDownCouch) Batch(batch *index.Batch) (err error) {
|
||||||
analysisStart := time.Now()
|
analysisStart := time.Now()
|
||||||
resultChan := make(chan *index.AnalysisResult)
|
|
||||||
|
resultChan := make(chan *index.AnalysisResult, len(batch.IndexOps))
|
||||||
|
|
||||||
var numUpdates uint64
|
var numUpdates uint64
|
||||||
for _, doc := range batch.IndexOps {
|
for _, doc := range batch.IndexOps {
|
||||||
|
@ -740,8 +766,43 @@ func (udc *UpsideDownCouch) Batch(batch *index.Batch) (err error) {
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
// retrieve back index rows concurrent with analysis
|
||||||
|
docBackIndexRowErr := error(nil)
|
||||||
|
docBackIndexRowCh := make(chan *docBackIndexRow, len(batch.IndexOps))
|
||||||
|
|
||||||
|
udc.writeMutex.Lock()
|
||||||
|
defer udc.writeMutex.Unlock()
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
defer close(docBackIndexRowCh)
|
||||||
|
|
||||||
|
// open a reader for backindex lookup
|
||||||
|
var kvreader store.KVReader
|
||||||
|
kvreader, err = udc.store.Reader()
|
||||||
|
if err != nil {
|
||||||
|
docBackIndexRowErr = err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
for docID, doc := range batch.IndexOps {
|
||||||
|
backIndexRow, err := udc.backIndexRowForDoc(kvreader, docID)
|
||||||
|
if err != nil {
|
||||||
|
docBackIndexRowErr = err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
docBackIndexRowCh <- &docBackIndexRow{docID, doc, backIndexRow}
|
||||||
|
}
|
||||||
|
|
||||||
|
err = kvreader.Close()
|
||||||
|
if err != nil {
|
||||||
|
docBackIndexRowErr = err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// wait for analysis result
|
||||||
newRowsMap := make(map[string][]index.IndexRow)
|
newRowsMap := make(map[string][]index.IndexRow)
|
||||||
// wait for the result
|
|
||||||
var itemsDeQueued uint64
|
var itemsDeQueued uint64
|
||||||
for itemsDeQueued < numUpdates {
|
for itemsDeQueued < numUpdates {
|
||||||
result := <-resultChan
|
result := <-resultChan
|
||||||
|
@ -750,68 +811,22 @@ func (udc *UpsideDownCouch) Batch(batch *index.Batch) (err error) {
|
||||||
}
|
}
|
||||||
close(resultChan)
|
close(resultChan)
|
||||||
|
|
||||||
detectedUnsafeMutex.RLock()
|
|
||||||
defer detectedUnsafeMutex.RUnlock()
|
|
||||||
if detectedUnsafe {
|
|
||||||
return UnsafeBatchUseDetected
|
|
||||||
}
|
|
||||||
|
|
||||||
atomic.AddUint64(&udc.stats.analysisTime, uint64(time.Since(analysisStart)))
|
atomic.AddUint64(&udc.stats.analysisTime, uint64(time.Since(analysisStart)))
|
||||||
|
|
||||||
indexStart := time.Now()
|
|
||||||
|
|
||||||
udc.writeMutex.Lock()
|
|
||||||
defer udc.writeMutex.Unlock()
|
|
||||||
|
|
||||||
// open a reader for backindex lookup
|
|
||||||
var kvreader store.KVReader
|
|
||||||
kvreader, err = udc.store.Reader()
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// first lookup all the back index rows
|
|
||||||
var backIndexRows map[string]*BackIndexRow
|
|
||||||
backIndexRows, err = udc.backIndexRowsForBatch(kvreader, batch)
|
|
||||||
if err != nil {
|
|
||||||
_ = kvreader.Close()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
err = kvreader.Close()
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// start a writer for this batch
|
|
||||||
var kvwriter store.KVWriter
|
|
||||||
kvwriter, err = udc.store.Writer()
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// prepare a list of rows
|
|
||||||
addRows := make([]UpsideDownCouchRow, 0)
|
|
||||||
updateRows := make([]UpsideDownCouchRow, 0)
|
|
||||||
deleteRows := make([]UpsideDownCouchRow, 0)
|
|
||||||
|
|
||||||
docsAdded := uint64(0)
|
docsAdded := uint64(0)
|
||||||
docsDeleted := uint64(0)
|
docsDeleted := uint64(0)
|
||||||
for docID, doc := range batch.IndexOps {
|
|
||||||
backIndexRow := backIndexRows[docID]
|
indexStart := time.Now()
|
||||||
if doc == nil && backIndexRow != nil {
|
|
||||||
// delete
|
// prepare a list of rows
|
||||||
deleteRows = udc.deleteSingle(docID, backIndexRow, deleteRows)
|
var addRowsAll [][]UpsideDownCouchRow
|
||||||
docsDeleted++
|
var updateRowsAll [][]UpsideDownCouchRow
|
||||||
} else if doc != nil {
|
var deleteRowsAll [][]UpsideDownCouchRow
|
||||||
addRows, updateRows, deleteRows = udc.mergeOldAndNew(backIndexRow, newRowsMap[docID], addRows, updateRows, deleteRows)
|
|
||||||
if backIndexRow == nil {
|
|
||||||
docsAdded++
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// add the internal ops
|
// add the internal ops
|
||||||
|
var updateRows []UpsideDownCouchRow
|
||||||
|
var deleteRows []UpsideDownCouchRow
|
||||||
|
|
||||||
for internalKey, internalValue := range batch.InternalOps {
|
for internalKey, internalValue := range batch.InternalOps {
|
||||||
if internalValue == nil {
|
if internalValue == nil {
|
||||||
// delete
|
// delete
|
||||||
|
@ -823,7 +838,57 @@ func (udc *UpsideDownCouch) Batch(batch *index.Batch) (err error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = udc.batchRows(kvwriter, addRows, updateRows, deleteRows)
|
if len(updateRows) > 0 {
|
||||||
|
updateRowsAll = append(updateRowsAll, updateRows)
|
||||||
|
}
|
||||||
|
if len(deleteRows) > 0 {
|
||||||
|
deleteRowsAll = append(deleteRowsAll, deleteRows)
|
||||||
|
}
|
||||||
|
|
||||||
|
// process back index rows as they arrive
|
||||||
|
for dbir := range docBackIndexRowCh {
|
||||||
|
if dbir.doc == nil && dbir.backIndexRow != nil {
|
||||||
|
// delete
|
||||||
|
deleteRows := udc.deleteSingle(dbir.docID, dbir.backIndexRow, nil)
|
||||||
|
if len(deleteRows) > 0 {
|
||||||
|
deleteRowsAll = append(deleteRowsAll, deleteRows)
|
||||||
|
}
|
||||||
|
docsDeleted++
|
||||||
|
} else if dbir.doc != nil {
|
||||||
|
addRows, updateRows, deleteRows := udc.mergeOldAndNew(dbir.backIndexRow, newRowsMap[dbir.docID])
|
||||||
|
if len(addRows) > 0 {
|
||||||
|
addRowsAll = append(addRowsAll, addRows)
|
||||||
|
}
|
||||||
|
if len(updateRows) > 0 {
|
||||||
|
updateRowsAll = append(updateRowsAll, updateRows)
|
||||||
|
}
|
||||||
|
if len(deleteRows) > 0 {
|
||||||
|
deleteRowsAll = append(deleteRowsAll, deleteRows)
|
||||||
|
}
|
||||||
|
if dbir.backIndexRow == nil {
|
||||||
|
docsAdded++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if docBackIndexRowErr != nil {
|
||||||
|
return docBackIndexRowErr
|
||||||
|
}
|
||||||
|
|
||||||
|
detectedUnsafeMutex.RLock()
|
||||||
|
defer detectedUnsafeMutex.RUnlock()
|
||||||
|
if detectedUnsafe {
|
||||||
|
return UnsafeBatchUseDetected
|
||||||
|
}
|
||||||
|
|
||||||
|
// start a writer for this batch
|
||||||
|
var kvwriter store.KVWriter
|
||||||
|
kvwriter, err = udc.store.Writer()
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
err = udc.batchRows(kvwriter, addRowsAll, updateRowsAll, deleteRowsAll)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
_ = kvwriter.Close()
|
_ = kvwriter.Close()
|
||||||
atomic.AddUint64(&udc.stats.errors, 1)
|
atomic.AddUint64(&udc.stats.errors, 1)
|
||||||
|
@ -831,6 +896,7 @@ func (udc *UpsideDownCouch) Batch(batch *index.Batch) (err error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
err = kvwriter.Close()
|
err = kvwriter.Close()
|
||||||
|
|
||||||
atomic.AddUint64(&udc.stats.indexTime, uint64(time.Since(indexStart)))
|
atomic.AddUint64(&udc.stats.indexTime, uint64(time.Since(indexStart)))
|
||||||
|
|
||||||
if err == nil {
|
if err == nil {
|
||||||
|
|
Loading…
Reference in New Issue
Block a user