upside_down batchRows() takes array of arrays

In order to spend less time in append(), this change in upside_down
(similar to another recent performance change in firestorm) builds up
an array of arrays as the eventual input to batchRows().
Steve Yen 2016-01-11 18:11:21 -08:00
parent 7ce7d98cba
commit 0e72b949b3
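
To make the commit message concrete: instead of appending every row into one flat slice (paying one append per row, plus re-copying whenever the backing array grows), callers now hand each already-built batch to batchRows() as a single element of a slice of slices, so a whole batch costs one append and no per-row copying. Below is a minimal standalone Go sketch of that trade-off — not code from this commit; Row, flatten, and group are illustrative names.

package main

import "fmt"

// Row stands in for UpsideDownCouchRow; the real rows are interface values.
type Row int

// flatten mirrors the old shape: every row is appended into one flat slice,
// one append per row, with copies whenever the backing array must grow.
func flatten(batches [][]Row) []Row {
	var all []Row
	for _, b := range batches {
		for _, r := range b {
			all = append(all, r)
		}
	}
	return all
}

// group mirrors the new shape: each non-empty batch is appended as a single
// element, so the cost is one append per batch and no per-row copying.
func group(batches [][]Row) [][]Row {
	var all [][]Row
	for _, b := range batches {
		if len(b) > 0 {
			all = append(all, b)
		}
	}
	return all
}

func main() {
	batches := [][]Row{{1, 2, 3}, nil, {4, 5}}
	fmt.Println(len(flatten(batches))) // 5

	// Consumers iterate the grouped batches with a nested loop, which is
	// exactly what the reworked batchRows() does in the diff below.
	total := 0
	for _, b := range group(batches) {
		total += len(b)
	}
	fmt.Println(total) // 5
}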


@@ -79,13 +79,12 @@ func NewUpsideDownCouch(storeName string, storeConfig map[string]interface{}, an
 }
 
 func (udc *UpsideDownCouch) init(kvwriter store.KVWriter) (err error) {
 	// prepare a list of rows
-	rows := make([]UpsideDownCouchRow, 0)
-
-	// version marker
-	rows = append(rows, NewVersionRow(udc.version))
+	rowsAll := [][]UpsideDownCouchRow{
+		[]UpsideDownCouchRow{NewVersionRow(udc.version)},
+	}
 
-	err = udc.batchRows(kvwriter, nil, rows, nil)
+	err = udc.batchRows(kvwriter, nil, rowsAll, nil)
 	return
 }
@@ -142,7 +141,7 @@ func PutRowBuffer(buf []byte) {
 	rowBufferPool.Put(buf)
 }
 
-func (udc *UpsideDownCouch) batchRows(writer store.KVWriter, addRows []UpsideDownCouchRow, updateRows []UpsideDownCouchRow, deleteRows []UpsideDownCouchRow) (err error) {
+func (udc *UpsideDownCouch) batchRows(writer store.KVWriter, addRowsAll [][]UpsideDownCouchRow, updateRowsAll [][]UpsideDownCouchRow, deleteRowsAll [][]UpsideDownCouchRow) (err error) {
 
 	// prepare batch
 	wb := writer.NewBatch()
@@ -156,73 +155,78 @@ func (udc *UpsideDownCouch) batchRows(writer store.KVWriter, addRows []UpsideDow
 	dictionaryDeltas := make(map[string]int64)
 
 	// add
-	for _, row := range addRows {
-		tfr, ok := row.(*TermFrequencyRow)
-		if ok {
-			if tfr.DictionaryRowKeySize() > len(rowBuf) {
-				rowBuf = make([]byte, tfr.DictionaryRowKeySize())
+	for _, addRows := range addRowsAll {
+		for _, row := range addRows {
+			tfr, ok := row.(*TermFrequencyRow)
+			if ok {
+				if tfr.DictionaryRowKeySize() > len(rowBuf) {
+					rowBuf = make([]byte, tfr.DictionaryRowKeySize())
+				}
+				dictKeySize, err := tfr.DictionaryRowKeyTo(rowBuf)
+				if err != nil {
+					return err
+				}
+				dictionaryDeltas[string(rowBuf[:dictKeySize])] += 1
 			}
-			dictKeySize, err := tfr.DictionaryRowKeyTo(rowBuf)
+			if row.KeySize()+row.ValueSize() > len(rowBuf) {
+				rowBuf = make([]byte, row.KeySize()+row.ValueSize())
+			}
+			keySize, err := row.KeyTo(rowBuf)
 			if err != nil {
 				return err
 			}
-			dictionaryDeltas[string(rowBuf[:dictKeySize])] += 1
+			valSize, err := row.ValueTo(rowBuf[keySize:])
+			wb.Set(rowBuf[:keySize], rowBuf[keySize:keySize+valSize])
 		}
-		if row.KeySize()+row.ValueSize() > len(rowBuf) {
-			rowBuf = make([]byte, row.KeySize()+row.ValueSize())
-		}
-		keySize, err := row.KeyTo(rowBuf)
-		if err != nil {
-			return err
-		}
-		valSize, err := row.ValueTo(rowBuf[keySize:])
-		wb.Set(rowBuf[:keySize], rowBuf[keySize:keySize+valSize])
 	}
 
 	// update
-	for _, row := range updateRows {
-		if row.KeySize()+row.ValueSize() > len(rowBuf) {
-			rowBuf = make([]byte, row.KeySize()+row.ValueSize())
-		}
-		keySize, err := row.KeyTo(rowBuf)
-		if err != nil {
-			return err
-		}
-		valSize, err := row.ValueTo(rowBuf[keySize:])
-		if err != nil {
-			return err
-		}
-		wb.Set(rowBuf[:keySize], rowBuf[keySize:keySize+valSize])
-	}
-
-	// delete
-	for _, row := range deleteRows {
-		tfr, ok := row.(*TermFrequencyRow)
-		if ok {
-			// need to decrement counter
-			if tfr.DictionaryRowKeySize() > len(rowBuf) {
-				rowBuf = make([]byte, tfr.DictionaryRowKeySize())
+	for _, updateRows := range updateRowsAll {
+		for _, row := range updateRows {
+			if row.KeySize()+row.ValueSize() > len(rowBuf) {
+				rowBuf = make([]byte, row.KeySize()+row.ValueSize())
 			}
-			dictKeySize, err := tfr.DictionaryRowKeyTo(rowBuf)
+			keySize, err := row.KeyTo(rowBuf)
 			if err != nil {
 				return err
 			}
-			dictionaryDeltas[string(rowBuf[:dictKeySize])] -= 1
+			valSize, err := row.ValueTo(rowBuf[keySize:])
+			if err != nil {
+				return err
+			}
+			wb.Set(rowBuf[:keySize], rowBuf[keySize:keySize+valSize])
 		}
-		if row.KeySize()+row.ValueSize() > len(rowBuf) {
-			rowBuf = make([]byte, row.KeySize()+row.ValueSize())
-		}
+	}
+
+	// delete
+	for _, deleteRows := range deleteRowsAll {
+		for _, row := range deleteRows {
+			tfr, ok := row.(*TermFrequencyRow)
+			if ok {
+				// need to decrement counter
+				if tfr.DictionaryRowKeySize() > len(rowBuf) {
+					rowBuf = make([]byte, tfr.DictionaryRowKeySize())
+				}
+				dictKeySize, err := tfr.DictionaryRowKeyTo(rowBuf)
+				if err != nil {
+					return err
+				}
+				dictionaryDeltas[string(rowBuf[:dictKeySize])] -= 1
+			}
+			if row.KeySize()+row.ValueSize() > len(rowBuf) {
+				rowBuf = make([]byte, row.KeySize()+row.ValueSize())
+			}
+			keySize, err := row.KeyTo(rowBuf)
+			if err != nil {
+				return err
+			}
+			wb.Delete(rowBuf[:keySize])
+		}
-		keySize, err := row.KeyTo(rowBuf)
-		if err != nil {
-			return err
-		}
-		wb.Delete(rowBuf[:keySize])
 	}
 
 	if 8 > len(rowBuf) {
 		rowBuf = make([]byte, 8)
 	}
 	for dictRowKey, delta := range dictionaryDeltas {
 		binary.LittleEndian.PutUint64(rowBuf, uint64(delta))
 		wb.Merge([]byte(dictRowKey), rowBuf[0:8])
@@ -413,13 +417,22 @@ func (udc *UpsideDownCouch) Update(doc *document.Document) (err error) {
 	}()
 
 	// prepare a list of rows
-	addRows := make([]UpsideDownCouchRow, 0)
-	updateRows := make([]UpsideDownCouchRow, 0)
-	deleteRows := make([]UpsideDownCouchRow, 0)
+	var addRowsAll [][]UpsideDownCouchRow
+	var updateRowsAll [][]UpsideDownCouchRow
+	var deleteRowsAll [][]UpsideDownCouchRow
 
-	addRows, updateRows, deleteRows = udc.mergeOldAndNew(backIndexRow, result.Rows, addRows, updateRows, deleteRows)
+	addRows, updateRows, deleteRows := udc.mergeOldAndNew(backIndexRow, result.Rows)
+	if len(addRows) > 0 {
+		addRowsAll = append(addRowsAll, addRows)
+	}
+	if len(updateRows) > 0 {
+		updateRowsAll = append(updateRowsAll, updateRows)
+	}
+	if len(deleteRows) > 0 {
+		deleteRowsAll = append(deleteRowsAll, deleteRows)
+	}
 
-	err = udc.batchRows(kvwriter, addRows, updateRows, deleteRows)
+	err = udc.batchRows(kvwriter, addRowsAll, updateRowsAll, deleteRowsAll)
 	if err == nil && backIndexRow == nil {
 		udc.m.Lock()
 		udc.docCount++
@@ -434,7 +447,11 @@ func (udc *UpsideDownCouch) Update(doc *document.Document) (err error) {
 	return
 }
 
-func (udc *UpsideDownCouch) mergeOldAndNew(backIndexRow *BackIndexRow, rows []index.IndexRow, addRows, updateRows, deleteRows []UpsideDownCouchRow) ([]UpsideDownCouchRow, []UpsideDownCouchRow, []UpsideDownCouchRow) {
+func (udc *UpsideDownCouch) mergeOldAndNew(backIndexRow *BackIndexRow, rows []index.IndexRow) (addRows []UpsideDownCouchRow, updateRows []UpsideDownCouchRow, deleteRows []UpsideDownCouchRow) {
+	addRows = make([]UpsideDownCouchRow, 0, len(rows))
+	updateRows = make([]UpsideDownCouchRow, 0, len(rows))
+	deleteRows = make([]UpsideDownCouchRow, 0, len(rows))
+
 	existingTermKeys := make(map[string]bool)
 	for _, key := range backIndexRow.AllTermKeys() {
 		existingTermKeys[string(key)] = true
@@ -588,10 +605,14 @@ func (udc *UpsideDownCouch) Delete(id string) (err error) {
 		}
 	}()
 
-	deleteRows := make([]UpsideDownCouchRow, 0)
-	deleteRows = udc.deleteSingle(id, backIndexRow, deleteRows)
+	var deleteRowsAll [][]UpsideDownCouchRow
 
-	err = udc.batchRows(kvwriter, nil, nil, deleteRows)
+	deleteRows := udc.deleteSingle(id, backIndexRow, nil)
+	if len(deleteRows) > 0 {
+		deleteRowsAll = append(deleteRowsAll, deleteRows)
+	}
+
+	err = udc.batchRows(kvwriter, nil, nil, deleteRowsAll)
 	if err == nil {
 		udc.m.Lock()
 		udc.docCount--
@@ -792,17 +813,20 @@ func (udc *UpsideDownCouch) Batch(batch *index.Batch) (err error) {
 	atomic.AddUint64(&udc.stats.analysisTime, uint64(time.Since(analysisStart)))
 
-	// prepare a list of rows
-	addRows := make([]UpsideDownCouchRow, 0)
-	updateRows := make([]UpsideDownCouchRow, 0)
-	deleteRows := make([]UpsideDownCouchRow, 0)
-
 	docsAdded := uint64(0)
 	docsDeleted := uint64(0)
 
 	indexStart := time.Now()
 
+	// prepare a list of rows
+	var addRowsAll [][]UpsideDownCouchRow
+	var updateRowsAll [][]UpsideDownCouchRow
+	var deleteRowsAll [][]UpsideDownCouchRow
+
 	// add the internal ops
+	var updateRows []UpsideDownCouchRow
+	var deleteRows []UpsideDownCouchRow
 	for internalKey, internalValue := range batch.InternalOps {
 		if internalValue == nil {
 			// delete
@@ -814,13 +838,33 @@ func (udc *UpsideDownCouch) Batch(batch *index.Batch) (err error) {
 		}
 	}
 
+	if len(updateRows) > 0 {
+		updateRowsAll = append(updateRowsAll, updateRows)
+	}
+	if len(deleteRows) > 0 {
+		deleteRowsAll = append(deleteRowsAll, deleteRows)
+	}
+
 	// process back index rows as they arrive
 	for dbir := range docBackIndexRowCh {
 		if dbir.doc == nil && dbir.backIndexRow != nil {
 			// delete
-			deleteRows = udc.deleteSingle(dbir.docID, dbir.backIndexRow, deleteRows)
+			deleteRows := udc.deleteSingle(dbir.docID, dbir.backIndexRow, nil)
+			if len(deleteRows) > 0 {
+				deleteRowsAll = append(deleteRowsAll, deleteRows)
+			}
 			docsDeleted++
 		} else if dbir.doc != nil {
-			addRows, updateRows, deleteRows = udc.mergeOldAndNew(dbir.backIndexRow, newRowsMap[dbir.docID], addRows, updateRows, deleteRows)
+			addRows, updateRows, deleteRows := udc.mergeOldAndNew(dbir.backIndexRow, newRowsMap[dbir.docID])
+			if len(addRows) > 0 {
+				addRowsAll = append(addRowsAll, addRows)
+			}
+			if len(updateRows) > 0 {
+				updateRowsAll = append(updateRowsAll, updateRows)
+			}
+			if len(deleteRows) > 0 {
+				deleteRowsAll = append(deleteRowsAll, deleteRows)
+			}
 			if dbir.backIndexRow == nil {
 				docsAdded++
 			}
@@ -844,7 +888,7 @@ func (udc *UpsideDownCouch) Batch(batch *index.Batch) (err error) {
 		return
 	}
 
-	err = udc.batchRows(kvwriter, addRows, updateRows, deleteRows)
+	err = udc.batchRows(kvwriter, addRowsAll, updateRowsAll, deleteRowsAll)
 	if err != nil {
 		_ = kvwriter.Close()
 		atomic.AddUint64(&udc.stats.errors, 1)