0
0

upside_down and firestorm use new NewBatchEx() API

With this change, the upside_down batchRows() and firestorm
batchRows() now use the new KVWriter.NewBatchEx() API, which can
improve performance by reducing the number of cgo hops.
This commit is contained in:
Steve Yen 2016-01-13 16:48:23 -08:00
parent d94ccf2d74
commit 6849e538be
2 changed files with 145 additions and 65 deletions

View File

@ -192,14 +192,17 @@ func (f *Firestorm) Delete(id string) error {
func (f *Firestorm) batchRows(writer store.KVWriter, rowsOfRows [][]index.IndexRow, deleteKeys [][]byte) (map[string]int64, error) {
// prepare batch
wb := writer.NewBatch()
defer func() {
_ = wb.Close()
}()
dictionaryDeltas := make(map[string]int64)
// count up bytes needed for buffering.
addNum := 0
addKeyBytes := 0
addValBytes := 0
deleteNum := 0
deleteKeyBytes := 0
var kbuf []byte
var vbuf []byte
prepareBuf := func(buf []byte, sizeNeeded int) []byte {
if cap(buf) < sizeNeeded {
@ -208,8 +211,6 @@ func (f *Firestorm) batchRows(writer store.KVWriter, rowsOfRows [][]index.IndexR
return buf[0:sizeNeeded]
}
dictionaryDeltas := make(map[string]int64)
for _, rows := range rowsOfRows {
for _, row := range rows {
tfr, ok := row.(*TermFreqRow)
@ -225,28 +226,59 @@ func (f *Firestorm) batchRows(writer store.KVWriter, rowsOfRows [][]index.IndexR
}
}
kbuf = prepareBuf(kbuf, row.KeySize())
klen, err := row.KeyTo(kbuf)
addKeyBytes += row.KeySize()
addValBytes += row.ValueSize()
}
addNum += len(rows)
}
for _, dk := range deleteKeys {
deleteKeyBytes += len(dk)
}
deleteNum += len(deleteKeys)
// prepare batch
totBytes := addKeyBytes + addValBytes + deleteKeyBytes
buf, wb, err := writer.NewBatchEx(store.KVBatchOptions{
TotalBytes: totBytes,
NumSets: addNum,
NumDeletes: deleteNum,
NumMerges: 0,
})
if err != nil {
return nil, err
}
defer func() {
_ = wb.Close()
}()
for _, rows := range rowsOfRows {
for _, row := range rows {
klen, err := row.KeyTo(buf)
if err != nil {
return nil, err
}
vbuf = prepareBuf(vbuf, row.ValueSize())
vlen, err := row.ValueTo(vbuf)
vlen, err := row.ValueTo(buf[klen:])
if err != nil {
return nil, err
}
wb.Set(kbuf[0:klen], vbuf[0:vlen])
wb.Set(buf[0:klen], buf[klen:klen+vlen])
buf = buf[klen+vlen:]
}
}
for _, dk := range deleteKeys {
wb.Delete(dk)
dklen := copy(buf, dk)
wb.Delete(buf[0:dklen])
buf = buf[dklen:]
}
// write out the batch
err := writer.ExecuteBatch(wb)
err = writer.ExecuteBatch(wb)
if err != nil {
return nil, err
}

View File

@ -142,19 +142,22 @@ func PutRowBuffer(buf []byte) {
}
func (udc *UpsideDownCouch) batchRows(writer store.KVWriter, addRowsAll [][]UpsideDownCouchRow, updateRowsAll [][]UpsideDownCouchRow, deleteRowsAll [][]UpsideDownCouchRow) (err error) {
// prepare batch
wb := writer.NewBatch()
defer func() {
_ = wb.Close()
}()
// buffer to work with
rowBuf := GetRowBuffer()
dictionaryDeltas := make(map[string]int64)
// add
// count up bytes needed for buffering.
addNum := 0
addKeyBytes := 0
addValBytes := 0
updateNum := 0
updateKeyBytes := 0
updateValBytes := 0
deleteNum := 0
deleteKeyBytes := 0
rowBuf := GetRowBuffer()
for _, addRows := range addRowsAll {
for _, row := range addRows {
tfr, ok := row.(*TermFrequencyRow)
@ -168,37 +171,20 @@ func (udc *UpsideDownCouch) batchRows(writer store.KVWriter, addRowsAll [][]Upsi
}
dictionaryDeltas[string(rowBuf[:dictKeySize])] += 1
}
if row.KeySize()+row.ValueSize() > len(rowBuf) {
rowBuf = make([]byte, row.KeySize()+row.ValueSize())
}
keySize, err := row.KeyTo(rowBuf)
if err != nil {
return err
}
valSize, err := row.ValueTo(rowBuf[keySize:])
wb.Set(rowBuf[:keySize], rowBuf[keySize:keySize+valSize])
addKeyBytes += row.KeySize()
addValBytes += row.ValueSize()
}
addNum += len(addRows)
}
// update
for _, updateRows := range updateRowsAll {
for _, row := range updateRows {
if row.KeySize()+row.ValueSize() > len(rowBuf) {
rowBuf = make([]byte, row.KeySize()+row.ValueSize())
}
keySize, err := row.KeyTo(rowBuf)
if err != nil {
return err
}
valSize, err := row.ValueTo(rowBuf[keySize:])
if err != nil {
return err
}
wb.Set(rowBuf[:keySize], rowBuf[keySize:keySize+valSize])
updateKeyBytes += row.KeySize()
updateValBytes += row.ValueSize()
}
updateNum += len(updateRows)
}
// delete
for _, deleteRows := range deleteRowsAll {
for _, row := range deleteRows {
tfr, ok := row.(*TermFrequencyRow)
@ -213,27 +199,89 @@ func (udc *UpsideDownCouch) batchRows(writer store.KVWriter, addRowsAll [][]Upsi
}
dictionaryDeltas[string(rowBuf[:dictKeySize])] -= 1
}
if row.KeySize()+row.ValueSize() > len(rowBuf) {
rowBuf = make([]byte, row.KeySize()+row.ValueSize())
}
keySize, err := row.KeyTo(rowBuf)
if err != nil {
return err
}
wb.Delete(rowBuf[:keySize])
deleteKeyBytes += row.KeySize()
}
}
if 8 > len(rowBuf) {
rowBuf = make([]byte, 8)
}
for dictRowKey, delta := range dictionaryDeltas {
binary.LittleEndian.PutUint64(rowBuf, uint64(delta))
wb.Merge([]byte(dictRowKey), rowBuf[0:8])
deleteNum += len(deleteRows)
}
PutRowBuffer(rowBuf)
mergeNum := len(dictionaryDeltas)
mergeKeyBytes := 0
mergeValBytes := mergeNum * 8
for dictRowKey, _ := range dictionaryDeltas {
mergeKeyBytes += len(dictRowKey)
}
// prepare batch
totBytes := addKeyBytes + addValBytes +
updateKeyBytes + updateValBytes +
deleteKeyBytes +
mergeKeyBytes + mergeValBytes
buf, wb, err := writer.NewBatchEx(store.KVBatchOptions{
TotalBytes: totBytes,
NumSets: addNum + updateNum,
NumDeletes: deleteNum,
NumMerges: mergeNum,
})
if err != nil {
return err
}
defer func() {
_ = wb.Close()
}()
// fill the batch
for _, addRows := range addRowsAll {
for _, row := range addRows {
keySize, err := row.KeyTo(buf)
if err != nil {
return err
}
valSize, err := row.ValueTo(buf[keySize:])
if err != nil {
return err
}
wb.Set(buf[:keySize], buf[keySize:keySize+valSize])
buf = buf[keySize+valSize:]
}
}
for _, updateRows := range updateRowsAll {
for _, row := range updateRows {
keySize, err := row.KeyTo(buf)
if err != nil {
return err
}
valSize, err := row.ValueTo(buf[keySize:])
if err != nil {
return err
}
wb.Set(buf[:keySize], buf[keySize:keySize+valSize])
buf = buf[keySize+valSize:]
}
}
for _, deleteRows := range deleteRowsAll {
for _, row := range deleteRows {
keySize, err := row.KeyTo(buf)
if err != nil {
return err
}
wb.Delete(buf[:keySize])
buf = buf[keySize:]
}
}
for dictRowKey, delta := range dictionaryDeltas {
dictRowKeyLen := copy(buf, dictRowKey)
binary.LittleEndian.PutUint64(buf[dictRowKeyLen:], uint64(delta))
wb.Merge(buf[:dictRowKeyLen], buf[dictRowKeyLen:dictRowKeyLen+8])
buf = buf[dictRowKeyLen+8:]
}
// write out the batch
return writer.ExecuteBatch(wb)
}