diff --git a/index/firestorm/firestorm.go b/index/firestorm/firestorm.go index 7726f1ad..73633f0b 100644 --- a/index/firestorm/firestorm.go +++ b/index/firestorm/firestorm.go @@ -346,6 +346,7 @@ func (f *Firestorm) Batch(batch *index.Batch) (err error) { } indexStart := time.Now() + // start a writer for this batch var kvwriter store.KVWriter kvwriter, err = f.store.Writer() @@ -362,10 +363,12 @@ func (f *Firestorm) Batch(batch *index.Batch) (err error) { } f.compensator.MutateBatch(inflightItems, lastDocNumber) + + err = kvwriter.Close() + f.lookuper.NotifyBatch(inflightItems) f.dictUpdater.NotifyBatch(dictionaryDeltas) - err = kvwriter.Close() atomic.AddUint64(&f.stats.indexTime, uint64(time.Since(indexStart))) if err == nil { diff --git a/index/upside_down/upside_down.go b/index/upside_down/upside_down.go index 549da868..ed5a3d6c 100644 --- a/index/upside_down/upside_down.go +++ b/index/upside_down/upside_down.go @@ -10,6 +10,7 @@ package upside_down import ( + "encoding/binary" "encoding/json" "fmt" "math" @@ -60,6 +61,12 @@ type UpsideDownCouch struct { writeMutex sync.Mutex } +type docBackIndexRow struct { + docID string + doc *document.Document // If deletion, doc will be nil. + backIndexRow *BackIndexRow +} + func NewUpsideDownCouch(storeName string, storeConfig map[string]interface{}, analysisQueue *index.AnalysisQueue) (index.Index, error) { return &UpsideDownCouch{ version: Version, @@ -72,13 +79,12 @@ func NewUpsideDownCouch(storeName string, storeConfig map[string]interface{}, an } func (udc *UpsideDownCouch) init(kvwriter store.KVWriter) (err error) { - // prepare a list of rows - rows := make([]UpsideDownCouchRow, 0) - // version marker - rows = append(rows, NewVersionRow(udc.version)) + rowsAll := [][]UpsideDownCouchRow{ + []UpsideDownCouchRow{NewVersionRow(udc.version)}, + } - err = udc.batchRows(kvwriter, nil, rows, nil) + err = udc.batchRows(kvwriter, nil, rowsAll, nil) return } @@ -135,7 +141,7 @@ func PutRowBuffer(buf []byte) { rowBufferPool.Put(buf) } -func (udc *UpsideDownCouch) batchRows(writer store.KVWriter, addRows []UpsideDownCouchRow, updateRows []UpsideDownCouchRow, deleteRows []UpsideDownCouchRow) (err error) { +func (udc *UpsideDownCouch) batchRows(writer store.KVWriter, addRowsAll [][]UpsideDownCouchRow, updateRowsAll [][]UpsideDownCouchRow, deleteRowsAll [][]UpsideDownCouchRow) (err error) { // prepare batch wb := writer.NewBatch() @@ -146,68 +152,84 @@ func (udc *UpsideDownCouch) batchRows(writer store.KVWriter, addRows []UpsideDow // buffer to work with rowBuf := GetRowBuffer() + dictionaryDeltas := make(map[string]int64) + // add - for _, row := range addRows { - tfr, ok := row.(*TermFrequencyRow) - if ok { - if tfr.DictionaryRowKeySize() > len(rowBuf) { - rowBuf = make([]byte, tfr.DictionaryRowKeySize()) + for _, addRows := range addRowsAll { + for _, row := range addRows { + tfr, ok := row.(*TermFrequencyRow) + if ok { + if tfr.DictionaryRowKeySize() > len(rowBuf) { + rowBuf = make([]byte, tfr.DictionaryRowKeySize()) + } + dictKeySize, err := tfr.DictionaryRowKeyTo(rowBuf) + if err != nil { + return err + } + dictionaryDeltas[string(rowBuf[:dictKeySize])] += 1 } - dictKeySize, err := tfr.DictionaryRowKeyTo(rowBuf) + if row.KeySize()+row.ValueSize() > len(rowBuf) { + rowBuf = make([]byte, row.KeySize()+row.ValueSize()) + } + keySize, err := row.KeyTo(rowBuf) if err != nil { return err } - wb.Merge(rowBuf[:dictKeySize], dictionaryTermIncr) + valSize, err := row.ValueTo(rowBuf[keySize:]) + wb.Set(rowBuf[:keySize], rowBuf[keySize:keySize+valSize]) } - if row.KeySize()+row.ValueSize() > len(rowBuf) { - rowBuf = make([]byte, row.KeySize()+row.ValueSize()) - } - keySize, err := row.KeyTo(rowBuf) - if err != nil { - return err - } - valSize, err := row.ValueTo(rowBuf[keySize:]) - wb.Set(rowBuf[:keySize], rowBuf[keySize:keySize+valSize]) } // update - for _, row := range updateRows { - if row.KeySize()+row.ValueSize() > len(rowBuf) { - rowBuf = make([]byte, row.KeySize()+row.ValueSize()) - } - keySize, err := row.KeyTo(rowBuf) - if err != nil { - return err - } - valSize, err := row.ValueTo(rowBuf[keySize:]) - if err != nil { - return err - } - wb.Set(rowBuf[:keySize], rowBuf[keySize:keySize+valSize]) - } - - // delete - for _, row := range deleteRows { - tfr, ok := row.(*TermFrequencyRow) - if ok { - // need to decrement counter - if tfr.DictionaryRowKeySize() > len(rowBuf) { - rowBuf = make([]byte, tfr.DictionaryRowKeySize()) + for _, updateRows := range updateRowsAll { + for _, row := range updateRows { + if row.KeySize()+row.ValueSize() > len(rowBuf) { + rowBuf = make([]byte, row.KeySize()+row.ValueSize()) } - dictKeySize, err := tfr.DictionaryRowKeyTo(rowBuf) + keySize, err := row.KeyTo(rowBuf) if err != nil { return err } - wb.Merge(rowBuf[:dictKeySize], dictionaryTermDecr) + valSize, err := row.ValueTo(rowBuf[keySize:]) + if err != nil { + return err + } + wb.Set(rowBuf[:keySize], rowBuf[keySize:keySize+valSize]) } - if row.KeySize()+row.ValueSize() > len(rowBuf) { - rowBuf = make([]byte, row.KeySize()+row.ValueSize()) + } + + // delete + for _, deleteRows := range deleteRowsAll { + for _, row := range deleteRows { + tfr, ok := row.(*TermFrequencyRow) + if ok { + // need to decrement counter + if tfr.DictionaryRowKeySize() > len(rowBuf) { + rowBuf = make([]byte, tfr.DictionaryRowKeySize()) + } + dictKeySize, err := tfr.DictionaryRowKeyTo(rowBuf) + if err != nil { + return err + } + dictionaryDeltas[string(rowBuf[:dictKeySize])] -= 1 + } + if row.KeySize()+row.ValueSize() > len(rowBuf) { + rowBuf = make([]byte, row.KeySize()+row.ValueSize()) + } + keySize, err := row.KeyTo(rowBuf) + if err != nil { + return err + } + wb.Delete(rowBuf[:keySize]) } - keySize, err := row.KeyTo(rowBuf) - if err != nil { - return err - } - wb.Delete(rowBuf[:keySize]) + } + + if 8 > len(rowBuf) { + rowBuf = make([]byte, 8) + } + for dictRowKey, delta := range dictionaryDeltas { + binary.LittleEndian.PutUint64(rowBuf, uint64(delta)) + wb.Merge([]byte(dictRowKey), rowBuf[0:8]) } PutRowBuffer(rowBuf) @@ -395,13 +417,22 @@ func (udc *UpsideDownCouch) Update(doc *document.Document) (err error) { }() // prepare a list of rows - addRows := make([]UpsideDownCouchRow, 0) - updateRows := make([]UpsideDownCouchRow, 0) - deleteRows := make([]UpsideDownCouchRow, 0) + var addRowsAll [][]UpsideDownCouchRow + var updateRowsAll [][]UpsideDownCouchRow + var deleteRowsAll [][]UpsideDownCouchRow - addRows, updateRows, deleteRows = udc.mergeOldAndNew(backIndexRow, result.Rows, addRows, updateRows, deleteRows) + addRows, updateRows, deleteRows := udc.mergeOldAndNew(backIndexRow, result.Rows) + if len(addRows) > 0 { + addRowsAll = append(addRowsAll, addRows) + } + if len(updateRows) > 0 { + updateRowsAll = append(updateRowsAll, updateRows) + } + if len(deleteRows) > 0 { + deleteRowsAll = append(deleteRowsAll, deleteRows) + } - err = udc.batchRows(kvwriter, addRows, updateRows, deleteRows) + err = udc.batchRows(kvwriter, addRowsAll, updateRowsAll, deleteRowsAll) if err == nil && backIndexRow == nil { udc.m.Lock() udc.docCount++ @@ -416,7 +447,11 @@ func (udc *UpsideDownCouch) Update(doc *document.Document) (err error) { return } -func (udc *UpsideDownCouch) mergeOldAndNew(backIndexRow *BackIndexRow, rows []index.IndexRow, addRows, updateRows, deleteRows []UpsideDownCouchRow) ([]UpsideDownCouchRow, []UpsideDownCouchRow, []UpsideDownCouchRow) { +func (udc *UpsideDownCouch) mergeOldAndNew(backIndexRow *BackIndexRow, rows []index.IndexRow) (addRows []UpsideDownCouchRow, updateRows []UpsideDownCouchRow, deleteRows []UpsideDownCouchRow) { + addRows = make([]UpsideDownCouchRow, 0, len(rows)) + updateRows = make([]UpsideDownCouchRow, 0, len(rows)) + deleteRows = make([]UpsideDownCouchRow, 0, len(rows)) + existingTermKeys := make(map[string]bool) for _, key := range backIndexRow.AllTermKeys() { existingTermKeys[string(key)] = true @@ -570,10 +605,14 @@ func (udc *UpsideDownCouch) Delete(id string) (err error) { } }() - deleteRows := make([]UpsideDownCouchRow, 0) - deleteRows = udc.deleteSingle(id, backIndexRow, deleteRows) + var deleteRowsAll [][]UpsideDownCouchRow - err = udc.batchRows(kvwriter, nil, nil, deleteRows) + deleteRows := udc.deleteSingle(id, backIndexRow, nil) + if len(deleteRows) > 0 { + deleteRowsAll = append(deleteRowsAll, deleteRows) + } + + err = udc.batchRows(kvwriter, nil, nil, deleteRowsAll) if err == nil { udc.m.Lock() udc.docCount-- @@ -635,20 +674,6 @@ func (udc *UpsideDownCouch) backIndexRowForDoc(kvreader store.KVReader, docID st return backIndexRow, nil } -func (udc *UpsideDownCouch) backIndexRowsForBatch(kvreader store.KVReader, batch *index.Batch) (map[string]*BackIndexRow, error) { - // FIXME faster to order the ids and scan sequentially - // for now just get it working - rv := make(map[string]*BackIndexRow, 0) - for docID := range batch.IndexOps { - backIndexRow, err := udc.backIndexRowForDoc(kvreader, docID) - if err != nil { - return nil, err - } - rv[docID] = backIndexRow - } - return rv, nil -} - func decodeFieldType(typ byte, name string, pos []uint64, value []byte) document.Field { switch typ { case 't': @@ -710,7 +735,8 @@ func (udc *UpsideDownCouch) termFieldVectorsFromTermVectors(in []*TermVector) [] func (udc *UpsideDownCouch) Batch(batch *index.Batch) (err error) { analysisStart := time.Now() - resultChan := make(chan *index.AnalysisResult) + + resultChan := make(chan *index.AnalysisResult, len(batch.IndexOps)) var numUpdates uint64 for _, doc := range batch.IndexOps { @@ -740,8 +766,43 @@ func (udc *UpsideDownCouch) Batch(batch *index.Batch) (err error) { } }() + // retrieve back index rows concurrent with analysis + docBackIndexRowErr := error(nil) + docBackIndexRowCh := make(chan *docBackIndexRow, len(batch.IndexOps)) + + udc.writeMutex.Lock() + defer udc.writeMutex.Unlock() + + go func() { + defer close(docBackIndexRowCh) + + // open a reader for backindex lookup + var kvreader store.KVReader + kvreader, err = udc.store.Reader() + if err != nil { + docBackIndexRowErr = err + return + } + + for docID, doc := range batch.IndexOps { + backIndexRow, err := udc.backIndexRowForDoc(kvreader, docID) + if err != nil { + docBackIndexRowErr = err + return + } + + docBackIndexRowCh <- &docBackIndexRow{docID, doc, backIndexRow} + } + + err = kvreader.Close() + if err != nil { + docBackIndexRowErr = err + return + } + }() + + // wait for analysis result newRowsMap := make(map[string][]index.IndexRow) - // wait for the result var itemsDeQueued uint64 for itemsDeQueued < numUpdates { result := <-resultChan @@ -750,68 +811,22 @@ func (udc *UpsideDownCouch) Batch(batch *index.Batch) (err error) { } close(resultChan) - detectedUnsafeMutex.RLock() - defer detectedUnsafeMutex.RUnlock() - if detectedUnsafe { - return UnsafeBatchUseDetected - } - atomic.AddUint64(&udc.stats.analysisTime, uint64(time.Since(analysisStart))) - indexStart := time.Now() - - udc.writeMutex.Lock() - defer udc.writeMutex.Unlock() - - // open a reader for backindex lookup - var kvreader store.KVReader - kvreader, err = udc.store.Reader() - if err != nil { - return - } - - // first lookup all the back index rows - var backIndexRows map[string]*BackIndexRow - backIndexRows, err = udc.backIndexRowsForBatch(kvreader, batch) - if err != nil { - _ = kvreader.Close() - return - } - - err = kvreader.Close() - if err != nil { - return - } - - // start a writer for this batch - var kvwriter store.KVWriter - kvwriter, err = udc.store.Writer() - if err != nil { - return - } - - // prepare a list of rows - addRows := make([]UpsideDownCouchRow, 0) - updateRows := make([]UpsideDownCouchRow, 0) - deleteRows := make([]UpsideDownCouchRow, 0) - docsAdded := uint64(0) docsDeleted := uint64(0) - for docID, doc := range batch.IndexOps { - backIndexRow := backIndexRows[docID] - if doc == nil && backIndexRow != nil { - // delete - deleteRows = udc.deleteSingle(docID, backIndexRow, deleteRows) - docsDeleted++ - } else if doc != nil { - addRows, updateRows, deleteRows = udc.mergeOldAndNew(backIndexRow, newRowsMap[docID], addRows, updateRows, deleteRows) - if backIndexRow == nil { - docsAdded++ - } - } - } + + indexStart := time.Now() + + // prepare a list of rows + var addRowsAll [][]UpsideDownCouchRow + var updateRowsAll [][]UpsideDownCouchRow + var deleteRowsAll [][]UpsideDownCouchRow // add the internal ops + var updateRows []UpsideDownCouchRow + var deleteRows []UpsideDownCouchRow + for internalKey, internalValue := range batch.InternalOps { if internalValue == nil { // delete @@ -823,7 +838,57 @@ func (udc *UpsideDownCouch) Batch(batch *index.Batch) (err error) { } } - err = udc.batchRows(kvwriter, addRows, updateRows, deleteRows) + if len(updateRows) > 0 { + updateRowsAll = append(updateRowsAll, updateRows) + } + if len(deleteRows) > 0 { + deleteRowsAll = append(deleteRowsAll, deleteRows) + } + + // process back index rows as they arrive + for dbir := range docBackIndexRowCh { + if dbir.doc == nil && dbir.backIndexRow != nil { + // delete + deleteRows := udc.deleteSingle(dbir.docID, dbir.backIndexRow, nil) + if len(deleteRows) > 0 { + deleteRowsAll = append(deleteRowsAll, deleteRows) + } + docsDeleted++ + } else if dbir.doc != nil { + addRows, updateRows, deleteRows := udc.mergeOldAndNew(dbir.backIndexRow, newRowsMap[dbir.docID]) + if len(addRows) > 0 { + addRowsAll = append(addRowsAll, addRows) + } + if len(updateRows) > 0 { + updateRowsAll = append(updateRowsAll, updateRows) + } + if len(deleteRows) > 0 { + deleteRowsAll = append(deleteRowsAll, deleteRows) + } + if dbir.backIndexRow == nil { + docsAdded++ + } + } + } + + if docBackIndexRowErr != nil { + return docBackIndexRowErr + } + + detectedUnsafeMutex.RLock() + defer detectedUnsafeMutex.RUnlock() + if detectedUnsafe { + return UnsafeBatchUseDetected + } + + // start a writer for this batch + var kvwriter store.KVWriter + kvwriter, err = udc.store.Writer() + if err != nil { + return + } + + err = udc.batchRows(kvwriter, addRowsAll, updateRowsAll, deleteRowsAll) if err != nil { _ = kvwriter.Close() atomic.AddUint64(&udc.stats.errors, 1) @@ -831,6 +896,7 @@ func (udc *UpsideDownCouch) Batch(batch *index.Batch) (err error) { } err = kvwriter.Close() + atomic.AddUint64(&udc.stats.indexTime, uint64(time.Since(indexStart))) if err == nil {