From 6849e538bed6015255ca9f84b36c63b291244180 Mon Sep 17 00:00:00 2001
From: Steve Yen
Date: Wed, 13 Jan 2016 16:48:23 -0800
Subject: [PATCH] upside_down and firestorm use new NewBatchEx() API

With this change, the upside_down batchRows() and firestorm batchRows()
now use the new KVWriter.NewBatchEx() API, which can improve performance
by reducing the number of cgo hops.
---
 index/firestorm/firestorm.go     |  62 +++++++++----
 index/upside_down/upside_down.go | 148 ++++++++++++++++++++-----------
 2 files changed, 145 insertions(+), 65 deletions(-)

diff --git a/index/firestorm/firestorm.go b/index/firestorm/firestorm.go
index 73633f0b..303e8983 100644
--- a/index/firestorm/firestorm.go
+++ b/index/firestorm/firestorm.go
@@ -192,14 +192,17 @@ func (f *Firestorm) Delete(id string) error {
 
 func (f *Firestorm) batchRows(writer store.KVWriter, rowsOfRows [][]index.IndexRow, deleteKeys [][]byte) (map[string]int64, error) {
 
-	// prepare batch
-	wb := writer.NewBatch()
-	defer func() {
-		_ = wb.Close()
-	}()
+	dictionaryDeltas := make(map[string]int64)
+
+	// count up bytes needed for buffering.
+	addNum := 0
+	addKeyBytes := 0
+	addValBytes := 0
+
+	deleteNum := 0
+	deleteKeyBytes := 0
 
 	var kbuf []byte
-	var vbuf []byte
 
 	prepareBuf := func(buf []byte, sizeNeeded int) []byte {
 		if cap(buf) < sizeNeeded {
@@ -208,8 +211,6 @@ func (f *Firestorm) batchRows(writer store.KVWriter, rowsOfRows [][]index.IndexR
 		return buf[0:sizeNeeded]
 	}
 
-	dictionaryDeltas := make(map[string]int64)
-
 	for _, rows := range rowsOfRows {
 		for _, row := range rows {
 			tfr, ok := row.(*TermFreqRow)
@@ -225,28 +226,59 @@ func (f *Firestorm) batchRows(writer store.KVWriter, rowsOfRows [][]index.IndexR
 				}
 			}
 
-			kbuf = prepareBuf(kbuf, row.KeySize())
-			klen, err := row.KeyTo(kbuf)
+			addKeyBytes += row.KeySize()
+			addValBytes += row.ValueSize()
+		}
+		addNum += len(rows)
+	}
+
+	for _, dk := range deleteKeys {
+		deleteKeyBytes += len(dk)
+	}
+	deleteNum += len(deleteKeys)
+
+	// prepare batch
+	totBytes := addKeyBytes + addValBytes + deleteKeyBytes
+
+	buf, wb, err := writer.NewBatchEx(store.KVBatchOptions{
+		TotalBytes: totBytes,
+		NumSets:    addNum,
+		NumDeletes: deleteNum,
+		NumMerges:  0,
+	})
+	if err != nil {
+		return nil, err
+	}
+	defer func() {
+		_ = wb.Close()
+	}()
+
+	for _, rows := range rowsOfRows {
+		for _, row := range rows {
+			klen, err := row.KeyTo(buf)
 			if err != nil {
 				return nil, err
 			}
 
-			vbuf = prepareBuf(vbuf, row.ValueSize())
-			vlen, err := row.ValueTo(vbuf)
+			vlen, err := row.ValueTo(buf[klen:])
 			if err != nil {
 				return nil, err
 			}
 
-			wb.Set(kbuf[0:klen], vbuf[0:vlen])
+			wb.Set(buf[0:klen], buf[klen:klen+vlen])
+
+			buf = buf[klen+vlen:]
 		}
 	}
 
 	for _, dk := range deleteKeys {
-		wb.Delete(dk)
+		dklen := copy(buf, dk)
+		wb.Delete(buf[0:dklen])
+		buf = buf[dklen:]
 	}
 
 	// write out the batch
-	err := writer.ExecuteBatch(wb)
+	err = writer.ExecuteBatch(wb)
 	if err != nil {
 		return nil, err
 	}
diff --git a/index/upside_down/upside_down.go b/index/upside_down/upside_down.go
index c24256b5..657d9eba 100644
--- a/index/upside_down/upside_down.go
+++ b/index/upside_down/upside_down.go
@@ -142,19 +142,22 @@ func PutRowBuffer(buf []byte) {
 }
 
 func (udc *UpsideDownCouch) batchRows(writer store.KVWriter, addRowsAll [][]UpsideDownCouchRow, updateRowsAll [][]UpsideDownCouchRow, deleteRowsAll [][]UpsideDownCouchRow) (err error) {
-
-	// prepare batch
-	wb := writer.NewBatch()
-	defer func() {
-		_ = wb.Close()
-	}()
-
-	// buffer to work with
-	rowBuf := GetRowBuffer()
-
 	dictionaryDeltas := make(map[string]int64)
 
-	// add
+	// count up bytes needed for buffering.
+	addNum := 0
+	addKeyBytes := 0
+	addValBytes := 0
+
+	updateNum := 0
+	updateKeyBytes := 0
+	updateValBytes := 0
+
+	deleteNum := 0
+	deleteKeyBytes := 0
+
+	rowBuf := GetRowBuffer()
+
 	for _, addRows := range addRowsAll {
 		for _, row := range addRows {
 			tfr, ok := row.(*TermFrequencyRow)
@@ -168,37 +171,20 @@ func (udc *UpsideDownCouch) batchRows(writer store.KVWriter, addRowsAll [][]Upsi
 				}
 				dictionaryDeltas[string(rowBuf[:dictKeySize])] += 1
 			}
-			if row.KeySize()+row.ValueSize() > len(rowBuf) {
-				rowBuf = make([]byte, row.KeySize()+row.ValueSize())
-			}
-			keySize, err := row.KeyTo(rowBuf)
-			if err != nil {
-				return err
-			}
-			valSize, err := row.ValueTo(rowBuf[keySize:])
-			wb.Set(rowBuf[:keySize], rowBuf[keySize:keySize+valSize])
+			addKeyBytes += row.KeySize()
+			addValBytes += row.ValueSize()
 		}
+		addNum += len(addRows)
 	}
 
-	// update
 	for _, updateRows := range updateRowsAll {
 		for _, row := range updateRows {
-			if row.KeySize()+row.ValueSize() > len(rowBuf) {
-				rowBuf = make([]byte, row.KeySize()+row.ValueSize())
-			}
-			keySize, err := row.KeyTo(rowBuf)
-			if err != nil {
-				return err
-			}
-			valSize, err := row.ValueTo(rowBuf[keySize:])
-			if err != nil {
-				return err
-			}
-			wb.Set(rowBuf[:keySize], rowBuf[keySize:keySize+valSize])
+			updateKeyBytes += row.KeySize()
+			updateValBytes += row.ValueSize()
 		}
+		updateNum += len(updateRows)
 	}
 
-	// delete
 	for _, deleteRows := range deleteRowsAll {
 		for _, row := range deleteRows {
 			tfr, ok := row.(*TermFrequencyRow)
@@ -213,27 +199,89 @@ func (udc *UpsideDownCouch) batchRows(writer store.KVWriter, addRowsAll [][]Upsi
 				}
 				dictionaryDeltas[string(rowBuf[:dictKeySize])] -= 1
 			}
-			if row.KeySize()+row.ValueSize() > len(rowBuf) {
-				rowBuf = make([]byte, row.KeySize()+row.ValueSize())
-			}
-			keySize, err := row.KeyTo(rowBuf)
-			if err != nil {
-				return err
-			}
-			wb.Delete(rowBuf[:keySize])
+			deleteKeyBytes += row.KeySize()
 		}
-	}
-
-	if 8 > len(rowBuf) {
-		rowBuf = make([]byte, 8)
-	}
-	for dictRowKey, delta := range dictionaryDeltas {
-		binary.LittleEndian.PutUint64(rowBuf, uint64(delta))
-		wb.Merge([]byte(dictRowKey), rowBuf[0:8])
+		deleteNum += len(deleteRows)
 	}
 
 	PutRowBuffer(rowBuf)
 
+	mergeNum := len(dictionaryDeltas)
+	mergeKeyBytes := 0
+	mergeValBytes := mergeNum * 8
+
+	for dictRowKey, _ := range dictionaryDeltas {
+		mergeKeyBytes += len(dictRowKey)
+	}
+
+	// prepare batch
+	totBytes := addKeyBytes + addValBytes +
+		updateKeyBytes + updateValBytes +
+		deleteKeyBytes +
+		mergeKeyBytes + mergeValBytes
+
+	buf, wb, err := writer.NewBatchEx(store.KVBatchOptions{
+		TotalBytes: totBytes,
+		NumSets:    addNum + updateNum,
+		NumDeletes: deleteNum,
+		NumMerges:  mergeNum,
+	})
+	if err != nil {
+		return err
+	}
+	defer func() {
+		_ = wb.Close()
+	}()
+
+	// fill the batch
+	for _, addRows := range addRowsAll {
+		for _, row := range addRows {
+			keySize, err := row.KeyTo(buf)
+			if err != nil {
+				return err
+			}
+			valSize, err := row.ValueTo(buf[keySize:])
+			if err != nil {
+				return err
+			}
+			wb.Set(buf[:keySize], buf[keySize:keySize+valSize])
+			buf = buf[keySize+valSize:]
+		}
+	}
+
+	for _, updateRows := range updateRowsAll {
+		for _, row := range updateRows {
+			keySize, err := row.KeyTo(buf)
+			if err != nil {
+				return err
+			}
+			valSize, err := row.ValueTo(buf[keySize:])
+			if err != nil {
+				return err
+			}
+			wb.Set(buf[:keySize], buf[keySize:keySize+valSize])
+			buf = buf[keySize+valSize:]
+		}
+	}
+
+	for _, deleteRows := range deleteRowsAll {
+		for _, row := range deleteRows {
+			keySize, err := row.KeyTo(buf)
+			if err != nil {
+				return err
+			}
+			wb.Delete(buf[:keySize])
+			buf = buf[keySize:]
+		}
+	}
+
+	for dictRowKey, delta := range dictionaryDeltas {
+		dictRowKeyLen := copy(buf, dictRowKey)
+		binary.LittleEndian.PutUint64(buf[dictRowKeyLen:], uint64(delta))
+		wb.Merge(buf[:dictRowKeyLen], buf[dictRowKeyLen:dictRowKeyLen+8])
+		buf = buf[dictRowKeyLen+8:]
+	}
+
 	// write out the batch
 	return writer.ExecuteBatch(wb)
 }
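
Reviewer note (not part of the commit): both batchRows() implementations above now share the same two-pass shape. A first pass counts the operations and the total key/value bytes, a single NewBatchEx() call returns one pre-sized buffer, and a second pass copies every key, value, and merge payload into that buffer, handing sub-slices to wb.Set()/wb.Delete()/wb.Merge() before one ExecuteBatch() call. The standalone sketch below is only an illustration of that pattern under simplifying assumptions: the writeBatch name, the simplified inputs (raw key/value byte slices and a delta map instead of bleve's IndexRow/UpsideDownCouchRow types), and the import path are mine, not taken from the patch.

// Illustrative sketch only: the "count, then fill" pattern used by the
// batchRows() changes above. The writeBatch helper, its inputs, and the
// import path are assumptions made for this example.
package example

import (
	"encoding/binary"

	"github.com/blevesearch/bleve/index/store"
)

// writeBatch applies sets (key/value pairs), deletes (keys), and merges
// (key -> int64 delta) through a single pre-sized KVBatch.
func writeBatch(writer store.KVWriter, sets [][2][]byte, deletes [][]byte,
	merges map[string]int64) error {
	// Pass 1: count operations and total bytes so the KV store can
	// allocate one contiguous buffer up front (for cgo-backed stores,
	// one allocation/hop instead of one per mutation).
	totBytes := 0
	for _, kv := range sets {
		totBytes += len(kv[0]) + len(kv[1])
	}
	for _, k := range deletes {
		totBytes += len(k)
	}
	for mk := range merges {
		totBytes += len(mk) + 8 // 8 bytes for the little-endian delta
	}

	buf, wb, err := writer.NewBatchEx(store.KVBatchOptions{
		TotalBytes: totBytes,
		NumSets:    len(sets),
		NumDeletes: len(deletes),
		NumMerges:  len(merges),
	})
	if err != nil {
		return err
	}
	defer func() {
		_ = wb.Close()
	}()

	// Pass 2: copy each payload into the shared buffer, hand sub-slices
	// to the batch, and advance buf past the bytes just consumed.
	for _, kv := range sets {
		klen := copy(buf, kv[0])
		vlen := copy(buf[klen:], kv[1])
		wb.Set(buf[:klen], buf[klen:klen+vlen])
		buf = buf[klen+vlen:]
	}
	for _, k := range deletes {
		klen := copy(buf, k)
		wb.Delete(buf[:klen])
		buf = buf[klen:]
	}
	for mk, delta := range merges {
		klen := copy(buf, mk)
		binary.LittleEndian.PutUint64(buf[klen:], uint64(delta))
		wb.Merge(buf[:klen], buf[klen:klen+8])
		buf = buf[klen+8:]
	}

	// One ExecuteBatch call pushes the whole batch to the KV store.
	return writer.ExecuteBatch(wb)
}

The counting pass has to cover everything the fill pass writes: every Set/Delete/Merge argument aliases the single buffer returned by NewBatchEx(), so undercounting TotalBytes or any of the Num* fields would leave too little room when the batch is filled.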