From eb315fa500a6975b2022c8914fff7c6e6cd10eb0 Mon Sep 17 00:00:00 2001 From: Steve Yen Date: Tue, 9 Feb 2016 17:29:58 -0800 Subject: [PATCH 1/4] integrate index/store/moss KV store --- config/config.go | 1 + index/store/moss/batch.go | 62 ++++++ index/store/moss/iterator.go | 125 +++++++++++ index/store/moss/lower.go | 375 +++++++++++++++++++++++++++++++++ index/store/moss/reader.go | 79 +++++++ index/store/moss/store.go | 183 ++++++++++++++++ index/store/moss/store_test.go | 88 ++++++++ index/store/moss/writer.go | 85 ++++++++ 8 files changed, 998 insertions(+) create mode 100644 index/store/moss/batch.go create mode 100644 index/store/moss/iterator.go create mode 100644 index/store/moss/lower.go create mode 100644 index/store/moss/reader.go create mode 100644 index/store/moss/store.go create mode 100644 index/store/moss/store_test.go create mode 100644 index/store/moss/writer.go diff --git a/config/config.go b/config/config.go index a3b87f9d..c22fcec5 100644 --- a/config/config.go +++ b/config/config.go @@ -88,6 +88,7 @@ import ( _ "github.com/blevesearch/bleve/index/store/boltdb" _ "github.com/blevesearch/bleve/index/store/goleveldb" _ "github.com/blevesearch/bleve/index/store/gtreap" + _ "github.com/blevesearch/bleve/index/store/moss" // index types _ "github.com/blevesearch/bleve/index/firestorm" diff --git a/index/store/moss/batch.go b/index/store/moss/batch.go new file mode 100644 index 00000000..f2e979a9 --- /dev/null +++ b/index/store/moss/batch.go @@ -0,0 +1,62 @@ +// Copyright (c) 2016 Couchbase, Inc. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the +// License. You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an "AS +// IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +// express or implied. See the License for the specific language +// governing permissions and limitations under the License. + +package moss + +import ( + "github.com/couchbase/moss" + + "github.com/blevesearch/bleve/index/store" +) + +type Batch struct { + store *Store + merge *store.EmulatedMerge + batch moss.Batch + alloced bool +} + +func (b *Batch) Set(key, val []byte) { + if b.alloced { + b.batch.AllocSet(key, val) + } else { + b.batch.Set(key, val) + } +} + +func (b *Batch) Delete(key []byte) { + if b.alloced { + b.batch.AllocDel(key) + } else { + b.batch.Del(key) + } +} + +func (b *Batch) Merge(key, val []byte) { + b.merge.Merge(key, val) +} + +func (b *Batch) Reset() { + b.Close() + + batch, err := b.store.ms.NewBatch(0, 0) + if err == nil { + b.batch = batch + b.merge = store.NewEmulatedMerge(b.store.mo) + } +} + +func (b *Batch) Close() error { + b.merge = nil + b.batch.Close() + b.batch = nil + return nil +} diff --git a/index/store/moss/iterator.go b/index/store/moss/iterator.go new file mode 100644 index 00000000..edba16a2 --- /dev/null +++ b/index/store/moss/iterator.go @@ -0,0 +1,125 @@ +// Copyright (c) 2016 Couchbase, Inc. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the +// License. You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an "AS +// IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +// express or implied. See the License for the specific language +// governing permissions and limitations under the License. + +package moss + +import ( + "bytes" + + "github.com/couchbase/moss" +) + +type Iterator struct { + ss moss.Snapshot + iter moss.Iterator + prefix []byte + start []byte + end []byte + done bool + k []byte + v []byte +} + +func (x *Iterator) Seek(seekToKey []byte) { + x.done = true + x.k = nil + x.v = nil + + if bytes.Compare(seekToKey, x.start) < 0 { + seekToKey = x.start + } + + iter, err := x.ss.StartIterator(seekToKey, x.end, moss.IteratorOptions{}) + if err != nil { + return + } + + x.iter.Close() + x.iter = iter + + x.checkDone() +} + +func (x *Iterator) Next() { + if x.done { + return + } + + x.done = true + x.k = nil + x.v = nil + + err := x.iter.Next() + if err != nil { + return + } + + x.checkDone() +} + +func (x *Iterator) Current() ([]byte, []byte, bool) { + return x.k, x.v, !x.done +} + +func (x *Iterator) Key() []byte { + if x.done { + return nil + } + + return x.k +} + +func (x *Iterator) Value() []byte { + if x.done { + return nil + } + + return x.v +} + +func (x *Iterator) Valid() bool { + return !x.done +} + +func (x *Iterator) Close() error { + x.ss = nil + + if x.iter != nil { + x.iter.Close() + x.iter = nil + } + + x.prefix = nil + x.done = true + x.k = nil + x.v = nil + + return nil +} + +func (x *Iterator) checkDone() { + x.done = true + x.k = nil + x.v = nil + + k, v, err := x.iter.Current() + if err != nil { + return + } + + if x.prefix != nil && !bytes.HasPrefix(k, x.prefix) { + return + } + + x.done = false + x.k = k + x.v = v +} diff --git a/index/store/moss/lower.go b/index/store/moss/lower.go new file mode 100644 index 00000000..d1de764a --- /dev/null +++ b/index/store/moss/lower.go @@ -0,0 +1,375 @@ +// Copyright (c) 2016 Couchbase, Inc. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the +// License. You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an "AS +// IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +// express or implied. See the License for the specific language +// governing permissions and limitations under the License. + +// Package moss provides a KVStore implementation based on the +// github.com/couchbaselabs/moss library. + +package moss + +import ( + "fmt" + "sync" + + "github.com/couchbase/moss" + + "github.com/blevesearch/bleve/index/store" + "github.com/blevesearch/bleve/registry" +) + +func initLowerLevelStore( + mo store.MergeOperator, + config map[string]interface{}, + lowerLevelStoreName string, + lowerLevelStoreConfig map[string]interface{}, + lowerLevelMaxBatchSize uint64, + logf func(format string, a ...interface{}), +) (moss.Snapshot, moss.LowerLevelUpdate, error) { + constructor := registry.KVStoreConstructorByName(lowerLevelStoreName) + if constructor == nil { + return nil, nil, fmt.Errorf("moss store, initLowerLevelStore,"+ + " could not find lower level store: %s", lowerLevelStoreName) + } + + if lowerLevelStoreConfig == nil { + lowerLevelStoreConfig = map[string]interface{}{} + } + + for k, v := range config { + _, exists := lowerLevelStoreConfig[k] + if !exists { + lowerLevelStoreConfig[k] = v + } + } + + kvStore, err := constructor(mo, lowerLevelStoreConfig) + if err != nil { + return nil, nil, err + } + + llStore := &llStore{ + refs: 0, + kvStore: kvStore, + logf: logf, + } + + llUpdate := func(ssHigher moss.Snapshot) (ssLower moss.Snapshot, err error) { + return llStore.update(ssHigher, lowerLevelMaxBatchSize) + } + + llSnapshot, err := llUpdate(nil) + if err != nil { + kvStore.Close() + return nil, nil, err + } + + return llSnapshot, llUpdate, nil // llStore.refs is now 1. +} + +// ------------------------------------------------ + +// llStore is a lower level store and provides ref-counting around a +// bleve store.KVStore. +type llStore struct { + kvStore store.KVStore + + logf func(format string, a ...interface{}) + + m sync.Mutex // Protects fields that follow. + refs int +} + +// llSnapshot represents a lower-level snapshot, wrapping a bleve +// store.KVReader, and implements the moss.Snapshot interface. +type llSnapshot struct { + llStore *llStore // Holds 1 refs on the llStore. + kvReader store.KVReader + + m sync.Mutex // Protects fields that follow. + refs int +} + +// llIterator represents a lower-level iterator, wrapping a bleve +// store.KVIterator, and implements the moss.Iterator interface. +type llIterator struct { + llSnapshot *llSnapshot // Holds 1 refs on the llSnapshot. + + // Some lower-level KVReader implementations need a separate + // KVReader clone, due to KVReader single-threaded'ness. + kvReader store.KVReader + + kvIterator store.KVIterator +} + +type readerSource interface { + Reader() (store.KVReader, error) +} + +// ------------------------------------------------ + +func (s *llStore) addRef() *llStore { + s.m.Lock() + s.refs += 1 + s.m.Unlock() + + return s +} + +func (s *llStore) decRef() { + s.m.Lock() + s.refs -= 1 + if s.refs <= 0 { + s.kvStore.Close() + } + s.m.Unlock() +} + +// update() mutates this lower level store with latest data from the +// given higher level moss.Snapshot and returns a new moss.Snapshot +// that the higher level can use which represents this lower level +// store. +func (s *llStore) update(ssHigher moss.Snapshot, maxBatchSize uint64) ( + ssLower moss.Snapshot, err error) { + if ssHigher != nil { + iter, err := ssHigher.StartIterator(nil, nil, moss.IteratorOptions{ + IncludeDeletions: true, + SkipLowerLevel: true, + }) + if err != nil { + return nil, err + } + + defer iter.Close() + + kvWriter, err := s.kvStore.Writer() + if err != nil { + return nil, err + } + + defer kvWriter.Close() + + batch := kvWriter.NewBatch() + + defer func() { + if batch != nil { + batch.Close() + } + }() + + var readOptions moss.ReadOptions + + i := uint64(0) + for { + if i%1000000 == 0 { + s.logf("llStore.update, i: %d", i) + } + + err = iter.Next() + if err == moss.ErrIteratorDone { + break + } + if err != nil { + return nil, err + } + + ex, key, val, err := iter.CurrentEx() + if err == moss.ErrIteratorDone { + break + } + if err != nil { + return nil, err + } + + switch ex.Operation { + case moss.OperationSet: + batch.Set(key, val) + + case moss.OperationDel: + batch.Delete(key) + + case moss.OperationMerge: + val, err = ssHigher.Get(key, readOptions) + if err != nil { + return nil, err + } + + if val != nil { + batch.Set(key, val) + } else { + batch.Delete(key) + } + + default: + return nil, fmt.Errorf("moss store, update,"+ + " unexpected operation, ex: %v", ex) + } + + if maxBatchSize > 0 && i%maxBatchSize == 0 { + err = kvWriter.ExecuteBatch(batch) + if err != nil { + return nil, err + } + + batch.Close() + batch = nil + + batch = kvWriter.NewBatch() + } + + i++ + } + + if i > 0 { + s.logf("llStore.update, total: %d, ExecuteBatch: ...", i) + + err = kvWriter.ExecuteBatch(batch) + if err != nil { + return nil, err + } + + s.logf("llStore.update, total: %d, ExecuteBatch: done", i) + } + } + + kvReader, err := s.kvStore.Reader() + if err != nil { + return nil, err + } + + s.logf("llStore.update, new reader") + + return &llSnapshot{ + llStore: s.addRef(), + kvReader: kvReader, + refs: 1, + }, nil +} + +// ------------------------------------------------ + +func (llss *llSnapshot) addRef() *llSnapshot { + llss.m.Lock() + llss.refs += 1 + llss.m.Unlock() + + return llss +} + +func (llss *llSnapshot) decRef() { + llss.m.Lock() + llss.refs -= 1 + if llss.refs <= 0 { + if llss.kvReader != nil { + llss.kvReader.Close() + llss.kvReader = nil + } + + if llss.llStore != nil { + llss.llStore.decRef() + llss.llStore = nil + } + } + llss.m.Unlock() +} + +func (llss *llSnapshot) Close() error { + llss.decRef() + + return nil +} + +func (llss *llSnapshot) Get(key []byte, + readOptions moss.ReadOptions) ([]byte, error) { + rs, ok := llss.kvReader.(readerSource) + if ok { + r2, err := rs.Reader() + if err != nil { + return nil, err + } + + val, err := r2.Get(key) + + r2.Close() + + return val, err + } + + return llss.kvReader.Get(key) +} + +func (llss *llSnapshot) StartIterator( + startKeyInclusive, endKeyExclusive []byte, + iteratorOptions moss.IteratorOptions) (moss.Iterator, error) { + rs, ok := llss.kvReader.(readerSource) + if ok { + r2, err := rs.Reader() + if err != nil { + return nil, err + } + + i2 := r2.RangeIterator(startKeyInclusive, endKeyExclusive) + + return &llIterator{llSnapshot: llss.addRef(), kvReader: r2, kvIterator: i2}, nil + } + + i := llss.kvReader.RangeIterator(startKeyInclusive, endKeyExclusive) + + return &llIterator{llSnapshot: llss.addRef(), kvReader: nil, kvIterator: i}, nil +} + +// ------------------------------------------------ + +func (lli *llIterator) Close() error { + var err0 error + if lli.kvIterator != nil { + err0 = lli.kvIterator.Close() + lli.kvIterator = nil + } + + var err1 error + if lli.kvReader != nil { + err1 = lli.kvReader.Close() + lli.kvReader = nil + } + + lli.llSnapshot.decRef() + lli.llSnapshot = nil + + if err0 != nil { + return err0 + } + + if err1 != nil { + return err1 + } + + return nil +} + +func (lli *llIterator) Next() error { + lli.kvIterator.Next() + + return nil +} + +func (lli *llIterator) Current() (key, val []byte, err error) { + key, val, ok := lli.kvIterator.Current() + if !ok { + return nil, nil, moss.ErrIteratorDone + } + + return key, val, nil +} + +func (lli *llIterator) CurrentEx() ( + entryEx moss.EntryEx, key, val []byte, err error) { + return moss.EntryEx{}, nil, nil, moss.ErrUnimplemented + +} diff --git a/index/store/moss/reader.go b/index/store/moss/reader.go new file mode 100644 index 00000000..1da24830 --- /dev/null +++ b/index/store/moss/reader.go @@ -0,0 +1,79 @@ +// Copyright (c) 2016 Couchbase, Inc. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the +// License. You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an "AS +// IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +// express or implied. See the License for the specific language +// governing permissions and limitations under the License. + +package moss + +import ( + "github.com/couchbase/moss" + + "github.com/blevesearch/bleve/index/store" +) + +type Reader struct { + ss moss.Snapshot +} + +func (r *Reader) Get(k []byte) (v []byte, err error) { + v, err = r.ss.Get(k, moss.ReadOptions{}) + if err != nil { + return nil, err + } + if v != nil { + return append([]byte(nil), v...), nil + } + return nil, nil +} + +func (r *Reader) MultiGet(keys [][]byte) ([][]byte, error) { + return store.MultiGet(r, keys) +} + +func (r *Reader) PrefixIterator(k []byte) store.KVIterator { + iter, err := r.ss.StartIterator(k, nil, moss.IteratorOptions{}) + if err != nil { + return nil + } + + rv := &Iterator{ + ss: r.ss, + iter: iter, + prefix: k, + start: k, + end: nil, + } + + rv.checkDone() + + return rv +} + +func (r *Reader) RangeIterator(start, end []byte) store.KVIterator { + iter, err := r.ss.StartIterator(start, end, moss.IteratorOptions{}) + if err != nil { + return nil + } + + rv := &Iterator{ + ss: r.ss, + iter: iter, + prefix: nil, + start: start, + end: end, + } + + rv.checkDone() + + return rv +} + +func (r *Reader) Close() error { + return r.ss.Close() +} diff --git a/index/store/moss/store.go b/index/store/moss/store.go new file mode 100644 index 00000000..0c1de977 --- /dev/null +++ b/index/store/moss/store.go @@ -0,0 +1,183 @@ +// Copyright (c) 2016 Couchbase, Inc. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the +// License. You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an "AS +// IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +// express or implied. See the License for the specific language +// governing permissions and limitations under the License. + +// Package moss provides a KVStore implementation based on the +// github.com/couchbaselabs/moss library. + +package moss + +import ( + "fmt" + "sync" + + "github.com/couchbase/moss" + + "github.com/blevesearch/bleve/index/store" + "github.com/blevesearch/bleve/registry" +) + +const Name = "moss" + +type Store struct { + m sync.Mutex + ms moss.Collection + mo store.MergeOperator +} + +func New(mo store.MergeOperator, config map[string]interface{}) ( + store.KVStore, error) { + return NewEx(mo, config, moss.CollectionOptions{}) +} + +func NewEx(mo store.MergeOperator, config map[string]interface{}, + options moss.CollectionOptions) (store.KVStore, error) { + debug := moss.DefaultCollectionOptions.Debug + v, ok := config["mossDebug"] + if ok { + debugF, ok := v.(float64) + if !ok { + return nil, fmt.Errorf("moss store,"+ + " could not parse config[mossDebug]: %v", v) + } + + debug = int(debugF) + } + + minMergePercentage := moss.DefaultCollectionOptions.MinMergePercentage + v, ok = config["mossMinMergePercentage"] + if ok { + minMergePercentage, ok = v.(float64) + if !ok { + return nil, fmt.Errorf("moss store,"+ + " could not parse config[mossMinMergePercentage]: %v", v) + } + } + + maxStackDirtyTopHeight := moss.DefaultCollectionOptions.MaxStackDirtyTopHeight + v, ok = config["mossMaxStackDirtyTopHeight"] + if ok { + maxStackDirtyTopHeightF, ok := v.(float64) + if !ok { + return nil, fmt.Errorf("moss store,"+ + " could not parse config[mossMaxStackDirtyTopHeight]: %v", v) + } + + maxStackDirtyTopHeight = int(maxStackDirtyTopHeightF) + } + + mossLowerLevelStoreName := "" + v, ok = config["mossLowerLevelStoreName"] + if ok { + mossLowerLevelStoreName, ok = v.(string) + if !ok { + return nil, fmt.Errorf("moss store,"+ + " could not parse config[mossLowerLevelStoreName]: %v", v) + } + } + + mossLowerLevelMaxBatchSize := uint64(0) + v, ok = config["mossLowerLevelMaxBatchSize"] + if ok { + mossLowerLevelMaxBatchSizeF, ok := v.(float64) + if !ok { + return nil, fmt.Errorf("moss store,"+ + " could not parse config[mossLowerLevelMaxBatchSize]: %v", v) + } + + mossLowerLevelMaxBatchSize = uint64(mossLowerLevelMaxBatchSizeF) + } + + // -------------------------------------------------- + + if options.MergeOperator == nil { + options.MergeOperator = mo + } + + if options.MinMergePercentage <= 0 { + options.MinMergePercentage = minMergePercentage + } + + if options.MaxStackDirtyTopHeight <= 0 { + options.MaxStackDirtyTopHeight = maxStackDirtyTopHeight + } + + if options.Debug <= 0 { + options.Debug = debug + } + + if options.Log == nil { + options.Log = func(format string, a ...interface{}) {} + } + + if options.LowerLevelInit == nil && + options.LowerLevelUpdate == nil && + mossLowerLevelStoreName != "" { + mossLowerLevelStoreConfig := map[string]interface{}{} + + v, ok := config["mossLowerLevelStoreConfig"] + if ok { + mossLowerLevelStoreConfig, ok = v.(map[string]interface{}) + if !ok { + return nil, fmt.Errorf("moss store, initLowerLevelStore,"+ + " could parse mossLowerLevelStoreConfig: %v", v) + } + } + + lowerLevelInit, lowerLevelUpdate, err := + initLowerLevelStore(mo, config, + mossLowerLevelStoreName, + mossLowerLevelStoreConfig, + mossLowerLevelMaxBatchSize, + options.Log) + if err != nil { + return nil, err + } + + options.LowerLevelInit = lowerLevelInit + options.LowerLevelUpdate = lowerLevelUpdate + } + + // -------------------------------------------------- + + ms, err := moss.NewCollection(options) + if err != nil { + return nil, err + } + err = ms.Start() + if err != nil { + return nil, err + } + rv := Store{ + ms: ms, + mo: mo, + } + return &rv, nil +} + +func (s *Store) Close() error { + return s.ms.Close() +} + +func (s *Store) Reader() (store.KVReader, error) { + ss, err := s.ms.Snapshot() + if err != nil { + return nil, err + } + return &Reader{ss: ss}, nil +} + +func (s *Store) Writer() (store.KVWriter, error) { + return &Writer{s: s}, nil +} + +func init() { + registry.RegisterKVStore(Name, New) +} diff --git a/index/store/moss/store_test.go b/index/store/moss/store_test.go new file mode 100644 index 00000000..69350d4e --- /dev/null +++ b/index/store/moss/store_test.go @@ -0,0 +1,88 @@ +// Copyright (c) 2016 Couchbase, Inc. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the +// License. You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an "AS +// IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +// express or implied. See the License for the specific language +// governing permissions and limitations under the License. + +package moss + +import ( + "testing" + + "github.com/blevesearch/bleve/index/store" + "github.com/blevesearch/bleve/index/store/test" +) + +func open(t *testing.T, mo store.MergeOperator) store.KVStore { + rv, err := New(mo, nil) + if err != nil { + t.Fatal(err) + } + return rv +} + +func cleanup(t *testing.T, s store.KVStore) { + err := s.Close() + if err != nil { + t.Fatal(err) + } +} + +func TestMossKVCrud(t *testing.T) { + s := open(t, nil) + defer cleanup(t, s) + test.CommonTestKVCrud(t, s) +} + +func TestMossReaderIsolation(t *testing.T) { + s := open(t, nil) + defer cleanup(t, s) + test.CommonTestReaderIsolation(t, s) +} + +func TestMossReaderOwnsGetBytes(t *testing.T) { + s := open(t, nil) + defer cleanup(t, s) + test.CommonTestReaderOwnsGetBytes(t, s) +} + +func TestMossWriterOwnsBytes(t *testing.T) { + s := open(t, nil) + defer cleanup(t, s) + test.CommonTestWriterOwnsBytes(t, s) +} + +func TestMossPrefixIterator(t *testing.T) { + s := open(t, nil) + defer cleanup(t, s) + test.CommonTestPrefixIterator(t, s) +} + +func TestMossPrefixIteratorSeek(t *testing.T) { + s := open(t, nil) + defer cleanup(t, s) + test.CommonTestPrefixIteratorSeek(t, s) +} + +func TestMossRangeIterator(t *testing.T) { + s := open(t, nil) + defer cleanup(t, s) + test.CommonTestRangeIterator(t, s) +} + +func TestMossRangeIteratorSeek(t *testing.T) { + s := open(t, nil) + defer cleanup(t, s) + test.CommonTestRangeIteratorSeek(t, s) +} + +func TestMossMerge(t *testing.T) { + s := open(t, &test.TestMergeCounter{}) + defer cleanup(t, s) + test.CommonTestMerge(t, s) +} diff --git a/index/store/moss/writer.go b/index/store/moss/writer.go new file mode 100644 index 00000000..f8c26089 --- /dev/null +++ b/index/store/moss/writer.go @@ -0,0 +1,85 @@ +// Copyright (c) 2016 Couchbase, Inc. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the +// License. You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an "AS +// IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +// express or implied. See the License for the specific language +// governing permissions and limitations under the License. + +package moss + +import ( + "fmt" + + "github.com/blevesearch/bleve/index/store" + + "github.com/couchbase/moss" +) + +type Writer struct { + s *Store +} + +func (w *Writer) NewBatch() store.KVBatch { + b, err := w.s.ms.NewBatch(0, 0) + if err != nil { + return nil + } + + return &Batch{ + store: w.s, + merge: store.NewEmulatedMerge(w.s.mo), + batch: b, + alloced: false, + } +} + +func (w *Writer) NewBatchEx(options store.KVBatchOptions) ( + []byte, store.KVBatch, error) { + numOps := options.NumSets + options.NumDeletes + options.NumMerges + + b, err := w.s.ms.NewBatch(numOps, options.TotalBytes) + if err != nil { + return nil, nil, err + } + + buf, err := b.Alloc(options.TotalBytes) + if err != nil { + return nil, nil, err + } + + return buf, &Batch{ + store: w.s, + merge: store.NewEmulatedMerge(w.s.mo), + batch: b, + alloced: true, + }, nil +} + +func (w *Writer) ExecuteBatch(b store.KVBatch) error { + batch, ok := b.(*Batch) + if !ok { + return fmt.Errorf("wrong type of batch") + } + + for kStr, mergeOps := range batch.merge.Merges { + k := []byte(kStr) + + for _, v := range mergeOps { + err := batch.batch.Merge(k, v) + if err != nil { + return err + } + } + } + + return w.s.ms.ExecuteBatch(batch.batch, moss.WriteOptions{}) +} + +func (w *Writer) Close() error { + w.s = nil + return nil +} From ea1a52464d3e61e40ce0ef115b19115e1ef70bd5 Mon Sep 17 00:00:00 2001 From: Steve Yen Date: Wed, 17 Feb 2016 13:03:54 -0800 Subject: [PATCH 2/4] more index/store/moss err handling --- index/store/moss/batch.go | 28 ++++++++++++++++++------- index/store/moss/iterator.go | 15 +++++++++++--- index/store/moss/lower.go | 40 ++++++++++++++++++++++++++++-------- index/store/moss/reader.go | 5 ++++- index/store/moss/store.go | 7 +++++++ 5 files changed, 75 insertions(+), 20 deletions(-) diff --git a/index/store/moss/batch.go b/index/store/moss/batch.go index f2e979a9..2ee85d0d 100644 --- a/index/store/moss/batch.go +++ b/index/store/moss/batch.go @@ -25,18 +25,28 @@ type Batch struct { } func (b *Batch) Set(key, val []byte) { + var err error if b.alloced { - b.batch.AllocSet(key, val) + err = b.batch.AllocSet(key, val) } else { - b.batch.Set(key, val) + err = b.batch.Set(key, val) + } + + if err != nil { + b.store.Logf("bleve moss batch.Set err: %v", err) } } func (b *Batch) Delete(key []byte) { + var err error if b.alloced { - b.batch.AllocDel(key) + err = b.batch.AllocDel(key) } else { - b.batch.Del(key) + err = b.batch.Del(key) + } + + if err != nil { + b.store.Logf("bleve moss batch.Delete err: %v", err) } } @@ -45,7 +55,11 @@ func (b *Batch) Merge(key, val []byte) { } func (b *Batch) Reset() { - b.Close() + err := b.Close() + if err != nil { + b.store.Logf("bleve moss batch.Close err: %v", err) + return + } batch, err := b.store.ms.NewBatch(0, 0) if err == nil { @@ -56,7 +70,7 @@ func (b *Batch) Reset() { func (b *Batch) Close() error { b.merge = nil - b.batch.Close() + err := b.batch.Close() b.batch = nil - return nil + return err } diff --git a/index/store/moss/iterator.go b/index/store/moss/iterator.go index edba16a2..cf616fe4 100644 --- a/index/store/moss/iterator.go +++ b/index/store/moss/iterator.go @@ -18,6 +18,7 @@ import ( ) type Iterator struct { + store *Store ss moss.Snapshot iter moss.Iterator prefix []byte @@ -39,10 +40,16 @@ func (x *Iterator) Seek(seekToKey []byte) { iter, err := x.ss.StartIterator(seekToKey, x.end, moss.IteratorOptions{}) if err != nil { + x.store.Logf("bleve moss StartIterator err: %v", err) + return + } + + err = x.iter.Close() + if err != nil { + x.store.Logf("bleve moss iterator.Seek err: %v", err) return } - x.iter.Close() x.iter = iter x.checkDone() @@ -90,10 +97,12 @@ func (x *Iterator) Valid() bool { } func (x *Iterator) Close() error { + var err error + x.ss = nil if x.iter != nil { - x.iter.Close() + err = x.iter.Close() x.iter = nil } @@ -102,7 +111,7 @@ func (x *Iterator) Close() error { x.k = nil x.v = nil - return nil + return err } func (x *Iterator) checkDone() { diff --git a/index/store/moss/lower.go b/index/store/moss/lower.go index d1de764a..52049f36 100644 --- a/index/store/moss/lower.go +++ b/index/store/moss/lower.go @@ -66,7 +66,7 @@ func initLowerLevelStore( llSnapshot, err := llUpdate(nil) if err != nil { - kvStore.Close() + _ = kvStore.Close() return nil, nil, err } @@ -126,7 +126,10 @@ func (s *llStore) decRef() { s.m.Lock() s.refs -= 1 if s.refs <= 0 { - s.kvStore.Close() + err := s.kvStore.Close() + if err != nil { + s.logf("llStore kvStore.Close err: %v", err) + } } s.m.Unlock() } @@ -146,20 +149,33 @@ func (s *llStore) update(ssHigher moss.Snapshot, maxBatchSize uint64) ( return nil, err } - defer iter.Close() + defer func() { + err = iter.Close() + if err != nil { + s.logf("llStore iter.Close err: %v", err) + } + }() kvWriter, err := s.kvStore.Writer() if err != nil { return nil, err } - defer kvWriter.Close() + defer func() { + err = kvWriter.Close() + if err != nil { + s.logf("llStore kvWriter.Close err: %v", err) + } + }() batch := kvWriter.NewBatch() defer func() { if batch != nil { - batch.Close() + err = batch.Close() + if err != nil { + s.logf("llStore batch.Close err: %v", err) + } } }() @@ -217,8 +233,10 @@ func (s *llStore) update(ssHigher moss.Snapshot, maxBatchSize uint64) ( return nil, err } - batch.Close() - batch = nil + err = batch.Close() + if err != nil { + return nil, err + } batch = kvWriter.NewBatch() } @@ -267,7 +285,11 @@ func (llss *llSnapshot) decRef() { llss.refs -= 1 if llss.refs <= 0 { if llss.kvReader != nil { - llss.kvReader.Close() + err := llss.kvReader.Close() + if err != nil { + llss.llStore.logf("llSnapshot kvReader.Close err: %v", err) + } + llss.kvReader = nil } @@ -296,7 +318,7 @@ func (llss *llSnapshot) Get(key []byte, val, err := r2.Get(key) - r2.Close() + _ = r2.Close() return val, err } diff --git a/index/store/moss/reader.go b/index/store/moss/reader.go index 1da24830..ef7dcc02 100644 --- a/index/store/moss/reader.go +++ b/index/store/moss/reader.go @@ -18,7 +18,8 @@ import ( ) type Reader struct { - ss moss.Snapshot + store *Store + ss moss.Snapshot } func (r *Reader) Get(k []byte) (v []byte, err error) { @@ -43,6 +44,7 @@ func (r *Reader) PrefixIterator(k []byte) store.KVIterator { } rv := &Iterator{ + store: r.store, ss: r.ss, iter: iter, prefix: k, @@ -62,6 +64,7 @@ func (r *Reader) RangeIterator(start, end []byte) store.KVIterator { } rv := &Iterator{ + store: r.store, ss: r.ss, iter: iter, prefix: nil, diff --git a/index/store/moss/store.go b/index/store/moss/store.go index 0c1de977..a3ce0d9a 100644 --- a/index/store/moss/store.go +++ b/index/store/moss/store.go @@ -178,6 +178,13 @@ func (s *Store) Writer() (store.KVWriter, error) { return &Writer{s: s}, nil } +func (s *Store) Logf(fmt string, args ...interface{}) { + options := s.ms.Options() + if options.Log != nil { + options.Log(fmt, args...) + } +} + func init() { registry.RegisterKVStore(Name, New) } From dd1718fa78991f672c3638ea48a1a2fa8fdb0bf1 Mon Sep 17 00:00:00 2001 From: Steve Yen Date: Mon, 22 Feb 2016 11:33:27 -0800 Subject: [PATCH 3/4] index/store/moss uses AllocMerge() instead of Merge() Performance optimization. Before this change, by using Merge() instead of AllocMerge(), moss's internal batch buf's would be wastefully, dramatically grown during append()'s to a mis-sized buf. --- index/store/moss/batch.go | 11 ++++++++--- index/store/moss/writer.go | 27 ++++++++++++++++++--------- 2 files changed, 26 insertions(+), 12 deletions(-) diff --git a/index/store/moss/batch.go b/index/store/moss/batch.go index 2ee85d0d..1263b8b0 100644 --- a/index/store/moss/batch.go +++ b/index/store/moss/batch.go @@ -21,12 +21,14 @@ type Batch struct { store *Store merge *store.EmulatedMerge batch moss.Batch - alloced bool + buf []byte // Non-nil when using pre-alloc'ed / NewBatchEx(). + bufUsed int } func (b *Batch) Set(key, val []byte) { var err error - if b.alloced { + if b.buf != nil { + b.bufUsed += len(key) + len(val) err = b.batch.AllocSet(key, val) } else { err = b.batch.Set(key, val) @@ -39,7 +41,8 @@ func (b *Batch) Set(key, val []byte) { func (b *Batch) Delete(key []byte) { var err error - if b.alloced { + if b.buf != nil { + b.bufUsed += len(key) err = b.batch.AllocDel(key) } else { err = b.batch.Del(key) @@ -65,6 +68,8 @@ func (b *Batch) Reset() { if err == nil { b.batch = batch b.merge = store.NewEmulatedMerge(b.store.mo) + b.buf = nil + b.bufUsed = 0 } } diff --git a/index/store/moss/writer.go b/index/store/moss/writer.go index f8c26089..b944c60c 100644 --- a/index/store/moss/writer.go +++ b/index/store/moss/writer.go @@ -30,10 +30,9 @@ func (w *Writer) NewBatch() store.KVBatch { } return &Batch{ - store: w.s, - merge: store.NewEmulatedMerge(w.s.mo), - batch: b, - alloced: false, + store: w.s, + merge: store.NewEmulatedMerge(w.s.mo), + batch: b, } } @@ -55,21 +54,31 @@ func (w *Writer) NewBatchEx(options store.KVBatchOptions) ( store: w.s, merge: store.NewEmulatedMerge(w.s.mo), batch: b, - alloced: true, + buf: buf, + bufUsed: 0, }, nil } -func (w *Writer) ExecuteBatch(b store.KVBatch) error { +func (w *Writer) ExecuteBatch(b store.KVBatch) (err error) { batch, ok := b.(*Batch) if !ok { return fmt.Errorf("wrong type of batch") } for kStr, mergeOps := range batch.merge.Merges { - k := []byte(kStr) - for _, v := range mergeOps { - err := batch.batch.Merge(k, v) + if batch.buf != nil { + kLen := len(kStr) + vLen := len(v) + kBuf := batch.buf[batch.bufUsed : batch.bufUsed+kLen] + vBuf := batch.buf[batch.bufUsed+kLen : batch.bufUsed+kLen+vLen] + copy(kBuf, kStr) + copy(vBuf, v) + batch.bufUsed += kLen + vLen + err = batch.batch.AllocMerge(kBuf, vBuf) + } else { + err = batch.batch.Merge([]byte(kStr), v) + } if err != nil { return err } From a29dd25a48069cf9dbedab18ba41eb7b73deb94e Mon Sep 17 00:00:00 2001 From: Steve Yen Date: Mon, 22 Feb 2016 11:42:24 -0800 Subject: [PATCH 4/4] upside_down dict row value size accounts for large uvarint's This is somewhat unlikely, but if a term is (incredibly) popular, its uvarint count value representation might go beyond 8 bytes. Some KVStore implementations (like forestdb) provide a BatchEx cgo optimization that depends on proper preallocated counting, so this change provides a proper worst-case estimate based on the max-unvarint of 10 bytes instead of the previously incorrect 8 bytes. --- index/upside_down/row.go | 4 +++- index/upside_down/upside_down.go | 6 +++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/index/upside_down/row.go b/index/upside_down/row.go index f3f79d5a..5685e521 100644 --- a/index/upside_down/row.go +++ b/index/upside_down/row.go @@ -234,6 +234,8 @@ func NewFieldRowKV(key, value []byte) (*FieldRow, error) { // DICTIONARY +const DictionaryRowMaxValueSize = binary.MaxVarintLen64 + type DictionaryRow struct { field uint16 term []byte @@ -264,7 +266,7 @@ func (dr *DictionaryRow) Value() []byte { } func (dr *DictionaryRow) ValueSize() int { - return binary.MaxVarintLen64 + return DictionaryRowMaxValueSize } func (dr *DictionaryRow) ValueTo(buf []byte) (int, error) { diff --git a/index/upside_down/upside_down.go b/index/upside_down/upside_down.go index fbe61974..225c14c1 100644 --- a/index/upside_down/upside_down.go +++ b/index/upside_down/upside_down.go @@ -208,7 +208,7 @@ func (udc *UpsideDownCouch) batchRows(writer store.KVWriter, addRowsAll [][]Upsi mergeNum := len(dictionaryDeltas) mergeKeyBytes := 0 - mergeValBytes := mergeNum * 8 + mergeValBytes := mergeNum * DictionaryRowMaxValueSize for dictRowKey, _ := range dictionaryDeltas { mergeKeyBytes += len(dictRowKey) @@ -278,8 +278,8 @@ func (udc *UpsideDownCouch) batchRows(writer store.KVWriter, addRowsAll [][]Upsi for dictRowKey, delta := range dictionaryDeltas { dictRowKeyLen := copy(buf, dictRowKey) binary.LittleEndian.PutUint64(buf[dictRowKeyLen:], uint64(delta)) - wb.Merge(buf[:dictRowKeyLen], buf[dictRowKeyLen:dictRowKeyLen+8]) - buf = buf[dictRowKeyLen+8:] + wb.Merge(buf[:dictRowKeyLen], buf[dictRowKeyLen:dictRowKeyLen+DictionaryRowMaxValueSize]) + buf = buf[dictRowKeyLen+DictionaryRowMaxValueSize:] } // write out the batch