Merge pull request #339 from steveyen/WIP-moss
integrate index/store/moss KV store
This commit is contained in:
commit
265ab24b6e
|
@ -88,6 +88,7 @@ import (
|
||||||
_ "github.com/blevesearch/bleve/index/store/boltdb"
|
_ "github.com/blevesearch/bleve/index/store/boltdb"
|
||||||
_ "github.com/blevesearch/bleve/index/store/goleveldb"
|
_ "github.com/blevesearch/bleve/index/store/goleveldb"
|
||||||
_ "github.com/blevesearch/bleve/index/store/gtreap"
|
_ "github.com/blevesearch/bleve/index/store/gtreap"
|
||||||
|
_ "github.com/blevesearch/bleve/index/store/moss"
|
||||||
|
|
||||||
// index types
|
// index types
|
||||||
_ "github.com/blevesearch/bleve/index/firestorm"
|
_ "github.com/blevesearch/bleve/index/firestorm"
|
||||||
|
|
81
index/store/moss/batch.go
Normal file
81
index/store/moss/batch.go
Normal file
|
@ -0,0 +1,81 @@
|
||||||
|
// Copyright (c) 2016 Couchbase, Inc.
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the
|
||||||
|
// License. You may obtain a copy of the License at
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
// Unless required by applicable law or agreed to in writing,
|
||||||
|
// software distributed under the License is distributed on an "AS
|
||||||
|
// IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
||||||
|
// express or implied. See the License for the specific language
|
||||||
|
// governing permissions and limitations under the License.
|
||||||
|
|
||||||
|
package moss
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/couchbase/moss"
|
||||||
|
|
||||||
|
"github.com/blevesearch/bleve/index/store"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Batch struct {
|
||||||
|
store *Store
|
||||||
|
merge *store.EmulatedMerge
|
||||||
|
batch moss.Batch
|
||||||
|
buf []byte // Non-nil when using pre-alloc'ed / NewBatchEx().
|
||||||
|
bufUsed int
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *Batch) Set(key, val []byte) {
|
||||||
|
var err error
|
||||||
|
if b.buf != nil {
|
||||||
|
b.bufUsed += len(key) + len(val)
|
||||||
|
err = b.batch.AllocSet(key, val)
|
||||||
|
} else {
|
||||||
|
err = b.batch.Set(key, val)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
b.store.Logf("bleve moss batch.Set err: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *Batch) Delete(key []byte) {
|
||||||
|
var err error
|
||||||
|
if b.buf != nil {
|
||||||
|
b.bufUsed += len(key)
|
||||||
|
err = b.batch.AllocDel(key)
|
||||||
|
} else {
|
||||||
|
err = b.batch.Del(key)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
b.store.Logf("bleve moss batch.Delete err: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *Batch) Merge(key, val []byte) {
|
||||||
|
b.merge.Merge(key, val)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *Batch) Reset() {
|
||||||
|
err := b.Close()
|
||||||
|
if err != nil {
|
||||||
|
b.store.Logf("bleve moss batch.Close err: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
batch, err := b.store.ms.NewBatch(0, 0)
|
||||||
|
if err == nil {
|
||||||
|
b.batch = batch
|
||||||
|
b.merge = store.NewEmulatedMerge(b.store.mo)
|
||||||
|
b.buf = nil
|
||||||
|
b.bufUsed = 0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *Batch) Close() error {
|
||||||
|
b.merge = nil
|
||||||
|
err := b.batch.Close()
|
||||||
|
b.batch = nil
|
||||||
|
return err
|
||||||
|
}
|
134
index/store/moss/iterator.go
Normal file
134
index/store/moss/iterator.go
Normal file
|
@ -0,0 +1,134 @@
|
||||||
|
// Copyright (c) 2016 Couchbase, Inc.
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the
|
||||||
|
// License. You may obtain a copy of the License at
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
// Unless required by applicable law or agreed to in writing,
|
||||||
|
// software distributed under the License is distributed on an "AS
|
||||||
|
// IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
||||||
|
// express or implied. See the License for the specific language
|
||||||
|
// governing permissions and limitations under the License.
|
||||||
|
|
||||||
|
package moss
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
|
||||||
|
"github.com/couchbase/moss"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Iterator struct {
|
||||||
|
store *Store
|
||||||
|
ss moss.Snapshot
|
||||||
|
iter moss.Iterator
|
||||||
|
prefix []byte
|
||||||
|
start []byte
|
||||||
|
end []byte
|
||||||
|
done bool
|
||||||
|
k []byte
|
||||||
|
v []byte
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *Iterator) Seek(seekToKey []byte) {
|
||||||
|
x.done = true
|
||||||
|
x.k = nil
|
||||||
|
x.v = nil
|
||||||
|
|
||||||
|
if bytes.Compare(seekToKey, x.start) < 0 {
|
||||||
|
seekToKey = x.start
|
||||||
|
}
|
||||||
|
|
||||||
|
iter, err := x.ss.StartIterator(seekToKey, x.end, moss.IteratorOptions{})
|
||||||
|
if err != nil {
|
||||||
|
x.store.Logf("bleve moss StartIterator err: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
err = x.iter.Close()
|
||||||
|
if err != nil {
|
||||||
|
x.store.Logf("bleve moss iterator.Seek err: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
x.iter = iter
|
||||||
|
|
||||||
|
x.checkDone()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *Iterator) Next() {
|
||||||
|
if x.done {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
x.done = true
|
||||||
|
x.k = nil
|
||||||
|
x.v = nil
|
||||||
|
|
||||||
|
err := x.iter.Next()
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
x.checkDone()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *Iterator) Current() ([]byte, []byte, bool) {
|
||||||
|
return x.k, x.v, !x.done
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *Iterator) Key() []byte {
|
||||||
|
if x.done {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return x.k
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *Iterator) Value() []byte {
|
||||||
|
if x.done {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return x.v
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *Iterator) Valid() bool {
|
||||||
|
return !x.done
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *Iterator) Close() error {
|
||||||
|
var err error
|
||||||
|
|
||||||
|
x.ss = nil
|
||||||
|
|
||||||
|
if x.iter != nil {
|
||||||
|
err = x.iter.Close()
|
||||||
|
x.iter = nil
|
||||||
|
}
|
||||||
|
|
||||||
|
x.prefix = nil
|
||||||
|
x.done = true
|
||||||
|
x.k = nil
|
||||||
|
x.v = nil
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *Iterator) checkDone() {
|
||||||
|
x.done = true
|
||||||
|
x.k = nil
|
||||||
|
x.v = nil
|
||||||
|
|
||||||
|
k, v, err := x.iter.Current()
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if x.prefix != nil && !bytes.HasPrefix(k, x.prefix) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
x.done = false
|
||||||
|
x.k = k
|
||||||
|
x.v = v
|
||||||
|
}
|
397
index/store/moss/lower.go
Normal file
397
index/store/moss/lower.go
Normal file
|
@ -0,0 +1,397 @@
|
||||||
|
// Copyright (c) 2016 Couchbase, Inc.
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the
|
||||||
|
// License. You may obtain a copy of the License at
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
// Unless required by applicable law or agreed to in writing,
|
||||||
|
// software distributed under the License is distributed on an "AS
|
||||||
|
// IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
||||||
|
// express or implied. See the License for the specific language
|
||||||
|
// governing permissions and limitations under the License.
|
||||||
|
|
||||||
|
// Package moss provides a KVStore implementation based on the
|
||||||
|
// github.com/couchbaselabs/moss library.
|
||||||
|
|
||||||
|
package moss
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/couchbase/moss"
|
||||||
|
|
||||||
|
"github.com/blevesearch/bleve/index/store"
|
||||||
|
"github.com/blevesearch/bleve/registry"
|
||||||
|
)
|
||||||
|
|
||||||
|
func initLowerLevelStore(
|
||||||
|
mo store.MergeOperator,
|
||||||
|
config map[string]interface{},
|
||||||
|
lowerLevelStoreName string,
|
||||||
|
lowerLevelStoreConfig map[string]interface{},
|
||||||
|
lowerLevelMaxBatchSize uint64,
|
||||||
|
logf func(format string, a ...interface{}),
|
||||||
|
) (moss.Snapshot, moss.LowerLevelUpdate, error) {
|
||||||
|
constructor := registry.KVStoreConstructorByName(lowerLevelStoreName)
|
||||||
|
if constructor == nil {
|
||||||
|
return nil, nil, fmt.Errorf("moss store, initLowerLevelStore,"+
|
||||||
|
" could not find lower level store: %s", lowerLevelStoreName)
|
||||||
|
}
|
||||||
|
|
||||||
|
if lowerLevelStoreConfig == nil {
|
||||||
|
lowerLevelStoreConfig = map[string]interface{}{}
|
||||||
|
}
|
||||||
|
|
||||||
|
for k, v := range config {
|
||||||
|
_, exists := lowerLevelStoreConfig[k]
|
||||||
|
if !exists {
|
||||||
|
lowerLevelStoreConfig[k] = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
kvStore, err := constructor(mo, lowerLevelStoreConfig)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
llStore := &llStore{
|
||||||
|
refs: 0,
|
||||||
|
kvStore: kvStore,
|
||||||
|
logf: logf,
|
||||||
|
}
|
||||||
|
|
||||||
|
llUpdate := func(ssHigher moss.Snapshot) (ssLower moss.Snapshot, err error) {
|
||||||
|
return llStore.update(ssHigher, lowerLevelMaxBatchSize)
|
||||||
|
}
|
||||||
|
|
||||||
|
llSnapshot, err := llUpdate(nil)
|
||||||
|
if err != nil {
|
||||||
|
_ = kvStore.Close()
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return llSnapshot, llUpdate, nil // llStore.refs is now 1.
|
||||||
|
}
|
||||||
|
|
||||||
|
// ------------------------------------------------
|
||||||
|
|
||||||
|
// llStore is a lower level store and provides ref-counting around a
|
||||||
|
// bleve store.KVStore.
|
||||||
|
type llStore struct {
|
||||||
|
kvStore store.KVStore
|
||||||
|
|
||||||
|
logf func(format string, a ...interface{})
|
||||||
|
|
||||||
|
m sync.Mutex // Protects fields that follow.
|
||||||
|
refs int
|
||||||
|
}
|
||||||
|
|
||||||
|
// llSnapshot represents a lower-level snapshot, wrapping a bleve
|
||||||
|
// store.KVReader, and implements the moss.Snapshot interface.
|
||||||
|
type llSnapshot struct {
|
||||||
|
llStore *llStore // Holds 1 refs on the llStore.
|
||||||
|
kvReader store.KVReader
|
||||||
|
|
||||||
|
m sync.Mutex // Protects fields that follow.
|
||||||
|
refs int
|
||||||
|
}
|
||||||
|
|
||||||
|
// llIterator represents a lower-level iterator, wrapping a bleve
|
||||||
|
// store.KVIterator, and implements the moss.Iterator interface.
|
||||||
|
type llIterator struct {
|
||||||
|
llSnapshot *llSnapshot // Holds 1 refs on the llSnapshot.
|
||||||
|
|
||||||
|
// Some lower-level KVReader implementations need a separate
|
||||||
|
// KVReader clone, due to KVReader single-threaded'ness.
|
||||||
|
kvReader store.KVReader
|
||||||
|
|
||||||
|
kvIterator store.KVIterator
|
||||||
|
}
|
||||||
|
|
||||||
|
type readerSource interface {
|
||||||
|
Reader() (store.KVReader, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ------------------------------------------------
|
||||||
|
|
||||||
|
func (s *llStore) addRef() *llStore {
|
||||||
|
s.m.Lock()
|
||||||
|
s.refs += 1
|
||||||
|
s.m.Unlock()
|
||||||
|
|
||||||
|
return s
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *llStore) decRef() {
|
||||||
|
s.m.Lock()
|
||||||
|
s.refs -= 1
|
||||||
|
if s.refs <= 0 {
|
||||||
|
err := s.kvStore.Close()
|
||||||
|
if err != nil {
|
||||||
|
s.logf("llStore kvStore.Close err: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
s.m.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
// update() mutates this lower level store with latest data from the
|
||||||
|
// given higher level moss.Snapshot and returns a new moss.Snapshot
|
||||||
|
// that the higher level can use which represents this lower level
|
||||||
|
// store.
|
||||||
|
func (s *llStore) update(ssHigher moss.Snapshot, maxBatchSize uint64) (
|
||||||
|
ssLower moss.Snapshot, err error) {
|
||||||
|
if ssHigher != nil {
|
||||||
|
iter, err := ssHigher.StartIterator(nil, nil, moss.IteratorOptions{
|
||||||
|
IncludeDeletions: true,
|
||||||
|
SkipLowerLevel: true,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
err = iter.Close()
|
||||||
|
if err != nil {
|
||||||
|
s.logf("llStore iter.Close err: %v", err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
kvWriter, err := s.kvStore.Writer()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
err = kvWriter.Close()
|
||||||
|
if err != nil {
|
||||||
|
s.logf("llStore kvWriter.Close err: %v", err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
batch := kvWriter.NewBatch()
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
if batch != nil {
|
||||||
|
err = batch.Close()
|
||||||
|
if err != nil {
|
||||||
|
s.logf("llStore batch.Close err: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
var readOptions moss.ReadOptions
|
||||||
|
|
||||||
|
i := uint64(0)
|
||||||
|
for {
|
||||||
|
if i%1000000 == 0 {
|
||||||
|
s.logf("llStore.update, i: %d", i)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = iter.Next()
|
||||||
|
if err == moss.ErrIteratorDone {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
ex, key, val, err := iter.CurrentEx()
|
||||||
|
if err == moss.ErrIteratorDone {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
switch ex.Operation {
|
||||||
|
case moss.OperationSet:
|
||||||
|
batch.Set(key, val)
|
||||||
|
|
||||||
|
case moss.OperationDel:
|
||||||
|
batch.Delete(key)
|
||||||
|
|
||||||
|
case moss.OperationMerge:
|
||||||
|
val, err = ssHigher.Get(key, readOptions)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if val != nil {
|
||||||
|
batch.Set(key, val)
|
||||||
|
} else {
|
||||||
|
batch.Delete(key)
|
||||||
|
}
|
||||||
|
|
||||||
|
default:
|
||||||
|
return nil, fmt.Errorf("moss store, update,"+
|
||||||
|
" unexpected operation, ex: %v", ex)
|
||||||
|
}
|
||||||
|
|
||||||
|
if maxBatchSize > 0 && i%maxBatchSize == 0 {
|
||||||
|
err = kvWriter.ExecuteBatch(batch)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = batch.Close()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
batch = kvWriter.NewBatch()
|
||||||
|
}
|
||||||
|
|
||||||
|
i++
|
||||||
|
}
|
||||||
|
|
||||||
|
if i > 0 {
|
||||||
|
s.logf("llStore.update, total: %d, ExecuteBatch: ...", i)
|
||||||
|
|
||||||
|
err = kvWriter.ExecuteBatch(batch)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
s.logf("llStore.update, total: %d, ExecuteBatch: done", i)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
kvReader, err := s.kvStore.Reader()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
s.logf("llStore.update, new reader")
|
||||||
|
|
||||||
|
return &llSnapshot{
|
||||||
|
llStore: s.addRef(),
|
||||||
|
kvReader: kvReader,
|
||||||
|
refs: 1,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ------------------------------------------------
|
||||||
|
|
||||||
|
func (llss *llSnapshot) addRef() *llSnapshot {
|
||||||
|
llss.m.Lock()
|
||||||
|
llss.refs += 1
|
||||||
|
llss.m.Unlock()
|
||||||
|
|
||||||
|
return llss
|
||||||
|
}
|
||||||
|
|
||||||
|
func (llss *llSnapshot) decRef() {
|
||||||
|
llss.m.Lock()
|
||||||
|
llss.refs -= 1
|
||||||
|
if llss.refs <= 0 {
|
||||||
|
if llss.kvReader != nil {
|
||||||
|
err := llss.kvReader.Close()
|
||||||
|
if err != nil {
|
||||||
|
llss.llStore.logf("llSnapshot kvReader.Close err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
llss.kvReader = nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if llss.llStore != nil {
|
||||||
|
llss.llStore.decRef()
|
||||||
|
llss.llStore = nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
llss.m.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (llss *llSnapshot) Close() error {
|
||||||
|
llss.decRef()
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (llss *llSnapshot) Get(key []byte,
|
||||||
|
readOptions moss.ReadOptions) ([]byte, error) {
|
||||||
|
rs, ok := llss.kvReader.(readerSource)
|
||||||
|
if ok {
|
||||||
|
r2, err := rs.Reader()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
val, err := r2.Get(key)
|
||||||
|
|
||||||
|
_ = r2.Close()
|
||||||
|
|
||||||
|
return val, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return llss.kvReader.Get(key)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (llss *llSnapshot) StartIterator(
|
||||||
|
startKeyInclusive, endKeyExclusive []byte,
|
||||||
|
iteratorOptions moss.IteratorOptions) (moss.Iterator, error) {
|
||||||
|
rs, ok := llss.kvReader.(readerSource)
|
||||||
|
if ok {
|
||||||
|
r2, err := rs.Reader()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
i2 := r2.RangeIterator(startKeyInclusive, endKeyExclusive)
|
||||||
|
|
||||||
|
return &llIterator{llSnapshot: llss.addRef(), kvReader: r2, kvIterator: i2}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
i := llss.kvReader.RangeIterator(startKeyInclusive, endKeyExclusive)
|
||||||
|
|
||||||
|
return &llIterator{llSnapshot: llss.addRef(), kvReader: nil, kvIterator: i}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ------------------------------------------------
|
||||||
|
|
||||||
|
func (lli *llIterator) Close() error {
|
||||||
|
var err0 error
|
||||||
|
if lli.kvIterator != nil {
|
||||||
|
err0 = lli.kvIterator.Close()
|
||||||
|
lli.kvIterator = nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var err1 error
|
||||||
|
if lli.kvReader != nil {
|
||||||
|
err1 = lli.kvReader.Close()
|
||||||
|
lli.kvReader = nil
|
||||||
|
}
|
||||||
|
|
||||||
|
lli.llSnapshot.decRef()
|
||||||
|
lli.llSnapshot = nil
|
||||||
|
|
||||||
|
if err0 != nil {
|
||||||
|
return err0
|
||||||
|
}
|
||||||
|
|
||||||
|
if err1 != nil {
|
||||||
|
return err1
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (lli *llIterator) Next() error {
|
||||||
|
lli.kvIterator.Next()
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (lli *llIterator) Current() (key, val []byte, err error) {
|
||||||
|
key, val, ok := lli.kvIterator.Current()
|
||||||
|
if !ok {
|
||||||
|
return nil, nil, moss.ErrIteratorDone
|
||||||
|
}
|
||||||
|
|
||||||
|
return key, val, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (lli *llIterator) CurrentEx() (
|
||||||
|
entryEx moss.EntryEx, key, val []byte, err error) {
|
||||||
|
return moss.EntryEx{}, nil, nil, moss.ErrUnimplemented
|
||||||
|
|
||||||
|
}
|
82
index/store/moss/reader.go
Normal file
82
index/store/moss/reader.go
Normal file
|
@ -0,0 +1,82 @@
|
||||||
|
// Copyright (c) 2016 Couchbase, Inc.
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the
|
||||||
|
// License. You may obtain a copy of the License at
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
// Unless required by applicable law or agreed to in writing,
|
||||||
|
// software distributed under the License is distributed on an "AS
|
||||||
|
// IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
||||||
|
// express or implied. See the License for the specific language
|
||||||
|
// governing permissions and limitations under the License.
|
||||||
|
|
||||||
|
package moss
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/couchbase/moss"
|
||||||
|
|
||||||
|
"github.com/blevesearch/bleve/index/store"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Reader struct {
|
||||||
|
store *Store
|
||||||
|
ss moss.Snapshot
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *Reader) Get(k []byte) (v []byte, err error) {
|
||||||
|
v, err = r.ss.Get(k, moss.ReadOptions{})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if v != nil {
|
||||||
|
return append([]byte(nil), v...), nil
|
||||||
|
}
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *Reader) MultiGet(keys [][]byte) ([][]byte, error) {
|
||||||
|
return store.MultiGet(r, keys)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *Reader) PrefixIterator(k []byte) store.KVIterator {
|
||||||
|
iter, err := r.ss.StartIterator(k, nil, moss.IteratorOptions{})
|
||||||
|
if err != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
rv := &Iterator{
|
||||||
|
store: r.store,
|
||||||
|
ss: r.ss,
|
||||||
|
iter: iter,
|
||||||
|
prefix: k,
|
||||||
|
start: k,
|
||||||
|
end: nil,
|
||||||
|
}
|
||||||
|
|
||||||
|
rv.checkDone()
|
||||||
|
|
||||||
|
return rv
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *Reader) RangeIterator(start, end []byte) store.KVIterator {
|
||||||
|
iter, err := r.ss.StartIterator(start, end, moss.IteratorOptions{})
|
||||||
|
if err != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
rv := &Iterator{
|
||||||
|
store: r.store,
|
||||||
|
ss: r.ss,
|
||||||
|
iter: iter,
|
||||||
|
prefix: nil,
|
||||||
|
start: start,
|
||||||
|
end: end,
|
||||||
|
}
|
||||||
|
|
||||||
|
rv.checkDone()
|
||||||
|
|
||||||
|
return rv
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *Reader) Close() error {
|
||||||
|
return r.ss.Close()
|
||||||
|
}
|
190
index/store/moss/store.go
Normal file
190
index/store/moss/store.go
Normal file
|
@ -0,0 +1,190 @@
|
||||||
|
// Copyright (c) 2016 Couchbase, Inc.
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the
|
||||||
|
// License. You may obtain a copy of the License at
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
// Unless required by applicable law or agreed to in writing,
|
||||||
|
// software distributed under the License is distributed on an "AS
|
||||||
|
// IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
||||||
|
// express or implied. See the License for the specific language
|
||||||
|
// governing permissions and limitations under the License.
|
||||||
|
|
||||||
|
// Package moss provides a KVStore implementation based on the
|
||||||
|
// github.com/couchbaselabs/moss library.
|
||||||
|
|
||||||
|
package moss
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/couchbase/moss"
|
||||||
|
|
||||||
|
"github.com/blevesearch/bleve/index/store"
|
||||||
|
"github.com/blevesearch/bleve/registry"
|
||||||
|
)
|
||||||
|
|
||||||
|
const Name = "moss"
|
||||||
|
|
||||||
|
type Store struct {
|
||||||
|
m sync.Mutex
|
||||||
|
ms moss.Collection
|
||||||
|
mo store.MergeOperator
|
||||||
|
}
|
||||||
|
|
||||||
|
func New(mo store.MergeOperator, config map[string]interface{}) (
|
||||||
|
store.KVStore, error) {
|
||||||
|
return NewEx(mo, config, moss.CollectionOptions{})
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewEx(mo store.MergeOperator, config map[string]interface{},
|
||||||
|
options moss.CollectionOptions) (store.KVStore, error) {
|
||||||
|
debug := moss.DefaultCollectionOptions.Debug
|
||||||
|
v, ok := config["mossDebug"]
|
||||||
|
if ok {
|
||||||
|
debugF, ok := v.(float64)
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("moss store,"+
|
||||||
|
" could not parse config[mossDebug]: %v", v)
|
||||||
|
}
|
||||||
|
|
||||||
|
debug = int(debugF)
|
||||||
|
}
|
||||||
|
|
||||||
|
minMergePercentage := moss.DefaultCollectionOptions.MinMergePercentage
|
||||||
|
v, ok = config["mossMinMergePercentage"]
|
||||||
|
if ok {
|
||||||
|
minMergePercentage, ok = v.(float64)
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("moss store,"+
|
||||||
|
" could not parse config[mossMinMergePercentage]: %v", v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
maxStackDirtyTopHeight := moss.DefaultCollectionOptions.MaxStackDirtyTopHeight
|
||||||
|
v, ok = config["mossMaxStackDirtyTopHeight"]
|
||||||
|
if ok {
|
||||||
|
maxStackDirtyTopHeightF, ok := v.(float64)
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("moss store,"+
|
||||||
|
" could not parse config[mossMaxStackDirtyTopHeight]: %v", v)
|
||||||
|
}
|
||||||
|
|
||||||
|
maxStackDirtyTopHeight = int(maxStackDirtyTopHeightF)
|
||||||
|
}
|
||||||
|
|
||||||
|
mossLowerLevelStoreName := ""
|
||||||
|
v, ok = config["mossLowerLevelStoreName"]
|
||||||
|
if ok {
|
||||||
|
mossLowerLevelStoreName, ok = v.(string)
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("moss store,"+
|
||||||
|
" could not parse config[mossLowerLevelStoreName]: %v", v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
mossLowerLevelMaxBatchSize := uint64(0)
|
||||||
|
v, ok = config["mossLowerLevelMaxBatchSize"]
|
||||||
|
if ok {
|
||||||
|
mossLowerLevelMaxBatchSizeF, ok := v.(float64)
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("moss store,"+
|
||||||
|
" could not parse config[mossLowerLevelMaxBatchSize]: %v", v)
|
||||||
|
}
|
||||||
|
|
||||||
|
mossLowerLevelMaxBatchSize = uint64(mossLowerLevelMaxBatchSizeF)
|
||||||
|
}
|
||||||
|
|
||||||
|
// --------------------------------------------------
|
||||||
|
|
||||||
|
if options.MergeOperator == nil {
|
||||||
|
options.MergeOperator = mo
|
||||||
|
}
|
||||||
|
|
||||||
|
if options.MinMergePercentage <= 0 {
|
||||||
|
options.MinMergePercentage = minMergePercentage
|
||||||
|
}
|
||||||
|
|
||||||
|
if options.MaxStackDirtyTopHeight <= 0 {
|
||||||
|
options.MaxStackDirtyTopHeight = maxStackDirtyTopHeight
|
||||||
|
}
|
||||||
|
|
||||||
|
if options.Debug <= 0 {
|
||||||
|
options.Debug = debug
|
||||||
|
}
|
||||||
|
|
||||||
|
if options.Log == nil {
|
||||||
|
options.Log = func(format string, a ...interface{}) {}
|
||||||
|
}
|
||||||
|
|
||||||
|
if options.LowerLevelInit == nil &&
|
||||||
|
options.LowerLevelUpdate == nil &&
|
||||||
|
mossLowerLevelStoreName != "" {
|
||||||
|
mossLowerLevelStoreConfig := map[string]interface{}{}
|
||||||
|
|
||||||
|
v, ok := config["mossLowerLevelStoreConfig"]
|
||||||
|
if ok {
|
||||||
|
mossLowerLevelStoreConfig, ok = v.(map[string]interface{})
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("moss store, initLowerLevelStore,"+
|
||||||
|
" could parse mossLowerLevelStoreConfig: %v", v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
lowerLevelInit, lowerLevelUpdate, err :=
|
||||||
|
initLowerLevelStore(mo, config,
|
||||||
|
mossLowerLevelStoreName,
|
||||||
|
mossLowerLevelStoreConfig,
|
||||||
|
mossLowerLevelMaxBatchSize,
|
||||||
|
options.Log)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
options.LowerLevelInit = lowerLevelInit
|
||||||
|
options.LowerLevelUpdate = lowerLevelUpdate
|
||||||
|
}
|
||||||
|
|
||||||
|
// --------------------------------------------------
|
||||||
|
|
||||||
|
ms, err := moss.NewCollection(options)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
err = ms.Start()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
rv := Store{
|
||||||
|
ms: ms,
|
||||||
|
mo: mo,
|
||||||
|
}
|
||||||
|
return &rv, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Store) Close() error {
|
||||||
|
return s.ms.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Store) Reader() (store.KVReader, error) {
|
||||||
|
ss, err := s.ms.Snapshot()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &Reader{ss: ss}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Store) Writer() (store.KVWriter, error) {
|
||||||
|
return &Writer{s: s}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Store) Logf(fmt string, args ...interface{}) {
|
||||||
|
options := s.ms.Options()
|
||||||
|
if options.Log != nil {
|
||||||
|
options.Log(fmt, args...)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
registry.RegisterKVStore(Name, New)
|
||||||
|
}
|
88
index/store/moss/store_test.go
Normal file
88
index/store/moss/store_test.go
Normal file
|
@ -0,0 +1,88 @@
|
||||||
|
// Copyright (c) 2016 Couchbase, Inc.
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the
|
||||||
|
// License. You may obtain a copy of the License at
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
// Unless required by applicable law or agreed to in writing,
|
||||||
|
// software distributed under the License is distributed on an "AS
|
||||||
|
// IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
||||||
|
// express or implied. See the License for the specific language
|
||||||
|
// governing permissions and limitations under the License.
|
||||||
|
|
||||||
|
package moss
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/blevesearch/bleve/index/store"
|
||||||
|
"github.com/blevesearch/bleve/index/store/test"
|
||||||
|
)
|
||||||
|
|
||||||
|
func open(t *testing.T, mo store.MergeOperator) store.KVStore {
|
||||||
|
rv, err := New(mo, nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
return rv
|
||||||
|
}
|
||||||
|
|
||||||
|
func cleanup(t *testing.T, s store.KVStore) {
|
||||||
|
err := s.Close()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMossKVCrud(t *testing.T) {
|
||||||
|
s := open(t, nil)
|
||||||
|
defer cleanup(t, s)
|
||||||
|
test.CommonTestKVCrud(t, s)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMossReaderIsolation(t *testing.T) {
|
||||||
|
s := open(t, nil)
|
||||||
|
defer cleanup(t, s)
|
||||||
|
test.CommonTestReaderIsolation(t, s)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMossReaderOwnsGetBytes(t *testing.T) {
|
||||||
|
s := open(t, nil)
|
||||||
|
defer cleanup(t, s)
|
||||||
|
test.CommonTestReaderOwnsGetBytes(t, s)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMossWriterOwnsBytes(t *testing.T) {
|
||||||
|
s := open(t, nil)
|
||||||
|
defer cleanup(t, s)
|
||||||
|
test.CommonTestWriterOwnsBytes(t, s)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMossPrefixIterator(t *testing.T) {
|
||||||
|
s := open(t, nil)
|
||||||
|
defer cleanup(t, s)
|
||||||
|
test.CommonTestPrefixIterator(t, s)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMossPrefixIteratorSeek(t *testing.T) {
|
||||||
|
s := open(t, nil)
|
||||||
|
defer cleanup(t, s)
|
||||||
|
test.CommonTestPrefixIteratorSeek(t, s)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMossRangeIterator(t *testing.T) {
|
||||||
|
s := open(t, nil)
|
||||||
|
defer cleanup(t, s)
|
||||||
|
test.CommonTestRangeIterator(t, s)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMossRangeIteratorSeek(t *testing.T) {
|
||||||
|
s := open(t, nil)
|
||||||
|
defer cleanup(t, s)
|
||||||
|
test.CommonTestRangeIteratorSeek(t, s)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMossMerge(t *testing.T) {
|
||||||
|
s := open(t, &test.TestMergeCounter{})
|
||||||
|
defer cleanup(t, s)
|
||||||
|
test.CommonTestMerge(t, s)
|
||||||
|
}
|
94
index/store/moss/writer.go
Normal file
94
index/store/moss/writer.go
Normal file
|
@ -0,0 +1,94 @@
|
||||||
|
// Copyright (c) 2016 Couchbase, Inc.
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the
|
||||||
|
// License. You may obtain a copy of the License at
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
// Unless required by applicable law or agreed to in writing,
|
||||||
|
// software distributed under the License is distributed on an "AS
|
||||||
|
// IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
||||||
|
// express or implied. See the License for the specific language
|
||||||
|
// governing permissions and limitations under the License.
|
||||||
|
|
||||||
|
package moss
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/blevesearch/bleve/index/store"
|
||||||
|
|
||||||
|
"github.com/couchbase/moss"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Writer struct {
|
||||||
|
s *Store
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *Writer) NewBatch() store.KVBatch {
|
||||||
|
b, err := w.s.ms.NewBatch(0, 0)
|
||||||
|
if err != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return &Batch{
|
||||||
|
store: w.s,
|
||||||
|
merge: store.NewEmulatedMerge(w.s.mo),
|
||||||
|
batch: b,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *Writer) NewBatchEx(options store.KVBatchOptions) (
|
||||||
|
[]byte, store.KVBatch, error) {
|
||||||
|
numOps := options.NumSets + options.NumDeletes + options.NumMerges
|
||||||
|
|
||||||
|
b, err := w.s.ms.NewBatch(numOps, options.TotalBytes)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
buf, err := b.Alloc(options.TotalBytes)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return buf, &Batch{
|
||||||
|
store: w.s,
|
||||||
|
merge: store.NewEmulatedMerge(w.s.mo),
|
||||||
|
batch: b,
|
||||||
|
buf: buf,
|
||||||
|
bufUsed: 0,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *Writer) ExecuteBatch(b store.KVBatch) (err error) {
|
||||||
|
batch, ok := b.(*Batch)
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("wrong type of batch")
|
||||||
|
}
|
||||||
|
|
||||||
|
for kStr, mergeOps := range batch.merge.Merges {
|
||||||
|
for _, v := range mergeOps {
|
||||||
|
if batch.buf != nil {
|
||||||
|
kLen := len(kStr)
|
||||||
|
vLen := len(v)
|
||||||
|
kBuf := batch.buf[batch.bufUsed : batch.bufUsed+kLen]
|
||||||
|
vBuf := batch.buf[batch.bufUsed+kLen : batch.bufUsed+kLen+vLen]
|
||||||
|
copy(kBuf, kStr)
|
||||||
|
copy(vBuf, v)
|
||||||
|
batch.bufUsed += kLen + vLen
|
||||||
|
err = batch.batch.AllocMerge(kBuf, vBuf)
|
||||||
|
} else {
|
||||||
|
err = batch.batch.Merge([]byte(kStr), v)
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return w.s.ms.ExecuteBatch(batch.batch, moss.WriteOptions{})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *Writer) Close() error {
|
||||||
|
w.s = nil
|
||||||
|
return nil
|
||||||
|
}
|
|
@ -234,6 +234,8 @@ func NewFieldRowKV(key, value []byte) (*FieldRow, error) {
|
||||||
|
|
||||||
// DICTIONARY
|
// DICTIONARY
|
||||||
|
|
||||||
|
const DictionaryRowMaxValueSize = binary.MaxVarintLen64
|
||||||
|
|
||||||
type DictionaryRow struct {
|
type DictionaryRow struct {
|
||||||
field uint16
|
field uint16
|
||||||
term []byte
|
term []byte
|
||||||
|
@ -264,7 +266,7 @@ func (dr *DictionaryRow) Value() []byte {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (dr *DictionaryRow) ValueSize() int {
|
func (dr *DictionaryRow) ValueSize() int {
|
||||||
return binary.MaxVarintLen64
|
return DictionaryRowMaxValueSize
|
||||||
}
|
}
|
||||||
|
|
||||||
func (dr *DictionaryRow) ValueTo(buf []byte) (int, error) {
|
func (dr *DictionaryRow) ValueTo(buf []byte) (int, error) {
|
||||||
|
|
|
@ -208,7 +208,7 @@ func (udc *UpsideDownCouch) batchRows(writer store.KVWriter, addRowsAll [][]Upsi
|
||||||
|
|
||||||
mergeNum := len(dictionaryDeltas)
|
mergeNum := len(dictionaryDeltas)
|
||||||
mergeKeyBytes := 0
|
mergeKeyBytes := 0
|
||||||
mergeValBytes := mergeNum * 8
|
mergeValBytes := mergeNum * DictionaryRowMaxValueSize
|
||||||
|
|
||||||
for dictRowKey, _ := range dictionaryDeltas {
|
for dictRowKey, _ := range dictionaryDeltas {
|
||||||
mergeKeyBytes += len(dictRowKey)
|
mergeKeyBytes += len(dictRowKey)
|
||||||
|
@ -278,8 +278,8 @@ func (udc *UpsideDownCouch) batchRows(writer store.KVWriter, addRowsAll [][]Upsi
|
||||||
for dictRowKey, delta := range dictionaryDeltas {
|
for dictRowKey, delta := range dictionaryDeltas {
|
||||||
dictRowKeyLen := copy(buf, dictRowKey)
|
dictRowKeyLen := copy(buf, dictRowKey)
|
||||||
binary.LittleEndian.PutUint64(buf[dictRowKeyLen:], uint64(delta))
|
binary.LittleEndian.PutUint64(buf[dictRowKeyLen:], uint64(delta))
|
||||||
wb.Merge(buf[:dictRowKeyLen], buf[dictRowKeyLen:dictRowKeyLen+8])
|
wb.Merge(buf[:dictRowKeyLen], buf[dictRowKeyLen:dictRowKeyLen+DictionaryRowMaxValueSize])
|
||||||
buf = buf[dictRowKeyLen+8:]
|
buf = buf[dictRowKeyLen+DictionaryRowMaxValueSize:]
|
||||||
}
|
}
|
||||||
|
|
||||||
// write out the batch
|
// write out the batch
|
||||||
|
|
Loading…
Reference in New Issue
Block a user