
Merge pull request #322 from steveyen/WIP-perf-20160113

KVReader.MultiGet and KVWriter.NewBatchEx APIs
Marty Schoch 2016-01-15 14:28:59 -05:00
commit 1335eb2a7b
17 changed files with 277 additions and 80 deletions
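Taken together, the diffs below introduce two performance-oriented KV APIs: KVReader.MultiGet, which fetches many keys in one call, and KVWriter.NewBatchEx, which preallocates a single byte buffer sized for an entire batch. The following is a hedged sketch of the intended calling pattern, not code from this commit; writeThenRead and its arguments are hypothetical names:

package main

import (
	"github.com/blevesearch/bleve/index/store"
)

// writeThenRead shows the expected flow: size the batch up front via
// KVBatchOptions, carve keys and values out of the preallocated buffer,
// execute, then fetch the values back with one MultiGet instead of N Gets.
func writeThenRead(s store.KVStore, keys, vals [][]byte) ([][]byte, error) {
	w, err := s.Writer()
	if err != nil {
		return nil, err
	}
	defer func() { _ = w.Close() }()

	totBytes := 0
	for i := range keys {
		totBytes += len(keys[i]) + len(vals[i])
	}
	buf, wb, err := w.NewBatchEx(store.KVBatchOptions{
		TotalBytes: totBytes,
		NumSets:    len(keys),
	})
	if err != nil {
		return nil, err
	}
	defer func() { _ = wb.Close() }()

	for i := range keys {
		klen := copy(buf, keys[i])
		vlen := copy(buf[klen:], vals[i])
		wb.Set(buf[:klen], buf[klen:klen+vlen])
		buf = buf[klen+vlen:] // advance past this entry's bytes
	}
	if err = w.ExecuteBatch(wb); err != nil {
		return nil, err
	}

	r, err := s.Reader()
	if err != nil {
		return nil, err
	}
	defer func() { _ = r.Close() }()
	return r.MultiGet(keys)
}

The buffer returned by NewBatchEx backs every key and value handed to the batch, so the caller must stop touching it once the batch is executed or closed.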


@ -65,15 +65,17 @@ func (f *Firestorm) Analyze(d *document.Document) *index.AnalysisResult {
analyzeField(field, true)
}
for fieldIndex, tokenFreqs := range fieldTermFreqs {
// see if any of the composite fields need this
for _, compositeField := range d.CompositeFields {
compositeField.Compose(fieldNames[fieldIndex], fieldLengths[fieldIndex], tokenFreqs)
if len(d.CompositeFields) > 0 {
for fieldIndex, tokenFreqs := range fieldTermFreqs {
// see if any of the composite fields need this
for _, compositeField := range d.CompositeFields {
compositeField.Compose(fieldNames[fieldIndex], fieldLengths[fieldIndex], tokenFreqs)
}
}
}
for _, compositeField := range d.CompositeFields {
analyzeField(compositeField, false)
for _, compositeField := range d.CompositeFields {
analyzeField(compositeField, false)
}
}
rowsCapNeeded := len(rv.Rows)


@ -74,7 +74,7 @@ func TestAnalysis(t *testing.T) {
{
d: document.NewDocument("a").
AddField(
document.NewTextFieldWithIndexingOptions("name", nil, []byte("test"), document.IndexField|document.StoreField|document.IncludeTermVectors)),
document.NewTextFieldWithIndexingOptions("name", nil, []byte("test"), document.IndexField|document.StoreField|document.IncludeTermVectors)),
r: &index.AnalysisResult{
DocID: "a",
Rows: []index.IndexRow{


@ -192,14 +192,17 @@ func (f *Firestorm) Delete(id string) error {
func (f *Firestorm) batchRows(writer store.KVWriter, rowsOfRows [][]index.IndexRow, deleteKeys [][]byte) (map[string]int64, error) {
// prepare batch
wb := writer.NewBatch()
defer func() {
_ = wb.Close()
}()
dictionaryDeltas := make(map[string]int64)
// count up bytes needed for buffering.
addNum := 0
addKeyBytes := 0
addValBytes := 0
deleteNum := 0
deleteKeyBytes := 0
var kbuf []byte
var vbuf []byte
prepareBuf := func(buf []byte, sizeNeeded int) []byte {
if cap(buf) < sizeNeeded {
@ -208,8 +211,6 @@ func (f *Firestorm) batchRows(writer store.KVWriter, rowsOfRows [][]index.IndexR
return buf[0:sizeNeeded]
}
dictionaryDeltas := make(map[string]int64)
for _, rows := range rowsOfRows {
for _, row := range rows {
tfr, ok := row.(*TermFreqRow)
@ -225,28 +226,59 @@ func (f *Firestorm) batchRows(writer store.KVWriter, rowsOfRows [][]index.IndexR
}
}
kbuf = prepareBuf(kbuf, row.KeySize())
klen, err := row.KeyTo(kbuf)
addKeyBytes += row.KeySize()
addValBytes += row.ValueSize()
}
addNum += len(rows)
}
for _, dk := range deleteKeys {
deleteKeyBytes += len(dk)
}
deleteNum += len(deleteKeys)
// prepare batch
totBytes := addKeyBytes + addValBytes + deleteKeyBytes
buf, wb, err := writer.NewBatchEx(store.KVBatchOptions{
TotalBytes: totBytes,
NumSets: addNum,
NumDeletes: deleteNum,
NumMerges: 0,
})
if err != nil {
return nil, err
}
defer func() {
_ = wb.Close()
}()
for _, rows := range rowsOfRows {
for _, row := range rows {
klen, err := row.KeyTo(buf)
if err != nil {
return nil, err
}
vbuf = prepareBuf(vbuf, row.ValueSize())
vlen, err := row.ValueTo(vbuf)
vlen, err := row.ValueTo(buf[klen:])
if err != nil {
return nil, err
}
wb.Set(kbuf[0:klen], vbuf[0:vlen])
wb.Set(buf[0:klen], buf[klen:klen+vlen])
buf = buf[klen+vlen:]
}
}
for _, dk := range deleteKeys {
wb.Delete(dk)
dklen := copy(buf, dk)
wb.Delete(buf[0:dklen])
buf = buf[dklen:]
}
// write out the batch
err := writer.ExecuteBatch(wb)
err = writer.ExecuteBatch(wb)
if err != nil {
return nil, err
}
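The rewritten batchRows makes two passes: the first only sums key and value sizes so NewBatchEx can allocate one contiguous buffer, and the second serializes every row into that buffer, advancing a slice window after each entry. A minimal illustration of the slice-advance idiom (carve is a hypothetical helper, not in the commit):

package main

import "fmt"

// carve hands out the next n bytes of b as an independent window and
// returns the advanced remainder. Successive calls yield disjoint
// windows of one backing array, which is why slices handed to wb.Set
// and wb.Delete above stay valid until the batch is executed or closed.
func carve(b []byte, n int) (window, rest []byte) {
	return b[:n:n], b[n:]
}

func main() {
	buf := make([]byte, 8)
	k, buf := carve(buf, 3) // bytes 0-2 hold a key
	v, buf := carve(buf, 5) // bytes 3-7 hold its value
	fmt.Println(len(k), len(v), len(buf)) // prints: 3 5 0
}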


@ -30,6 +30,10 @@ func (r *Reader) Get(key []byte) ([]byte, error) {
return rv, nil
}
func (r *Reader) MultiGet(keys [][]byte) ([][]byte, error) {
return store.MultiGet(r, keys)
}
func (r *Reader) PrefixIterator(prefix []byte) store.KVIterator {
cursor := r.bucket.Cursor()


@ -23,6 +23,10 @@ func (w *Writer) NewBatch() store.KVBatch {
return store.NewEmulatedBatch(w.store.mo)
}
func (w *Writer) NewBatchEx(options store.KVBatchOptions) ([]byte, store.KVBatch, error) {
return make([]byte, options.TotalBytes), w.NewBatch(), nil
}
func (w *Writer) ExecuteBatch(batch store.KVBatch) error {
emulatedBatch, ok := batch.(*store.EmulatedBatch)


@ -28,6 +28,10 @@ func (r *Reader) Get(key []byte) ([]byte, error) {
return b, err
}
func (r *Reader) MultiGet(keys [][]byte) ([][]byte, error) {
return store.MultiGet(r, keys)
}
func (r *Reader) PrefixIterator(prefix []byte) store.KVIterator {
byteRange := util.BytesPrefix(prefix)
iter := r.snapshot.NewIterator(byteRange, r.store.defaultReadOptions)


@ -29,6 +29,10 @@ func (w *Writer) NewBatch() store.KVBatch {
return &rv
}
func (w *Writer) NewBatchEx(options store.KVBatchOptions) ([]byte, store.KVBatch, error) {
return make([]byte, options.TotalBytes), w.NewBatch(), nil
}
func (w *Writer) ExecuteBatch(b store.KVBatch) error {
batch, ok := b.(*Batch)
if !ok {


@ -35,6 +35,10 @@ func (w *Reader) Get(k []byte) (v []byte, err error) {
return nil, nil
}
func (r *Reader) MultiGet(keys [][]byte) ([][]byte, error) {
return store.MultiGet(r, keys)
}
func (w *Reader) PrefixIterator(k []byte) store.KVIterator {
rv := Iterator{
t: w.t,


@ -29,6 +29,10 @@ func (w *Writer) NewBatch() store.KVBatch {
return store.NewEmulatedBatch(w.s.mo)
}
func (w *Writer) NewBatchEx(options store.KVBatchOptions) ([]byte, store.KVBatch, error) {
return make([]byte, options.TotalBytes), w.NewBatch(), nil
}
func (w *Writer) ExecuteBatch(batch store.KVBatch) error {
emulatedBatch, ok := batch.(*store.EmulatedBatch)


@ -40,6 +40,9 @@ type KVReader interface {
// The caller owns the bytes returned.
Get(key []byte) ([]byte, error)
// MultiGet retrieves multiple values in one call.
MultiGet(keys [][]byte) ([][]byte, error)
// PrefixIterator returns a KVIterator that will
// visit all K/V pairs with the provided prefix
PrefixIterator(prefix []byte) KVIterator
@ -91,6 +94,14 @@ type KVWriter interface {
// NewBatch returns a KVBatch for performing batch operations on this kvstore
NewBatch() KVBatch
// NewBatchEx returns a KVBatch and an associated byte array
// that's pre-sized based on the KVBatchOptions. The caller can
// use the returned byte array for keys and values associated with
// the batch. Once the batch is either executed or closed, the
// associated byte array should no longer be accessed by the
// caller.
NewBatchEx(KVBatchOptions) ([]byte, KVBatch, error)
// ExecuteBatch will execute the KVBatch, the provided KVBatch **MUST** have
// been created by the same KVStore (though not necessarily the same KVWriter)
// Batch execution is atomic, either all the operations or none will be performed
@ -100,6 +111,27 @@ type KVWriter interface {
Close() error
}
// KVBatchOptions provides the KVWriter.NewBatchEx() method with batch
// preparation and preallocation information.
type KVBatchOptions struct {
// TotalBytes is the sum of key and value bytes needed by the
// caller for the entire batch. It affects the size of the
// returned byte array of KVWriter.NewBatchEx().
TotalBytes int
// NumSets is the number of Set() calls the caller will invoke on
// the KVBatch.
NumSets int
// NumDeletes is the number of Delete() calls the caller will invoke
// on the KVBatch.
NumDeletes int
// NumMerges is the number of Merge() calls the caller will invoke
// on the KVBatch.
NumMerges int
}
// KVBatch is an abstraction for making multiple KV mutations at once
type KVBatch interface {
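To make the sizing contract concrete, here is a hypothetical (not from the commit) set of options for a batch of two sets ("a" -> "1", "bb" -> "22") and one delete ("ccc"): TotalBytes must cover key plus value bytes for the sets and key bytes for the delete, i.e. (1+1) + (2+2) + 3 = 9.

package main

import (
	"fmt"

	"github.com/blevesearch/bleve/index/store"
)

func main() {
	opts := store.KVBatchOptions{
		TotalBytes: 9, // (1+1) + (2+2) + 3, as computed above
		NumSets:    2,
		NumDeletes: 1,
		NumMerges:  0,
	}
	fmt.Printf("%+v\n", opts)
}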


@ -17,6 +17,16 @@ func (r *Reader) Get(key []byte) (v []byte, err error) {
return
}
func (r *Reader) MultiGet(keys [][]byte) (vals [][]byte, err error) {
r.s.TimerReaderMultiGet.Time(func() {
vals, err = r.o.MultiGet(keys)
if err != nil {
r.s.AddError("Reader.MultiGet", err, nil)
}
})
return
}
func (r *Reader) PrefixIterator(prefix []byte) (i store.KVIterator) {
r.s.TimerReaderPrefixIterator.Time(func() {
i = &Iterator{s: r.s, o: r.o.PrefixIterator(prefix)}


@ -33,6 +33,7 @@ type Store struct {
o store.KVStore
TimerReaderGet metrics.Timer
TimerReaderMultiGet metrics.Timer
TimerReaderPrefixIterator metrics.Timer
TimerReaderRangeIterator metrics.Timer
TimerWriterExecuteBatch metrics.Timer
@ -71,6 +72,7 @@ func New(mo store.MergeOperator, config map[string]interface{}) (store.KVStore,
o: kvs,
TimerReaderGet: metrics.NewTimer(),
TimerReaderMultiGet: metrics.NewTimer(),
TimerReaderPrefixIterator: metrics.NewTimer(),
TimerReaderRangeIterator: metrics.NewTimer(),
TimerWriterExecuteBatch: metrics.NewTimer(),
@ -141,6 +143,11 @@ func (s *Store) WriteJSON(w io.Writer) (err error) {
return
}
WriteTimerJSON(w, s.TimerReaderGet)
_, err = w.Write([]byte(`,"TimerReaderMultiGet":`))
if err != nil {
return
}
WriteTimerJSON(w, s.TimerReaderMultiGet)
_, err = w.Write([]byte(`,"TimerReaderPrefixIterator":`))
if err != nil {
return


@ -23,6 +23,10 @@ func (w *Writer) NewBatch() store.KVBatch {
return &Batch{s: w.s, o: w.o.NewBatch()}
}
func (w *Writer) NewBatchEx(options store.KVBatchOptions) ([]byte, store.KVBatch, error) {
return make([]byte, options.TotalBytes), w.NewBatch(), nil
}
func (w *Writer) ExecuteBatch(b store.KVBatch) (err error) {
batch, ok := b.(*Batch)
if !ok {

index/store/multiget.go Normal file

@ -0,0 +1,28 @@
// Copyright (c) 2016 Couchbase, Inc.
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
// except in compliance with the License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed under the
// License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
// either express or implied. See the License for the specific language governing permissions
// and limitations under the License.
package store
// MultiGet is a helper function to retrieve multiple keys from a
// KVReader, and might be used by KVStore implementations that don't
// have a native multi-get facility.
func MultiGet(kvreader KVReader, keys [][]byte) ([][]byte, error) {
vals := make([][]byte, len(keys))
for i, key := range keys {
val, err := kvreader.Get(key)
if err != nil {
return nil, err
}
vals[i] = val
}
return vals, nil
}
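A hedged usage sketch for the helper: fetch a batch of keys through any KVReader and keep only the hits, assuming the underlying store follows the common nil-on-miss convention for absent keys (fetchPresent is a hypothetical name):

package main

import (
	"github.com/blevesearch/bleve/index/store"
)

// fetchPresent returns only the values that were actually found,
// filtering the nil entries MultiGet reports for missing keys.
func fetchPresent(r store.KVReader, keys [][]byte) ([][]byte, error) {
	vals, err := r.MultiGet(keys)
	if err != nil {
		return nil, err
	}
	present := vals[:0] // reuse the backing array
	for _, v := range vals {
		if v != nil {
			present = append(present, v)
		}
	}
	return present, nil
}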


@ -40,6 +40,10 @@ func (r *reader) Get(key []byte) ([]byte, error) {
return nil, nil
}
func (r *reader) MultiGet(keys [][]byte) ([][]byte, error) {
return make([][]byte, len(keys)), nil
}
func (r *reader) PrefixIterator(prefix []byte) store.KVIterator {
return &iterator{}
}
@ -92,6 +96,10 @@ func (w *writer) NewBatch() store.KVBatch {
return &batch{}
}
func (w *writer) NewBatchEx(options store.KVBatchOptions) ([]byte, store.KVBatch, error) {
return make([]byte, options.TotalBytes), w.NewBatch(), nil
}
func (w *writer) ExecuteBatch(store.KVBatch) error {
return nil
}


@ -65,15 +65,17 @@ func (udc *UpsideDownCouch) Analyze(d *document.Document) *index.AnalysisResult
analyzeField(field, true)
}
for fieldIndex, tokenFreqs := range fieldTermFreqs {
// see if any of the composite fields need this
for _, compositeField := range d.CompositeFields {
compositeField.Compose(fieldNames[fieldIndex], fieldLengths[fieldIndex], tokenFreqs)
if len(d.CompositeFields) > 0 {
for fieldIndex, tokenFreqs := range fieldTermFreqs {
// see if any of the composite fields need this
for _, compositeField := range d.CompositeFields {
compositeField.Compose(fieldNames[fieldIndex], fieldLengths[fieldIndex], tokenFreqs)
}
}
}
for _, compositeField := range d.CompositeFields {
analyzeField(compositeField, false)
for _, compositeField := range d.CompositeFields {
analyzeField(compositeField, false)
}
}
rowsCapNeeded := len(rv.Rows) + 1


@ -142,19 +142,22 @@ func PutRowBuffer(buf []byte) {
}
func (udc *UpsideDownCouch) batchRows(writer store.KVWriter, addRowsAll [][]UpsideDownCouchRow, updateRowsAll [][]UpsideDownCouchRow, deleteRowsAll [][]UpsideDownCouchRow) (err error) {
// prepare batch
wb := writer.NewBatch()
defer func() {
_ = wb.Close()
}()
// buffer to work with
rowBuf := GetRowBuffer()
dictionaryDeltas := make(map[string]int64)
// add
// count up bytes needed for buffering.
addNum := 0
addKeyBytes := 0
addValBytes := 0
updateNum := 0
updateKeyBytes := 0
updateValBytes := 0
deleteNum := 0
deleteKeyBytes := 0
rowBuf := GetRowBuffer()
for _, addRows := range addRowsAll {
for _, row := range addRows {
tfr, ok := row.(*TermFrequencyRow)
@ -168,37 +171,20 @@ func (udc *UpsideDownCouch) batchRows(writer store.KVWriter, addRowsAll [][]Upsi
}
dictionaryDeltas[string(rowBuf[:dictKeySize])] += 1
}
if row.KeySize()+row.ValueSize() > len(rowBuf) {
rowBuf = make([]byte, row.KeySize()+row.ValueSize())
}
keySize, err := row.KeyTo(rowBuf)
if err != nil {
return err
}
valSize, err := row.ValueTo(rowBuf[keySize:])
wb.Set(rowBuf[:keySize], rowBuf[keySize:keySize+valSize])
addKeyBytes += row.KeySize()
addValBytes += row.ValueSize()
}
addNum += len(addRows)
}
// update
for _, updateRows := range updateRowsAll {
for _, row := range updateRows {
if row.KeySize()+row.ValueSize() > len(rowBuf) {
rowBuf = make([]byte, row.KeySize()+row.ValueSize())
}
keySize, err := row.KeyTo(rowBuf)
if err != nil {
return err
}
valSize, err := row.ValueTo(rowBuf[keySize:])
if err != nil {
return err
}
wb.Set(rowBuf[:keySize], rowBuf[keySize:keySize+valSize])
updateKeyBytes += row.KeySize()
updateValBytes += row.ValueSize()
}
updateNum += len(updateRows)
}
// delete
for _, deleteRows := range deleteRowsAll {
for _, row := range deleteRows {
tfr, ok := row.(*TermFrequencyRow)
@ -213,27 +199,89 @@ func (udc *UpsideDownCouch) batchRows(writer store.KVWriter, addRowsAll [][]Upsi
}
dictionaryDeltas[string(rowBuf[:dictKeySize])] -= 1
}
if row.KeySize()+row.ValueSize() > len(rowBuf) {
rowBuf = make([]byte, row.KeySize()+row.ValueSize())
}
keySize, err := row.KeyTo(rowBuf)
if err != nil {
return err
}
wb.Delete(rowBuf[:keySize])
deleteKeyBytes += row.KeySize()
}
}
if 8 > len(rowBuf) {
rowBuf = make([]byte, 8)
}
for dictRowKey, delta := range dictionaryDeltas {
binary.LittleEndian.PutUint64(rowBuf, uint64(delta))
wb.Merge([]byte(dictRowKey), rowBuf[0:8])
deleteNum += len(deleteRows)
}
PutRowBuffer(rowBuf)
mergeNum := len(dictionaryDeltas)
mergeKeyBytes := 0
mergeValBytes := mergeNum * 8
for dictRowKey := range dictionaryDeltas {
mergeKeyBytes += len(dictRowKey)
}
// prepare batch
totBytes := addKeyBytes + addValBytes +
updateKeyBytes + updateValBytes +
deleteKeyBytes +
mergeKeyBytes + mergeValBytes
buf, wb, err := writer.NewBatchEx(store.KVBatchOptions{
TotalBytes: totBytes,
NumSets: addNum + updateNum,
NumDeletes: deleteNum,
NumMerges: mergeNum,
})
if err != nil {
return err
}
defer func() {
_ = wb.Close()
}()
// fill the batch
for _, addRows := range addRowsAll {
for _, row := range addRows {
keySize, err := row.KeyTo(buf)
if err != nil {
return err
}
valSize, err := row.ValueTo(buf[keySize:])
if err != nil {
return err
}
wb.Set(buf[:keySize], buf[keySize:keySize+valSize])
buf = buf[keySize+valSize:]
}
}
for _, updateRows := range updateRowsAll {
for _, row := range updateRows {
keySize, err := row.KeyTo(buf)
if err != nil {
return err
}
valSize, err := row.ValueTo(buf[keySize:])
if err != nil {
return err
}
wb.Set(buf[:keySize], buf[keySize:keySize+valSize])
buf = buf[keySize+valSize:]
}
}
for _, deleteRows := range deleteRowsAll {
for _, row := range deleteRows {
keySize, err := row.KeyTo(buf)
if err != nil {
return err
}
wb.Delete(buf[:keySize])
buf = buf[keySize:]
}
}
for dictRowKey, delta := range dictionaryDeltas {
dictRowKeyLen := copy(buf, dictRowKey)
binary.LittleEndian.PutUint64(buf[dictRowKeyLen:], uint64(delta))
wb.Merge(buf[:dictRowKeyLen], buf[dictRowKeyLen:dictRowKeyLen+8])
buf = buf[dictRowKeyLen+8:]
}
// write out the batch
return writer.ExecuteBatch(wb)
}
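The dictionary deltas above are queued with wb.Merge(): each operand is a signed delta encoded as 8 little-endian bytes. A store's MergeOperator later folds those operands into the stored counter. Below is a minimal sketch of such an operator, assuming bleve's store.MergeOperator interface (FullMerge/PartialMerge/Name); it is illustrative, not the operator shipped with upside_down:

package main

import "encoding/binary"

// counterMerge sums 8-byte little-endian operands into the existing
// value. Negative deltas work too: uint64 addition wraps modulo 2^64,
// which recovers the correct two's-complement signed total.
type counterMerge struct{}

func (counterMerge) FullMerge(key, existingValue []byte, operands [][]byte) ([]byte, bool) {
	var total uint64
	if len(existingValue) == 8 {
		total = binary.LittleEndian.Uint64(existingValue)
	}
	for _, op := range operands {
		if len(op) != 8 {
			return nil, false // malformed operand
		}
		total += binary.LittleEndian.Uint64(op)
	}
	rv := make([]byte, 8)
	binary.LittleEndian.PutUint64(rv, total)
	return rv, true
}

func (counterMerge) PartialMerge(key, left, right []byte) ([]byte, bool) {
	if len(left) != 8 || len(right) != 8 {
		return nil, false
	}
	rv := make([]byte, 8)
	binary.LittleEndian.PutUint64(rv, binary.LittleEndian.Uint64(left)+binary.LittleEndian.Uint64(right))
	return rv, true
}

func (counterMerge) Name() string { return "counterMerge" }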