diff --git a/index/firestorm/firestorm.go b/index/firestorm/firestorm.go index 662646ca..c76b445f 100644 --- a/index/firestorm/firestorm.go +++ b/index/firestorm/firestorm.go @@ -167,7 +167,7 @@ func (f *Firestorm) Update(doc *document.Document) (err error) { }() var dictionaryDeltas map[string]int64 - dictionaryDeltas, err = f.batchRows(kvwriter, result.Rows, nil) + dictionaryDeltas, err = f.batchRows(kvwriter, [][]index.IndexRow{result.Rows}, nil) if err != nil { _ = kvwriter.Close() atomic.AddUint64(&f.stats.errors, 1) @@ -190,7 +190,7 @@ func (f *Firestorm) Delete(id string) error { return nil } -func (f *Firestorm) batchRows(writer store.KVWriter, rows []index.IndexRow, deleteKeys [][]byte) (map[string]int64, error) { +func (f *Firestorm) batchRows(writer store.KVWriter, rowsOfRows [][]index.IndexRow, deleteKeys [][]byte) (map[string]int64, error) { // prepare batch wb := writer.NewBatch() @@ -206,33 +206,36 @@ func (f *Firestorm) batchRows(writer store.KVWriter, rows []index.IndexRow, dele } dictionaryDeltas := make(map[string]int64) - for _, row := range rows { - tfr, ok := row.(*TermFreqRow) - if ok { - if tfr.Field() != 0 { - kbuf = prepareBuf(kbuf, tfr.DictionaryRowKeySize()) - klen, err := tfr.DictionaryRowKeyTo(kbuf) - if err != nil { - return nil, err + + for _, rows := range rowsOfRows { + for _, row := range rows { + tfr, ok := row.(*TermFreqRow) + if ok { + if tfr.Field() != 0 { + kbuf = prepareBuf(kbuf, tfr.DictionaryRowKeySize()) + klen, err := tfr.DictionaryRowKeyTo(kbuf) + if err != nil { + return nil, err + } + + dictionaryDeltas[string(kbuf[0:klen])] += 1 } - - dictionaryDeltas[string(kbuf[0:klen])] += 1 } - } - kbuf = prepareBuf(kbuf, row.KeySize()) - klen, err := row.KeyTo(kbuf) - if err != nil { - return nil, err - } + kbuf = prepareBuf(kbuf, row.KeySize()) + klen, err := row.KeyTo(kbuf) + if err != nil { + return nil, err + } - vbuf = prepareBuf(vbuf, row.ValueSize()) - vlen, err := row.ValueTo(vbuf) - if err != nil { - return nil, err - } + vbuf = prepareBuf(vbuf, row.ValueSize()) + vlen, err := row.ValueTo(vbuf) + if err != nil { + return nil, err + } - wb.Set(kbuf[0:klen], vbuf[0:vlen]) + wb.Set(kbuf[0:klen], vbuf[0:vlen]) + } } for _, dk := range deleteKeys { @@ -291,12 +294,14 @@ func (f *Firestorm) Batch(batch *index.Batch) (err error) { } }() - allRows := make([]index.IndexRow, 0, 1000) + // extra 1 capacity for internal updates. + collectRows := make([][]index.IndexRow, 0, docsUpdated+1) + // wait for the result var itemsDeQueued uint64 for itemsDeQueued < docsUpdated { result := <-resultChan - allRows = append(allRows, result.Rows...) + collectRows = append(collectRows, result.Rows) itemsDeQueued++ } close(resultChan) @@ -309,17 +314,21 @@ func (f *Firestorm) Batch(batch *index.Batch) (err error) { atomic.AddUint64(&f.stats.analysisTime, uint64(time.Since(analysisStart))) - deleteKeys := make([][]byte, 0) - // add the internal ops - for internalKey, internalValue := range batch.InternalOps { - if internalValue == nil { - // delete - deleteInternalRow := NewInternalRow([]byte(internalKey), nil) - deleteKeys = append(deleteKeys, deleteInternalRow.Key()) - } else { - updateInternalRow := NewInternalRow([]byte(internalKey), internalValue) - allRows = append(allRows, updateInternalRow) + var deleteKeys [][]byte + if len(batch.InternalOps) > 0 { + // add the internal ops + updateInternalRows := make([]index.IndexRow, 0, len(batch.InternalOps)) + for internalKey, internalValue := range batch.InternalOps { + if internalValue == nil { + // delete + deleteInternalRow := NewInternalRow([]byte(internalKey), nil) + deleteKeys = append(deleteKeys, deleteInternalRow.Key()) + } else { + updateInternalRow := NewInternalRow([]byte(internalKey), internalValue) + updateInternalRows = append(updateInternalRows, updateInternalRow) + } } + collectRows = append(collectRows, updateInternalRows) } inflightItems := make([]*InFlightItem, 0, len(batch.IndexOps)) @@ -342,7 +351,7 @@ func (f *Firestorm) Batch(batch *index.Batch) (err error) { } var dictionaryDeltas map[string]int64 - dictionaryDeltas, err = f.batchRows(kvwriter, allRows, deleteKeys) + dictionaryDeltas, err = f.batchRows(kvwriter, collectRows, deleteKeys) if err != nil { _ = kvwriter.Close() atomic.AddUint64(&f.stats.errors, 1)