diff --git a/config/config.go b/config/config.go index a3b87f9d..c22fcec5 100644 --- a/config/config.go +++ b/config/config.go @@ -88,6 +88,7 @@ import ( _ "github.com/blevesearch/bleve/index/store/boltdb" _ "github.com/blevesearch/bleve/index/store/goleveldb" _ "github.com/blevesearch/bleve/index/store/gtreap" + _ "github.com/blevesearch/bleve/index/store/moss" // index types _ "github.com/blevesearch/bleve/index/firestorm" diff --git a/index/store/moss/batch.go b/index/store/moss/batch.go new file mode 100644 index 00000000..1263b8b0 --- /dev/null +++ b/index/store/moss/batch.go @@ -0,0 +1,81 @@ +// Copyright (c) 2016 Couchbase, Inc. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the +// License. You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an "AS +// IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +// express or implied. See the License for the specific language +// governing permissions and limitations under the License. + +package moss + +import ( + "github.com/couchbase/moss" + + "github.com/blevesearch/bleve/index/store" +) + +type Batch struct { + store *Store + merge *store.EmulatedMerge + batch moss.Batch + buf []byte // Non-nil when using pre-alloc'ed / NewBatchEx(). + bufUsed int +} + +func (b *Batch) Set(key, val []byte) { + var err error + if b.buf != nil { + b.bufUsed += len(key) + len(val) + err = b.batch.AllocSet(key, val) + } else { + err = b.batch.Set(key, val) + } + + if err != nil { + b.store.Logf("bleve moss batch.Set err: %v", err) + } +} + +func (b *Batch) Delete(key []byte) { + var err error + if b.buf != nil { + b.bufUsed += len(key) + err = b.batch.AllocDel(key) + } else { + err = b.batch.Del(key) + } + + if err != nil { + b.store.Logf("bleve moss batch.Delete err: %v", err) + } +} + +func (b *Batch) Merge(key, val []byte) { + b.merge.Merge(key, val) +} + +func (b *Batch) Reset() { + err := b.Close() + if err != nil { + b.store.Logf("bleve moss batch.Close err: %v", err) + return + } + + batch, err := b.store.ms.NewBatch(0, 0) + if err == nil { + b.batch = batch + b.merge = store.NewEmulatedMerge(b.store.mo) + b.buf = nil + b.bufUsed = 0 + } +} + +func (b *Batch) Close() error { + b.merge = nil + err := b.batch.Close() + b.batch = nil + return err +} diff --git a/index/store/moss/iterator.go b/index/store/moss/iterator.go new file mode 100644 index 00000000..cf616fe4 --- /dev/null +++ b/index/store/moss/iterator.go @@ -0,0 +1,134 @@ +// Copyright (c) 2016 Couchbase, Inc. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the +// License. You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an "AS +// IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +// express or implied. See the License for the specific language +// governing permissions and limitations under the License. + +package moss + +import ( + "bytes" + + "github.com/couchbase/moss" +) + +type Iterator struct { + store *Store + ss moss.Snapshot + iter moss.Iterator + prefix []byte + start []byte + end []byte + done bool + k []byte + v []byte +} + +func (x *Iterator) Seek(seekToKey []byte) { + x.done = true + x.k = nil + x.v = nil + + if bytes.Compare(seekToKey, x.start) < 0 { + seekToKey = x.start + } + + iter, err := x.ss.StartIterator(seekToKey, x.end, moss.IteratorOptions{}) + if err != nil { + x.store.Logf("bleve moss StartIterator err: %v", err) + return + } + + err = x.iter.Close() + if err != nil { + x.store.Logf("bleve moss iterator.Seek err: %v", err) + return + } + + x.iter = iter + + x.checkDone() +} + +func (x *Iterator) Next() { + if x.done { + return + } + + x.done = true + x.k = nil + x.v = nil + + err := x.iter.Next() + if err != nil { + return + } + + x.checkDone() +} + +func (x *Iterator) Current() ([]byte, []byte, bool) { + return x.k, x.v, !x.done +} + +func (x *Iterator) Key() []byte { + if x.done { + return nil + } + + return x.k +} + +func (x *Iterator) Value() []byte { + if x.done { + return nil + } + + return x.v +} + +func (x *Iterator) Valid() bool { + return !x.done +} + +func (x *Iterator) Close() error { + var err error + + x.ss = nil + + if x.iter != nil { + err = x.iter.Close() + x.iter = nil + } + + x.prefix = nil + x.done = true + x.k = nil + x.v = nil + + return err +} + +func (x *Iterator) checkDone() { + x.done = true + x.k = nil + x.v = nil + + k, v, err := x.iter.Current() + if err != nil { + return + } + + if x.prefix != nil && !bytes.HasPrefix(k, x.prefix) { + return + } + + x.done = false + x.k = k + x.v = v +} diff --git a/index/store/moss/lower.go b/index/store/moss/lower.go new file mode 100644 index 00000000..52049f36 --- /dev/null +++ b/index/store/moss/lower.go @@ -0,0 +1,397 @@ +// Copyright (c) 2016 Couchbase, Inc. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the +// License. You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an "AS +// IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +// express or implied. See the License for the specific language +// governing permissions and limitations under the License. + +// Package moss provides a KVStore implementation based on the +// github.com/couchbaselabs/moss library. + +package moss + +import ( + "fmt" + "sync" + + "github.com/couchbase/moss" + + "github.com/blevesearch/bleve/index/store" + "github.com/blevesearch/bleve/registry" +) + +func initLowerLevelStore( + mo store.MergeOperator, + config map[string]interface{}, + lowerLevelStoreName string, + lowerLevelStoreConfig map[string]interface{}, + lowerLevelMaxBatchSize uint64, + logf func(format string, a ...interface{}), +) (moss.Snapshot, moss.LowerLevelUpdate, error) { + constructor := registry.KVStoreConstructorByName(lowerLevelStoreName) + if constructor == nil { + return nil, nil, fmt.Errorf("moss store, initLowerLevelStore,"+ + " could not find lower level store: %s", lowerLevelStoreName) + } + + if lowerLevelStoreConfig == nil { + lowerLevelStoreConfig = map[string]interface{}{} + } + + for k, v := range config { + _, exists := lowerLevelStoreConfig[k] + if !exists { + lowerLevelStoreConfig[k] = v + } + } + + kvStore, err := constructor(mo, lowerLevelStoreConfig) + if err != nil { + return nil, nil, err + } + + llStore := &llStore{ + refs: 0, + kvStore: kvStore, + logf: logf, + } + + llUpdate := func(ssHigher moss.Snapshot) (ssLower moss.Snapshot, err error) { + return llStore.update(ssHigher, lowerLevelMaxBatchSize) + } + + llSnapshot, err := llUpdate(nil) + if err != nil { + _ = kvStore.Close() + return nil, nil, err + } + + return llSnapshot, llUpdate, nil // llStore.refs is now 1. +} + +// ------------------------------------------------ + +// llStore is a lower level store and provides ref-counting around a +// bleve store.KVStore. +type llStore struct { + kvStore store.KVStore + + logf func(format string, a ...interface{}) + + m sync.Mutex // Protects fields that follow. + refs int +} + +// llSnapshot represents a lower-level snapshot, wrapping a bleve +// store.KVReader, and implements the moss.Snapshot interface. +type llSnapshot struct { + llStore *llStore // Holds 1 refs on the llStore. + kvReader store.KVReader + + m sync.Mutex // Protects fields that follow. + refs int +} + +// llIterator represents a lower-level iterator, wrapping a bleve +// store.KVIterator, and implements the moss.Iterator interface. +type llIterator struct { + llSnapshot *llSnapshot // Holds 1 refs on the llSnapshot. + + // Some lower-level KVReader implementations need a separate + // KVReader clone, due to KVReader single-threaded'ness. + kvReader store.KVReader + + kvIterator store.KVIterator +} + +type readerSource interface { + Reader() (store.KVReader, error) +} + +// ------------------------------------------------ + +func (s *llStore) addRef() *llStore { + s.m.Lock() + s.refs += 1 + s.m.Unlock() + + return s +} + +func (s *llStore) decRef() { + s.m.Lock() + s.refs -= 1 + if s.refs <= 0 { + err := s.kvStore.Close() + if err != nil { + s.logf("llStore kvStore.Close err: %v", err) + } + } + s.m.Unlock() +} + +// update() mutates this lower level store with latest data from the +// given higher level moss.Snapshot and returns a new moss.Snapshot +// that the higher level can use which represents this lower level +// store. +func (s *llStore) update(ssHigher moss.Snapshot, maxBatchSize uint64) ( + ssLower moss.Snapshot, err error) { + if ssHigher != nil { + iter, err := ssHigher.StartIterator(nil, nil, moss.IteratorOptions{ + IncludeDeletions: true, + SkipLowerLevel: true, + }) + if err != nil { + return nil, err + } + + defer func() { + err = iter.Close() + if err != nil { + s.logf("llStore iter.Close err: %v", err) + } + }() + + kvWriter, err := s.kvStore.Writer() + if err != nil { + return nil, err + } + + defer func() { + err = kvWriter.Close() + if err != nil { + s.logf("llStore kvWriter.Close err: %v", err) + } + }() + + batch := kvWriter.NewBatch() + + defer func() { + if batch != nil { + err = batch.Close() + if err != nil { + s.logf("llStore batch.Close err: %v", err) + } + } + }() + + var readOptions moss.ReadOptions + + i := uint64(0) + for { + if i%1000000 == 0 { + s.logf("llStore.update, i: %d", i) + } + + err = iter.Next() + if err == moss.ErrIteratorDone { + break + } + if err != nil { + return nil, err + } + + ex, key, val, err := iter.CurrentEx() + if err == moss.ErrIteratorDone { + break + } + if err != nil { + return nil, err + } + + switch ex.Operation { + case moss.OperationSet: + batch.Set(key, val) + + case moss.OperationDel: + batch.Delete(key) + + case moss.OperationMerge: + val, err = ssHigher.Get(key, readOptions) + if err != nil { + return nil, err + } + + if val != nil { + batch.Set(key, val) + } else { + batch.Delete(key) + } + + default: + return nil, fmt.Errorf("moss store, update,"+ + " unexpected operation, ex: %v", ex) + } + + if maxBatchSize > 0 && i%maxBatchSize == 0 { + err = kvWriter.ExecuteBatch(batch) + if err != nil { + return nil, err + } + + err = batch.Close() + if err != nil { + return nil, err + } + + batch = kvWriter.NewBatch() + } + + i++ + } + + if i > 0 { + s.logf("llStore.update, total: %d, ExecuteBatch: ...", i) + + err = kvWriter.ExecuteBatch(batch) + if err != nil { + return nil, err + } + + s.logf("llStore.update, total: %d, ExecuteBatch: done", i) + } + } + + kvReader, err := s.kvStore.Reader() + if err != nil { + return nil, err + } + + s.logf("llStore.update, new reader") + + return &llSnapshot{ + llStore: s.addRef(), + kvReader: kvReader, + refs: 1, + }, nil +} + +// ------------------------------------------------ + +func (llss *llSnapshot) addRef() *llSnapshot { + llss.m.Lock() + llss.refs += 1 + llss.m.Unlock() + + return llss +} + +func (llss *llSnapshot) decRef() { + llss.m.Lock() + llss.refs -= 1 + if llss.refs <= 0 { + if llss.kvReader != nil { + err := llss.kvReader.Close() + if err != nil { + llss.llStore.logf("llSnapshot kvReader.Close err: %v", err) + } + + llss.kvReader = nil + } + + if llss.llStore != nil { + llss.llStore.decRef() + llss.llStore = nil + } + } + llss.m.Unlock() +} + +func (llss *llSnapshot) Close() error { + llss.decRef() + + return nil +} + +func (llss *llSnapshot) Get(key []byte, + readOptions moss.ReadOptions) ([]byte, error) { + rs, ok := llss.kvReader.(readerSource) + if ok { + r2, err := rs.Reader() + if err != nil { + return nil, err + } + + val, err := r2.Get(key) + + _ = r2.Close() + + return val, err + } + + return llss.kvReader.Get(key) +} + +func (llss *llSnapshot) StartIterator( + startKeyInclusive, endKeyExclusive []byte, + iteratorOptions moss.IteratorOptions) (moss.Iterator, error) { + rs, ok := llss.kvReader.(readerSource) + if ok { + r2, err := rs.Reader() + if err != nil { + return nil, err + } + + i2 := r2.RangeIterator(startKeyInclusive, endKeyExclusive) + + return &llIterator{llSnapshot: llss.addRef(), kvReader: r2, kvIterator: i2}, nil + } + + i := llss.kvReader.RangeIterator(startKeyInclusive, endKeyExclusive) + + return &llIterator{llSnapshot: llss.addRef(), kvReader: nil, kvIterator: i}, nil +} + +// ------------------------------------------------ + +func (lli *llIterator) Close() error { + var err0 error + if lli.kvIterator != nil { + err0 = lli.kvIterator.Close() + lli.kvIterator = nil + } + + var err1 error + if lli.kvReader != nil { + err1 = lli.kvReader.Close() + lli.kvReader = nil + } + + lli.llSnapshot.decRef() + lli.llSnapshot = nil + + if err0 != nil { + return err0 + } + + if err1 != nil { + return err1 + } + + return nil +} + +func (lli *llIterator) Next() error { + lli.kvIterator.Next() + + return nil +} + +func (lli *llIterator) Current() (key, val []byte, err error) { + key, val, ok := lli.kvIterator.Current() + if !ok { + return nil, nil, moss.ErrIteratorDone + } + + return key, val, nil +} + +func (lli *llIterator) CurrentEx() ( + entryEx moss.EntryEx, key, val []byte, err error) { + return moss.EntryEx{}, nil, nil, moss.ErrUnimplemented + +} diff --git a/index/store/moss/reader.go b/index/store/moss/reader.go new file mode 100644 index 00000000..ef7dcc02 --- /dev/null +++ b/index/store/moss/reader.go @@ -0,0 +1,82 @@ +// Copyright (c) 2016 Couchbase, Inc. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the +// License. You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an "AS +// IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +// express or implied. See the License for the specific language +// governing permissions and limitations under the License. + +package moss + +import ( + "github.com/couchbase/moss" + + "github.com/blevesearch/bleve/index/store" +) + +type Reader struct { + store *Store + ss moss.Snapshot +} + +func (r *Reader) Get(k []byte) (v []byte, err error) { + v, err = r.ss.Get(k, moss.ReadOptions{}) + if err != nil { + return nil, err + } + if v != nil { + return append([]byte(nil), v...), nil + } + return nil, nil +} + +func (r *Reader) MultiGet(keys [][]byte) ([][]byte, error) { + return store.MultiGet(r, keys) +} + +func (r *Reader) PrefixIterator(k []byte) store.KVIterator { + iter, err := r.ss.StartIterator(k, nil, moss.IteratorOptions{}) + if err != nil { + return nil + } + + rv := &Iterator{ + store: r.store, + ss: r.ss, + iter: iter, + prefix: k, + start: k, + end: nil, + } + + rv.checkDone() + + return rv +} + +func (r *Reader) RangeIterator(start, end []byte) store.KVIterator { + iter, err := r.ss.StartIterator(start, end, moss.IteratorOptions{}) + if err != nil { + return nil + } + + rv := &Iterator{ + store: r.store, + ss: r.ss, + iter: iter, + prefix: nil, + start: start, + end: end, + } + + rv.checkDone() + + return rv +} + +func (r *Reader) Close() error { + return r.ss.Close() +} diff --git a/index/store/moss/store.go b/index/store/moss/store.go new file mode 100644 index 00000000..a3ce0d9a --- /dev/null +++ b/index/store/moss/store.go @@ -0,0 +1,190 @@ +// Copyright (c) 2016 Couchbase, Inc. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the +// License. You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an "AS +// IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +// express or implied. See the License for the specific language +// governing permissions and limitations under the License. + +// Package moss provides a KVStore implementation based on the +// github.com/couchbaselabs/moss library. + +package moss + +import ( + "fmt" + "sync" + + "github.com/couchbase/moss" + + "github.com/blevesearch/bleve/index/store" + "github.com/blevesearch/bleve/registry" +) + +const Name = "moss" + +type Store struct { + m sync.Mutex + ms moss.Collection + mo store.MergeOperator +} + +func New(mo store.MergeOperator, config map[string]interface{}) ( + store.KVStore, error) { + return NewEx(mo, config, moss.CollectionOptions{}) +} + +func NewEx(mo store.MergeOperator, config map[string]interface{}, + options moss.CollectionOptions) (store.KVStore, error) { + debug := moss.DefaultCollectionOptions.Debug + v, ok := config["mossDebug"] + if ok { + debugF, ok := v.(float64) + if !ok { + return nil, fmt.Errorf("moss store,"+ + " could not parse config[mossDebug]: %v", v) + } + + debug = int(debugF) + } + + minMergePercentage := moss.DefaultCollectionOptions.MinMergePercentage + v, ok = config["mossMinMergePercentage"] + if ok { + minMergePercentage, ok = v.(float64) + if !ok { + return nil, fmt.Errorf("moss store,"+ + " could not parse config[mossMinMergePercentage]: %v", v) + } + } + + maxStackDirtyTopHeight := moss.DefaultCollectionOptions.MaxStackDirtyTopHeight + v, ok = config["mossMaxStackDirtyTopHeight"] + if ok { + maxStackDirtyTopHeightF, ok := v.(float64) + if !ok { + return nil, fmt.Errorf("moss store,"+ + " could not parse config[mossMaxStackDirtyTopHeight]: %v", v) + } + + maxStackDirtyTopHeight = int(maxStackDirtyTopHeightF) + } + + mossLowerLevelStoreName := "" + v, ok = config["mossLowerLevelStoreName"] + if ok { + mossLowerLevelStoreName, ok = v.(string) + if !ok { + return nil, fmt.Errorf("moss store,"+ + " could not parse config[mossLowerLevelStoreName]: %v", v) + } + } + + mossLowerLevelMaxBatchSize := uint64(0) + v, ok = config["mossLowerLevelMaxBatchSize"] + if ok { + mossLowerLevelMaxBatchSizeF, ok := v.(float64) + if !ok { + return nil, fmt.Errorf("moss store,"+ + " could not parse config[mossLowerLevelMaxBatchSize]: %v", v) + } + + mossLowerLevelMaxBatchSize = uint64(mossLowerLevelMaxBatchSizeF) + } + + // -------------------------------------------------- + + if options.MergeOperator == nil { + options.MergeOperator = mo + } + + if options.MinMergePercentage <= 0 { + options.MinMergePercentage = minMergePercentage + } + + if options.MaxStackDirtyTopHeight <= 0 { + options.MaxStackDirtyTopHeight = maxStackDirtyTopHeight + } + + if options.Debug <= 0 { + options.Debug = debug + } + + if options.Log == nil { + options.Log = func(format string, a ...interface{}) {} + } + + if options.LowerLevelInit == nil && + options.LowerLevelUpdate == nil && + mossLowerLevelStoreName != "" { + mossLowerLevelStoreConfig := map[string]interface{}{} + + v, ok := config["mossLowerLevelStoreConfig"] + if ok { + mossLowerLevelStoreConfig, ok = v.(map[string]interface{}) + if !ok { + return nil, fmt.Errorf("moss store, initLowerLevelStore,"+ + " could parse mossLowerLevelStoreConfig: %v", v) + } + } + + lowerLevelInit, lowerLevelUpdate, err := + initLowerLevelStore(mo, config, + mossLowerLevelStoreName, + mossLowerLevelStoreConfig, + mossLowerLevelMaxBatchSize, + options.Log) + if err != nil { + return nil, err + } + + options.LowerLevelInit = lowerLevelInit + options.LowerLevelUpdate = lowerLevelUpdate + } + + // -------------------------------------------------- + + ms, err := moss.NewCollection(options) + if err != nil { + return nil, err + } + err = ms.Start() + if err != nil { + return nil, err + } + rv := Store{ + ms: ms, + mo: mo, + } + return &rv, nil +} + +func (s *Store) Close() error { + return s.ms.Close() +} + +func (s *Store) Reader() (store.KVReader, error) { + ss, err := s.ms.Snapshot() + if err != nil { + return nil, err + } + return &Reader{ss: ss}, nil +} + +func (s *Store) Writer() (store.KVWriter, error) { + return &Writer{s: s}, nil +} + +func (s *Store) Logf(fmt string, args ...interface{}) { + options := s.ms.Options() + if options.Log != nil { + options.Log(fmt, args...) + } +} + +func init() { + registry.RegisterKVStore(Name, New) +} diff --git a/index/store/moss/store_test.go b/index/store/moss/store_test.go new file mode 100644 index 00000000..69350d4e --- /dev/null +++ b/index/store/moss/store_test.go @@ -0,0 +1,88 @@ +// Copyright (c) 2016 Couchbase, Inc. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the +// License. You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an "AS +// IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +// express or implied. See the License for the specific language +// governing permissions and limitations under the License. + +package moss + +import ( + "testing" + + "github.com/blevesearch/bleve/index/store" + "github.com/blevesearch/bleve/index/store/test" +) + +func open(t *testing.T, mo store.MergeOperator) store.KVStore { + rv, err := New(mo, nil) + if err != nil { + t.Fatal(err) + } + return rv +} + +func cleanup(t *testing.T, s store.KVStore) { + err := s.Close() + if err != nil { + t.Fatal(err) + } +} + +func TestMossKVCrud(t *testing.T) { + s := open(t, nil) + defer cleanup(t, s) + test.CommonTestKVCrud(t, s) +} + +func TestMossReaderIsolation(t *testing.T) { + s := open(t, nil) + defer cleanup(t, s) + test.CommonTestReaderIsolation(t, s) +} + +func TestMossReaderOwnsGetBytes(t *testing.T) { + s := open(t, nil) + defer cleanup(t, s) + test.CommonTestReaderOwnsGetBytes(t, s) +} + +func TestMossWriterOwnsBytes(t *testing.T) { + s := open(t, nil) + defer cleanup(t, s) + test.CommonTestWriterOwnsBytes(t, s) +} + +func TestMossPrefixIterator(t *testing.T) { + s := open(t, nil) + defer cleanup(t, s) + test.CommonTestPrefixIterator(t, s) +} + +func TestMossPrefixIteratorSeek(t *testing.T) { + s := open(t, nil) + defer cleanup(t, s) + test.CommonTestPrefixIteratorSeek(t, s) +} + +func TestMossRangeIterator(t *testing.T) { + s := open(t, nil) + defer cleanup(t, s) + test.CommonTestRangeIterator(t, s) +} + +func TestMossRangeIteratorSeek(t *testing.T) { + s := open(t, nil) + defer cleanup(t, s) + test.CommonTestRangeIteratorSeek(t, s) +} + +func TestMossMerge(t *testing.T) { + s := open(t, &test.TestMergeCounter{}) + defer cleanup(t, s) + test.CommonTestMerge(t, s) +} diff --git a/index/store/moss/writer.go b/index/store/moss/writer.go new file mode 100644 index 00000000..b944c60c --- /dev/null +++ b/index/store/moss/writer.go @@ -0,0 +1,94 @@ +// Copyright (c) 2016 Couchbase, Inc. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the +// License. You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an "AS +// IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +// express or implied. See the License for the specific language +// governing permissions and limitations under the License. + +package moss + +import ( + "fmt" + + "github.com/blevesearch/bleve/index/store" + + "github.com/couchbase/moss" +) + +type Writer struct { + s *Store +} + +func (w *Writer) NewBatch() store.KVBatch { + b, err := w.s.ms.NewBatch(0, 0) + if err != nil { + return nil + } + + return &Batch{ + store: w.s, + merge: store.NewEmulatedMerge(w.s.mo), + batch: b, + } +} + +func (w *Writer) NewBatchEx(options store.KVBatchOptions) ( + []byte, store.KVBatch, error) { + numOps := options.NumSets + options.NumDeletes + options.NumMerges + + b, err := w.s.ms.NewBatch(numOps, options.TotalBytes) + if err != nil { + return nil, nil, err + } + + buf, err := b.Alloc(options.TotalBytes) + if err != nil { + return nil, nil, err + } + + return buf, &Batch{ + store: w.s, + merge: store.NewEmulatedMerge(w.s.mo), + batch: b, + buf: buf, + bufUsed: 0, + }, nil +} + +func (w *Writer) ExecuteBatch(b store.KVBatch) (err error) { + batch, ok := b.(*Batch) + if !ok { + return fmt.Errorf("wrong type of batch") + } + + for kStr, mergeOps := range batch.merge.Merges { + for _, v := range mergeOps { + if batch.buf != nil { + kLen := len(kStr) + vLen := len(v) + kBuf := batch.buf[batch.bufUsed : batch.bufUsed+kLen] + vBuf := batch.buf[batch.bufUsed+kLen : batch.bufUsed+kLen+vLen] + copy(kBuf, kStr) + copy(vBuf, v) + batch.bufUsed += kLen + vLen + err = batch.batch.AllocMerge(kBuf, vBuf) + } else { + err = batch.batch.Merge([]byte(kStr), v) + } + if err != nil { + return err + } + } + } + + return w.s.ms.ExecuteBatch(batch.batch, moss.WriteOptions{}) +} + +func (w *Writer) Close() error { + w.s = nil + return nil +} diff --git a/index/upside_down/row.go b/index/upside_down/row.go index f3f79d5a..5685e521 100644 --- a/index/upside_down/row.go +++ b/index/upside_down/row.go @@ -234,6 +234,8 @@ func NewFieldRowKV(key, value []byte) (*FieldRow, error) { // DICTIONARY +const DictionaryRowMaxValueSize = binary.MaxVarintLen64 + type DictionaryRow struct { field uint16 term []byte @@ -264,7 +266,7 @@ func (dr *DictionaryRow) Value() []byte { } func (dr *DictionaryRow) ValueSize() int { - return binary.MaxVarintLen64 + return DictionaryRowMaxValueSize } func (dr *DictionaryRow) ValueTo(buf []byte) (int, error) { diff --git a/index/upside_down/upside_down.go b/index/upside_down/upside_down.go index fbe61974..225c14c1 100644 --- a/index/upside_down/upside_down.go +++ b/index/upside_down/upside_down.go @@ -208,7 +208,7 @@ func (udc *UpsideDownCouch) batchRows(writer store.KVWriter, addRowsAll [][]Upsi mergeNum := len(dictionaryDeltas) mergeKeyBytes := 0 - mergeValBytes := mergeNum * 8 + mergeValBytes := mergeNum * DictionaryRowMaxValueSize for dictRowKey, _ := range dictionaryDeltas { mergeKeyBytes += len(dictRowKey) @@ -278,8 +278,8 @@ func (udc *UpsideDownCouch) batchRows(writer store.KVWriter, addRowsAll [][]Upsi for dictRowKey, delta := range dictionaryDeltas { dictRowKeyLen := copy(buf, dictRowKey) binary.LittleEndian.PutUint64(buf[dictRowKeyLen:], uint64(delta)) - wb.Merge(buf[:dictRowKeyLen], buf[dictRowKeyLen:dictRowKeyLen+8]) - buf = buf[dictRowKeyLen+8:] + wb.Merge(buf[:dictRowKeyLen], buf[dictRowKeyLen:dictRowKeyLen+DictionaryRowMaxValueSize]) + buf = buf[dictRowKeyLen+DictionaryRowMaxValueSize:] } // write out the batch