diff --git a/index/firestorm/analysis.go b/index/firestorm/analysis.go index 878f2994..7bab22ec 100644 --- a/index/firestorm/analysis.go +++ b/index/firestorm/analysis.go @@ -65,15 +65,17 @@ func (f *Firestorm) Analyze(d *document.Document) *index.AnalysisResult { analyzeField(field, true) } - for fieldIndex, tokenFreqs := range fieldTermFreqs { - // see if any of the composite fields need this - for _, compositeField := range d.CompositeFields { - compositeField.Compose(fieldNames[fieldIndex], fieldLengths[fieldIndex], tokenFreqs) + if len(d.CompositeFields) > 0 { + for fieldIndex, tokenFreqs := range fieldTermFreqs { + // see if any of the composite fields need this + for _, compositeField := range d.CompositeFields { + compositeField.Compose(fieldNames[fieldIndex], fieldLengths[fieldIndex], tokenFreqs) + } } - } - for _, compositeField := range d.CompositeFields { - analyzeField(compositeField, false) + for _, compositeField := range d.CompositeFields { + analyzeField(compositeField, false) + } } rowsCapNeeded := len(rv.Rows) diff --git a/index/firestorm/analysis_test.go b/index/firestorm/analysis_test.go index 26631c41..4fe0c775 100644 --- a/index/firestorm/analysis_test.go +++ b/index/firestorm/analysis_test.go @@ -74,7 +74,7 @@ func TestAnalysis(t *testing.T) { { d: document.NewDocument("a"). AddField( - document.NewTextFieldWithIndexingOptions("name", nil, []byte("test"), document.IndexField|document.StoreField|document.IncludeTermVectors)), + document.NewTextFieldWithIndexingOptions("name", nil, []byte("test"), document.IndexField|document.StoreField|document.IncludeTermVectors)), r: &index.AnalysisResult{ DocID: "a", Rows: []index.IndexRow{ diff --git a/index/firestorm/firestorm.go b/index/firestorm/firestorm.go index 73633f0b..303e8983 100644 --- a/index/firestorm/firestorm.go +++ b/index/firestorm/firestorm.go @@ -192,14 +192,17 @@ func (f *Firestorm) Delete(id string) error { func (f *Firestorm) batchRows(writer store.KVWriter, rowsOfRows [][]index.IndexRow, deleteKeys [][]byte) (map[string]int64, error) { - // prepare batch - wb := writer.NewBatch() - defer func() { - _ = wb.Close() - }() + dictionaryDeltas := make(map[string]int64) + + // count up bytes needed for buffering. + addNum := 0 + addKeyBytes := 0 + addValBytes := 0 + + deleteNum := 0 + deleteKeyBytes := 0 var kbuf []byte - var vbuf []byte prepareBuf := func(buf []byte, sizeNeeded int) []byte { if cap(buf) < sizeNeeded { @@ -208,8 +211,6 @@ func (f *Firestorm) batchRows(writer store.KVWriter, rowsOfRows [][]index.IndexR return buf[0:sizeNeeded] } - dictionaryDeltas := make(map[string]int64) - for _, rows := range rowsOfRows { for _, row := range rows { tfr, ok := row.(*TermFreqRow) @@ -225,28 +226,59 @@ func (f *Firestorm) batchRows(writer store.KVWriter, rowsOfRows [][]index.IndexR } } - kbuf = prepareBuf(kbuf, row.KeySize()) - klen, err := row.KeyTo(kbuf) + addKeyBytes += row.KeySize() + addValBytes += row.ValueSize() + } + addNum += len(rows) + } + + for _, dk := range deleteKeys { + deleteKeyBytes += len(dk) + } + deleteNum += len(deleteKeys) + + // prepare batch + totBytes := addKeyBytes + addValBytes + deleteKeyBytes + + buf, wb, err := writer.NewBatchEx(store.KVBatchOptions{ + TotalBytes: totBytes, + NumSets: addNum, + NumDeletes: deleteNum, + NumMerges: 0, + }) + if err != nil { + return nil, err + } + defer func() { + _ = wb.Close() + }() + + for _, rows := range rowsOfRows { + for _, row := range rows { + klen, err := row.KeyTo(buf) if err != nil { return nil, err } - vbuf = prepareBuf(vbuf, row.ValueSize()) - vlen, err := row.ValueTo(vbuf) + vlen, err := row.ValueTo(buf[klen:]) if err != nil { return nil, err } - wb.Set(kbuf[0:klen], vbuf[0:vlen]) + wb.Set(buf[0:klen], buf[klen:klen+vlen]) + + buf = buf[klen+vlen:] } } for _, dk := range deleteKeys { - wb.Delete(dk) + dklen := copy(buf, dk) + wb.Delete(buf[0:dklen]) + buf = buf[dklen:] } // write out the batch - err := writer.ExecuteBatch(wb) + err = writer.ExecuteBatch(wb) if err != nil { return nil, err } diff --git a/index/store/boltdb/reader.go b/index/store/boltdb/reader.go index efb3e5dd..f8138c5d 100644 --- a/index/store/boltdb/reader.go +++ b/index/store/boltdb/reader.go @@ -30,6 +30,10 @@ func (r *Reader) Get(key []byte) ([]byte, error) { return rv, nil } +func (r *Reader) MultiGet(keys [][]byte) ([][]byte, error) { + return store.MultiGet(r, keys) +} + func (r *Reader) PrefixIterator(prefix []byte) store.KVIterator { cursor := r.bucket.Cursor() diff --git a/index/store/boltdb/writer.go b/index/store/boltdb/writer.go index 1cd29a34..4ba1d082 100644 --- a/index/store/boltdb/writer.go +++ b/index/store/boltdb/writer.go @@ -23,6 +23,10 @@ func (w *Writer) NewBatch() store.KVBatch { return store.NewEmulatedBatch(w.store.mo) } +func (w *Writer) NewBatchEx(options store.KVBatchOptions) ([]byte, store.KVBatch, error) { + return make([]byte, options.TotalBytes), w.NewBatch(), nil +} + func (w *Writer) ExecuteBatch(batch store.KVBatch) error { emulatedBatch, ok := batch.(*store.EmulatedBatch) diff --git a/index/store/goleveldb/reader.go b/index/store/goleveldb/reader.go index 7807d571..ec423679 100644 --- a/index/store/goleveldb/reader.go +++ b/index/store/goleveldb/reader.go @@ -28,6 +28,10 @@ func (r *Reader) Get(key []byte) ([]byte, error) { return b, err } +func (r *Reader) MultiGet(keys [][]byte) ([][]byte, error) { + return store.MultiGet(r, keys) +} + func (r *Reader) PrefixIterator(prefix []byte) store.KVIterator { byteRange := util.BytesPrefix(prefix) iter := r.snapshot.NewIterator(byteRange, r.store.defaultReadOptions) diff --git a/index/store/goleveldb/writer.go b/index/store/goleveldb/writer.go index c5229005..defbea13 100644 --- a/index/store/goleveldb/writer.go +++ b/index/store/goleveldb/writer.go @@ -29,6 +29,10 @@ func (w *Writer) NewBatch() store.KVBatch { return &rv } +func (w *Writer) NewBatchEx(options store.KVBatchOptions) ([]byte, store.KVBatch, error) { + return make([]byte, options.TotalBytes), w.NewBatch(), nil +} + func (w *Writer) ExecuteBatch(b store.KVBatch) error { batch, ok := b.(*Batch) if !ok { diff --git a/index/store/gtreap/reader.go b/index/store/gtreap/reader.go index 6f92a751..842d5e97 100644 --- a/index/store/gtreap/reader.go +++ b/index/store/gtreap/reader.go @@ -35,6 +35,10 @@ func (w *Reader) Get(k []byte) (v []byte, err error) { return nil, nil } +func (r *Reader) MultiGet(keys [][]byte) ([][]byte, error) { + return store.MultiGet(r, keys) +} + func (w *Reader) PrefixIterator(k []byte) store.KVIterator { rv := Iterator{ t: w.t, diff --git a/index/store/gtreap/writer.go b/index/store/gtreap/writer.go index 4490b158..add8535c 100644 --- a/index/store/gtreap/writer.go +++ b/index/store/gtreap/writer.go @@ -29,6 +29,10 @@ func (w *Writer) NewBatch() store.KVBatch { return store.NewEmulatedBatch(w.s.mo) } +func (w *Writer) NewBatchEx(options store.KVBatchOptions) ([]byte, store.KVBatch, error) { + return make([]byte, options.TotalBytes), w.NewBatch(), nil +} + func (w *Writer) ExecuteBatch(batch store.KVBatch) error { emulatedBatch, ok := batch.(*store.EmulatedBatch) diff --git a/index/store/kvstore.go b/index/store/kvstore.go index e0735832..776a5f25 100644 --- a/index/store/kvstore.go +++ b/index/store/kvstore.go @@ -40,6 +40,9 @@ type KVReader interface { // The caller owns the bytes returned. Get(key []byte) ([]byte, error) + // MultiGet retrieves multiple values in one call. + MultiGet(keys [][]byte) ([][]byte, error) + // PrefixIterator returns a KVIterator that will // visit all K/V pairs with the provided prefix PrefixIterator(prefix []byte) KVIterator @@ -91,6 +94,14 @@ type KVWriter interface { // NewBatch returns a KVBatch for performing batch operations on this kvstore NewBatch() KVBatch + // NewBatchEx returns a KVBatch and an associated byte array + // that's pre-sized based on the KVBatchOptions. The caller can + // use the returned byte array for keys and values associated with + // the batch. Once the batch is either executed or closed, the + // associated byte array should no longer be accessed by the + // caller. + NewBatchEx(KVBatchOptions) ([]byte, KVBatch, error) + // ExecuteBatch will execute the KVBatch, the provided KVBatch **MUST** have // been created by the same KVStore (though not necessarily the same KVWriter) // Batch execution is atomic, either all the operations or none will be performed @@ -100,6 +111,27 @@ type KVWriter interface { Close() error } +// KVBatchOptions provides the KVWriter.NewBatchEx() method with batch +// preparation and preallocation information. +type KVBatchOptions struct { + // TotalBytes is the sum of key and value bytes needed by the + // caller for the entire batch. It affects the size of the + // returned byte array of KVWrite.NewBatchEx(). + TotalBytes int + + // NumSets is the number of Set() calls the caller will invoke on + // the KVBatch. + NumSets int + + // NumMerges is the number of Merge() calls the caller will invoke + // on the KVBatch. + NumDeletes int + + // NumMerges is the number of Merge() calls the caller will invoke + // on the KVBatch. + NumMerges int +} + // KVBatch is an abstraction for making multiple KV mutations at once type KVBatch interface { diff --git a/index/store/metrics/reader.go b/index/store/metrics/reader.go index c555c736..01fd8182 100644 --- a/index/store/metrics/reader.go +++ b/index/store/metrics/reader.go @@ -17,6 +17,16 @@ func (r *Reader) Get(key []byte) (v []byte, err error) { return } +func (r *Reader) MultiGet(keys [][]byte) (vals [][]byte, err error) { + r.s.TimerReaderMultiGet.Time(func() { + vals, err = r.o.MultiGet(keys) + if err != nil { + r.s.AddError("Reader.MultiGet", err, nil) + } + }) + return +} + func (r *Reader) PrefixIterator(prefix []byte) (i store.KVIterator) { r.s.TimerReaderPrefixIterator.Time(func() { i = &Iterator{s: r.s, o: r.o.PrefixIterator(prefix)} diff --git a/index/store/metrics/store.go b/index/store/metrics/store.go index c7889e51..c97c7829 100644 --- a/index/store/metrics/store.go +++ b/index/store/metrics/store.go @@ -33,6 +33,7 @@ type Store struct { o store.KVStore TimerReaderGet metrics.Timer + TimerReaderMultiGet metrics.Timer TimerReaderPrefixIterator metrics.Timer TimerReaderRangeIterator metrics.Timer TimerWriterExecuteBatch metrics.Timer @@ -71,6 +72,7 @@ func New(mo store.MergeOperator, config map[string]interface{}) (store.KVStore, o: kvs, TimerReaderGet: metrics.NewTimer(), + TimerReaderMultiGet: metrics.NewTimer(), TimerReaderPrefixIterator: metrics.NewTimer(), TimerReaderRangeIterator: metrics.NewTimer(), TimerWriterExecuteBatch: metrics.NewTimer(), @@ -141,6 +143,11 @@ func (s *Store) WriteJSON(w io.Writer) (err error) { return } WriteTimerJSON(w, s.TimerReaderGet) + _, err = w.Write([]byte(`,"TimerReaderMultiGet":`)) + if err != nil { + return + } + WriteTimerJSON(w, s.TimerReaderMultiGet) _, err = w.Write([]byte(`,"TimerReaderPrefixIterator":`)) if err != nil { return diff --git a/index/store/metrics/writer.go b/index/store/metrics/writer.go index 0f23bfdf..420bf3ec 100644 --- a/index/store/metrics/writer.go +++ b/index/store/metrics/writer.go @@ -23,6 +23,10 @@ func (w *Writer) NewBatch() store.KVBatch { return &Batch{s: w.s, o: w.o.NewBatch()} } +func (w *Writer) NewBatchEx(options store.KVBatchOptions) ([]byte, store.KVBatch, error) { + return make([]byte, options.TotalBytes), w.NewBatch(), nil +} + func (w *Writer) ExecuteBatch(b store.KVBatch) (err error) { batch, ok := b.(*Batch) if !ok { diff --git a/index/store/multiget.go b/index/store/multiget.go new file mode 100644 index 00000000..bab50530 --- /dev/null +++ b/index/store/multiget.go @@ -0,0 +1,28 @@ +// Copyright (c) 2016 Couchbase, Inc. +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file +// except in compliance with the License. You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed under the +// License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, +// either express or implied. See the License for the specific language governing permissions +// and limitations under the License. + +package store + +// MultiGet is a helper function to retrieve mutiple keys from a +// KVReader, and might be used by KVStore implementations that don't +// have a native multi-get facility. +func MultiGet(kvreader KVReader, keys [][]byte) ([][]byte, error) { + vals := make([][]byte, 0, len(keys)) + + for i, key := range keys { + val, err := kvreader.Get(key) + if err != nil { + return nil, err + } + + vals[i] = val + } + + return vals, nil +} diff --git a/index/store/null/null.go b/index/store/null/null.go index 92be3330..90adc06a 100644 --- a/index/store/null/null.go +++ b/index/store/null/null.go @@ -40,6 +40,10 @@ func (r *reader) Get(key []byte) ([]byte, error) { return nil, nil } +func (r *reader) MultiGet(keys [][]byte) ([][]byte, error) { + return make([][]byte, len(keys)), nil +} + func (r *reader) PrefixIterator(prefix []byte) store.KVIterator { return &iterator{} } @@ -92,6 +96,10 @@ func (w *writer) NewBatch() store.KVBatch { return &batch{} } +func (w *writer) NewBatchEx(options store.KVBatchOptions) ([]byte, store.KVBatch, error) { + return make([]byte, options.TotalBytes), w.NewBatch(), nil +} + func (w *writer) ExecuteBatch(store.KVBatch) error { return nil } diff --git a/index/upside_down/analysis.go b/index/upside_down/analysis.go index dbb732b7..1fcbdafa 100644 --- a/index/upside_down/analysis.go +++ b/index/upside_down/analysis.go @@ -65,15 +65,17 @@ func (udc *UpsideDownCouch) Analyze(d *document.Document) *index.AnalysisResult analyzeField(field, true) } - for fieldIndex, tokenFreqs := range fieldTermFreqs { - // see if any of the composite fields need this - for _, compositeField := range d.CompositeFields { - compositeField.Compose(fieldNames[fieldIndex], fieldLengths[fieldIndex], tokenFreqs) + if len(d.CompositeFields) > 0 { + for fieldIndex, tokenFreqs := range fieldTermFreqs { + // see if any of the composite fields need this + for _, compositeField := range d.CompositeFields { + compositeField.Compose(fieldNames[fieldIndex], fieldLengths[fieldIndex], tokenFreqs) + } } - } - for _, compositeField := range d.CompositeFields { - analyzeField(compositeField, false) + for _, compositeField := range d.CompositeFields { + analyzeField(compositeField, false) + } } rowsCapNeeded := len(rv.Rows) + 1 diff --git a/index/upside_down/upside_down.go b/index/upside_down/upside_down.go index c24256b5..657d9eba 100644 --- a/index/upside_down/upside_down.go +++ b/index/upside_down/upside_down.go @@ -142,19 +142,22 @@ func PutRowBuffer(buf []byte) { } func (udc *UpsideDownCouch) batchRows(writer store.KVWriter, addRowsAll [][]UpsideDownCouchRow, updateRowsAll [][]UpsideDownCouchRow, deleteRowsAll [][]UpsideDownCouchRow) (err error) { - - // prepare batch - wb := writer.NewBatch() - defer func() { - _ = wb.Close() - }() - - // buffer to work with - rowBuf := GetRowBuffer() - dictionaryDeltas := make(map[string]int64) - // add + // count up bytes needed for buffering. + addNum := 0 + addKeyBytes := 0 + addValBytes := 0 + + updateNum := 0 + updateKeyBytes := 0 + updateValBytes := 0 + + deleteNum := 0 + deleteKeyBytes := 0 + + rowBuf := GetRowBuffer() + for _, addRows := range addRowsAll { for _, row := range addRows { tfr, ok := row.(*TermFrequencyRow) @@ -168,37 +171,20 @@ func (udc *UpsideDownCouch) batchRows(writer store.KVWriter, addRowsAll [][]Upsi } dictionaryDeltas[string(rowBuf[:dictKeySize])] += 1 } - if row.KeySize()+row.ValueSize() > len(rowBuf) { - rowBuf = make([]byte, row.KeySize()+row.ValueSize()) - } - keySize, err := row.KeyTo(rowBuf) - if err != nil { - return err - } - valSize, err := row.ValueTo(rowBuf[keySize:]) - wb.Set(rowBuf[:keySize], rowBuf[keySize:keySize+valSize]) + addKeyBytes += row.KeySize() + addValBytes += row.ValueSize() } + addNum += len(addRows) } - // update for _, updateRows := range updateRowsAll { for _, row := range updateRows { - if row.KeySize()+row.ValueSize() > len(rowBuf) { - rowBuf = make([]byte, row.KeySize()+row.ValueSize()) - } - keySize, err := row.KeyTo(rowBuf) - if err != nil { - return err - } - valSize, err := row.ValueTo(rowBuf[keySize:]) - if err != nil { - return err - } - wb.Set(rowBuf[:keySize], rowBuf[keySize:keySize+valSize]) + updateKeyBytes += row.KeySize() + updateValBytes += row.ValueSize() } + updateNum += len(updateRows) } - // delete for _, deleteRows := range deleteRowsAll { for _, row := range deleteRows { tfr, ok := row.(*TermFrequencyRow) @@ -213,27 +199,89 @@ func (udc *UpsideDownCouch) batchRows(writer store.KVWriter, addRowsAll [][]Upsi } dictionaryDeltas[string(rowBuf[:dictKeySize])] -= 1 } - if row.KeySize()+row.ValueSize() > len(rowBuf) { - rowBuf = make([]byte, row.KeySize()+row.ValueSize()) - } - keySize, err := row.KeyTo(rowBuf) - if err != nil { - return err - } - wb.Delete(rowBuf[:keySize]) + deleteKeyBytes += row.KeySize() } - } - - if 8 > len(rowBuf) { - rowBuf = make([]byte, 8) - } - for dictRowKey, delta := range dictionaryDeltas { - binary.LittleEndian.PutUint64(rowBuf, uint64(delta)) - wb.Merge([]byte(dictRowKey), rowBuf[0:8]) + deleteNum += len(deleteRows) } PutRowBuffer(rowBuf) + mergeNum := len(dictionaryDeltas) + mergeKeyBytes := 0 + mergeValBytes := mergeNum * 8 + + for dictRowKey, _ := range dictionaryDeltas { + mergeKeyBytes += len(dictRowKey) + } + + // prepare batch + totBytes := addKeyBytes + addValBytes + + updateKeyBytes + updateValBytes + + deleteKeyBytes + + mergeKeyBytes + mergeValBytes + + buf, wb, err := writer.NewBatchEx(store.KVBatchOptions{ + TotalBytes: totBytes, + NumSets: addNum + updateNum, + NumDeletes: deleteNum, + NumMerges: mergeNum, + }) + if err != nil { + return err + } + defer func() { + _ = wb.Close() + }() + + // fill the batch + for _, addRows := range addRowsAll { + for _, row := range addRows { + keySize, err := row.KeyTo(buf) + if err != nil { + return err + } + valSize, err := row.ValueTo(buf[keySize:]) + if err != nil { + return err + } + wb.Set(buf[:keySize], buf[keySize:keySize+valSize]) + buf = buf[keySize+valSize:] + } + } + + for _, updateRows := range updateRowsAll { + for _, row := range updateRows { + keySize, err := row.KeyTo(buf) + if err != nil { + return err + } + valSize, err := row.ValueTo(buf[keySize:]) + if err != nil { + return err + } + wb.Set(buf[:keySize], buf[keySize:keySize+valSize]) + buf = buf[keySize+valSize:] + } + } + + for _, deleteRows := range deleteRowsAll { + for _, row := range deleteRows { + keySize, err := row.KeyTo(buf) + if err != nil { + return err + } + wb.Delete(buf[:keySize]) + buf = buf[keySize:] + } + } + + for dictRowKey, delta := range dictionaryDeltas { + dictRowKeyLen := copy(buf, dictRowKey) + binary.LittleEndian.PutUint64(buf[dictRowKeyLen:], uint64(delta)) + wb.Merge(buf[:dictRowKeyLen], buf[dictRowKeyLen:dictRowKeyLen+8]) + buf = buf[dictRowKeyLen+8:] + } + // write out the batch return writer.ExecuteBatch(wb) }