0
0
Fork 0

integrate index/store/moss KV store

This commit is contained in:
Steve Yen 2016-02-09 17:29:58 -08:00
parent 74a52f94bb
commit eb315fa500
8 changed files with 998 additions and 0 deletions

View File

@ -88,6 +88,7 @@ import (
_ "github.com/blevesearch/bleve/index/store/boltdb"
_ "github.com/blevesearch/bleve/index/store/goleveldb"
_ "github.com/blevesearch/bleve/index/store/gtreap"
_ "github.com/blevesearch/bleve/index/store/moss"
// index types
_ "github.com/blevesearch/bleve/index/firestorm"

62
index/store/moss/batch.go Normal file
View File

@ -0,0 +1,62 @@
// Copyright (c) 2016 Couchbase, Inc.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the
// License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an "AS
// IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
// express or implied. See the License for the specific language
// governing permissions and limitations under the License.
package moss
import (
"github.com/couchbase/moss"
"github.com/blevesearch/bleve/index/store"
)
type Batch struct {
store *Store
merge *store.EmulatedMerge
batch moss.Batch
alloced bool
}
func (b *Batch) Set(key, val []byte) {
if b.alloced {
b.batch.AllocSet(key, val)
} else {
b.batch.Set(key, val)
}
}
func (b *Batch) Delete(key []byte) {
if b.alloced {
b.batch.AllocDel(key)
} else {
b.batch.Del(key)
}
}
func (b *Batch) Merge(key, val []byte) {
b.merge.Merge(key, val)
}
func (b *Batch) Reset() {
b.Close()
batch, err := b.store.ms.NewBatch(0, 0)
if err == nil {
b.batch = batch
b.merge = store.NewEmulatedMerge(b.store.mo)
}
}
func (b *Batch) Close() error {
b.merge = nil
b.batch.Close()
b.batch = nil
return nil
}

View File

@ -0,0 +1,125 @@
// Copyright (c) 2016 Couchbase, Inc.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the
// License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an "AS
// IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
// express or implied. See the License for the specific language
// governing permissions and limitations under the License.
package moss
import (
"bytes"
"github.com/couchbase/moss"
)
type Iterator struct {
ss moss.Snapshot
iter moss.Iterator
prefix []byte
start []byte
end []byte
done bool
k []byte
v []byte
}
func (x *Iterator) Seek(seekToKey []byte) {
x.done = true
x.k = nil
x.v = nil
if bytes.Compare(seekToKey, x.start) < 0 {
seekToKey = x.start
}
iter, err := x.ss.StartIterator(seekToKey, x.end, moss.IteratorOptions{})
if err != nil {
return
}
x.iter.Close()
x.iter = iter
x.checkDone()
}
func (x *Iterator) Next() {
if x.done {
return
}
x.done = true
x.k = nil
x.v = nil
err := x.iter.Next()
if err != nil {
return
}
x.checkDone()
}
func (x *Iterator) Current() ([]byte, []byte, bool) {
return x.k, x.v, !x.done
}
func (x *Iterator) Key() []byte {
if x.done {
return nil
}
return x.k
}
func (x *Iterator) Value() []byte {
if x.done {
return nil
}
return x.v
}
func (x *Iterator) Valid() bool {
return !x.done
}
func (x *Iterator) Close() error {
x.ss = nil
if x.iter != nil {
x.iter.Close()
x.iter = nil
}
x.prefix = nil
x.done = true
x.k = nil
x.v = nil
return nil
}
func (x *Iterator) checkDone() {
x.done = true
x.k = nil
x.v = nil
k, v, err := x.iter.Current()
if err != nil {
return
}
if x.prefix != nil && !bytes.HasPrefix(k, x.prefix) {
return
}
x.done = false
x.k = k
x.v = v
}

375
index/store/moss/lower.go Normal file
View File

@ -0,0 +1,375 @@
// Copyright (c) 2016 Couchbase, Inc.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the
// License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an "AS
// IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
// express or implied. See the License for the specific language
// governing permissions and limitations under the License.
// Package moss provides a KVStore implementation based on the
// github.com/couchbaselabs/moss library.
package moss
import (
"fmt"
"sync"
"github.com/couchbase/moss"
"github.com/blevesearch/bleve/index/store"
"github.com/blevesearch/bleve/registry"
)
func initLowerLevelStore(
mo store.MergeOperator,
config map[string]interface{},
lowerLevelStoreName string,
lowerLevelStoreConfig map[string]interface{},
lowerLevelMaxBatchSize uint64,
logf func(format string, a ...interface{}),
) (moss.Snapshot, moss.LowerLevelUpdate, error) {
constructor := registry.KVStoreConstructorByName(lowerLevelStoreName)
if constructor == nil {
return nil, nil, fmt.Errorf("moss store, initLowerLevelStore,"+
" could not find lower level store: %s", lowerLevelStoreName)
}
if lowerLevelStoreConfig == nil {
lowerLevelStoreConfig = map[string]interface{}{}
}
for k, v := range config {
_, exists := lowerLevelStoreConfig[k]
if !exists {
lowerLevelStoreConfig[k] = v
}
}
kvStore, err := constructor(mo, lowerLevelStoreConfig)
if err != nil {
return nil, nil, err
}
llStore := &llStore{
refs: 0,
kvStore: kvStore,
logf: logf,
}
llUpdate := func(ssHigher moss.Snapshot) (ssLower moss.Snapshot, err error) {
return llStore.update(ssHigher, lowerLevelMaxBatchSize)
}
llSnapshot, err := llUpdate(nil)
if err != nil {
kvStore.Close()
return nil, nil, err
}
return llSnapshot, llUpdate, nil // llStore.refs is now 1.
}
// ------------------------------------------------
// llStore is a lower level store and provides ref-counting around a
// bleve store.KVStore.
type llStore struct {
kvStore store.KVStore
logf func(format string, a ...interface{})
m sync.Mutex // Protects fields that follow.
refs int
}
// llSnapshot represents a lower-level snapshot, wrapping a bleve
// store.KVReader, and implements the moss.Snapshot interface.
type llSnapshot struct {
llStore *llStore // Holds 1 refs on the llStore.
kvReader store.KVReader
m sync.Mutex // Protects fields that follow.
refs int
}
// llIterator represents a lower-level iterator, wrapping a bleve
// store.KVIterator, and implements the moss.Iterator interface.
type llIterator struct {
llSnapshot *llSnapshot // Holds 1 refs on the llSnapshot.
// Some lower-level KVReader implementations need a separate
// KVReader clone, due to KVReader single-threaded'ness.
kvReader store.KVReader
kvIterator store.KVIterator
}
type readerSource interface {
Reader() (store.KVReader, error)
}
// ------------------------------------------------
func (s *llStore) addRef() *llStore {
s.m.Lock()
s.refs += 1
s.m.Unlock()
return s
}
func (s *llStore) decRef() {
s.m.Lock()
s.refs -= 1
if s.refs <= 0 {
s.kvStore.Close()
}
s.m.Unlock()
}
// update() mutates this lower level store with latest data from the
// given higher level moss.Snapshot and returns a new moss.Snapshot
// that the higher level can use which represents this lower level
// store.
func (s *llStore) update(ssHigher moss.Snapshot, maxBatchSize uint64) (
ssLower moss.Snapshot, err error) {
if ssHigher != nil {
iter, err := ssHigher.StartIterator(nil, nil, moss.IteratorOptions{
IncludeDeletions: true,
SkipLowerLevel: true,
})
if err != nil {
return nil, err
}
defer iter.Close()
kvWriter, err := s.kvStore.Writer()
if err != nil {
return nil, err
}
defer kvWriter.Close()
batch := kvWriter.NewBatch()
defer func() {
if batch != nil {
batch.Close()
}
}()
var readOptions moss.ReadOptions
i := uint64(0)
for {
if i%1000000 == 0 {
s.logf("llStore.update, i: %d", i)
}
err = iter.Next()
if err == moss.ErrIteratorDone {
break
}
if err != nil {
return nil, err
}
ex, key, val, err := iter.CurrentEx()
if err == moss.ErrIteratorDone {
break
}
if err != nil {
return nil, err
}
switch ex.Operation {
case moss.OperationSet:
batch.Set(key, val)
case moss.OperationDel:
batch.Delete(key)
case moss.OperationMerge:
val, err = ssHigher.Get(key, readOptions)
if err != nil {
return nil, err
}
if val != nil {
batch.Set(key, val)
} else {
batch.Delete(key)
}
default:
return nil, fmt.Errorf("moss store, update,"+
" unexpected operation, ex: %v", ex)
}
if maxBatchSize > 0 && i%maxBatchSize == 0 {
err = kvWriter.ExecuteBatch(batch)
if err != nil {
return nil, err
}
batch.Close()
batch = nil
batch = kvWriter.NewBatch()
}
i++
}
if i > 0 {
s.logf("llStore.update, total: %d, ExecuteBatch: ...", i)
err = kvWriter.ExecuteBatch(batch)
if err != nil {
return nil, err
}
s.logf("llStore.update, total: %d, ExecuteBatch: done", i)
}
}
kvReader, err := s.kvStore.Reader()
if err != nil {
return nil, err
}
s.logf("llStore.update, new reader")
return &llSnapshot{
llStore: s.addRef(),
kvReader: kvReader,
refs: 1,
}, nil
}
// ------------------------------------------------
func (llss *llSnapshot) addRef() *llSnapshot {
llss.m.Lock()
llss.refs += 1
llss.m.Unlock()
return llss
}
func (llss *llSnapshot) decRef() {
llss.m.Lock()
llss.refs -= 1
if llss.refs <= 0 {
if llss.kvReader != nil {
llss.kvReader.Close()
llss.kvReader = nil
}
if llss.llStore != nil {
llss.llStore.decRef()
llss.llStore = nil
}
}
llss.m.Unlock()
}
func (llss *llSnapshot) Close() error {
llss.decRef()
return nil
}
func (llss *llSnapshot) Get(key []byte,
readOptions moss.ReadOptions) ([]byte, error) {
rs, ok := llss.kvReader.(readerSource)
if ok {
r2, err := rs.Reader()
if err != nil {
return nil, err
}
val, err := r2.Get(key)
r2.Close()
return val, err
}
return llss.kvReader.Get(key)
}
func (llss *llSnapshot) StartIterator(
startKeyInclusive, endKeyExclusive []byte,
iteratorOptions moss.IteratorOptions) (moss.Iterator, error) {
rs, ok := llss.kvReader.(readerSource)
if ok {
r2, err := rs.Reader()
if err != nil {
return nil, err
}
i2 := r2.RangeIterator(startKeyInclusive, endKeyExclusive)
return &llIterator{llSnapshot: llss.addRef(), kvReader: r2, kvIterator: i2}, nil
}
i := llss.kvReader.RangeIterator(startKeyInclusive, endKeyExclusive)
return &llIterator{llSnapshot: llss.addRef(), kvReader: nil, kvIterator: i}, nil
}
// ------------------------------------------------
func (lli *llIterator) Close() error {
var err0 error
if lli.kvIterator != nil {
err0 = lli.kvIterator.Close()
lli.kvIterator = nil
}
var err1 error
if lli.kvReader != nil {
err1 = lli.kvReader.Close()
lli.kvReader = nil
}
lli.llSnapshot.decRef()
lli.llSnapshot = nil
if err0 != nil {
return err0
}
if err1 != nil {
return err1
}
return nil
}
func (lli *llIterator) Next() error {
lli.kvIterator.Next()
return nil
}
func (lli *llIterator) Current() (key, val []byte, err error) {
key, val, ok := lli.kvIterator.Current()
if !ok {
return nil, nil, moss.ErrIteratorDone
}
return key, val, nil
}
func (lli *llIterator) CurrentEx() (
entryEx moss.EntryEx, key, val []byte, err error) {
return moss.EntryEx{}, nil, nil, moss.ErrUnimplemented
}

View File

@ -0,0 +1,79 @@
// Copyright (c) 2016 Couchbase, Inc.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the
// License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an "AS
// IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
// express or implied. See the License for the specific language
// governing permissions and limitations under the License.
package moss
import (
"github.com/couchbase/moss"
"github.com/blevesearch/bleve/index/store"
)
type Reader struct {
ss moss.Snapshot
}
func (r *Reader) Get(k []byte) (v []byte, err error) {
v, err = r.ss.Get(k, moss.ReadOptions{})
if err != nil {
return nil, err
}
if v != nil {
return append([]byte(nil), v...), nil
}
return nil, nil
}
func (r *Reader) MultiGet(keys [][]byte) ([][]byte, error) {
return store.MultiGet(r, keys)
}
func (r *Reader) PrefixIterator(k []byte) store.KVIterator {
iter, err := r.ss.StartIterator(k, nil, moss.IteratorOptions{})
if err != nil {
return nil
}
rv := &Iterator{
ss: r.ss,
iter: iter,
prefix: k,
start: k,
end: nil,
}
rv.checkDone()
return rv
}
func (r *Reader) RangeIterator(start, end []byte) store.KVIterator {
iter, err := r.ss.StartIterator(start, end, moss.IteratorOptions{})
if err != nil {
return nil
}
rv := &Iterator{
ss: r.ss,
iter: iter,
prefix: nil,
start: start,
end: end,
}
rv.checkDone()
return rv
}
func (r *Reader) Close() error {
return r.ss.Close()
}

183
index/store/moss/store.go Normal file
View File

@ -0,0 +1,183 @@
// Copyright (c) 2016 Couchbase, Inc.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the
// License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an "AS
// IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
// express or implied. See the License for the specific language
// governing permissions and limitations under the License.
// Package moss provides a KVStore implementation based on the
// github.com/couchbaselabs/moss library.
package moss
import (
"fmt"
"sync"
"github.com/couchbase/moss"
"github.com/blevesearch/bleve/index/store"
"github.com/blevesearch/bleve/registry"
)
const Name = "moss"
type Store struct {
m sync.Mutex
ms moss.Collection
mo store.MergeOperator
}
func New(mo store.MergeOperator, config map[string]interface{}) (
store.KVStore, error) {
return NewEx(mo, config, moss.CollectionOptions{})
}
func NewEx(mo store.MergeOperator, config map[string]interface{},
options moss.CollectionOptions) (store.KVStore, error) {
debug := moss.DefaultCollectionOptions.Debug
v, ok := config["mossDebug"]
if ok {
debugF, ok := v.(float64)
if !ok {
return nil, fmt.Errorf("moss store,"+
" could not parse config[mossDebug]: %v", v)
}
debug = int(debugF)
}
minMergePercentage := moss.DefaultCollectionOptions.MinMergePercentage
v, ok = config["mossMinMergePercentage"]
if ok {
minMergePercentage, ok = v.(float64)
if !ok {
return nil, fmt.Errorf("moss store,"+
" could not parse config[mossMinMergePercentage]: %v", v)
}
}
maxStackDirtyTopHeight := moss.DefaultCollectionOptions.MaxStackDirtyTopHeight
v, ok = config["mossMaxStackDirtyTopHeight"]
if ok {
maxStackDirtyTopHeightF, ok := v.(float64)
if !ok {
return nil, fmt.Errorf("moss store,"+
" could not parse config[mossMaxStackDirtyTopHeight]: %v", v)
}
maxStackDirtyTopHeight = int(maxStackDirtyTopHeightF)
}
mossLowerLevelStoreName := ""
v, ok = config["mossLowerLevelStoreName"]
if ok {
mossLowerLevelStoreName, ok = v.(string)
if !ok {
return nil, fmt.Errorf("moss store,"+
" could not parse config[mossLowerLevelStoreName]: %v", v)
}
}
mossLowerLevelMaxBatchSize := uint64(0)
v, ok = config["mossLowerLevelMaxBatchSize"]
if ok {
mossLowerLevelMaxBatchSizeF, ok := v.(float64)
if !ok {
return nil, fmt.Errorf("moss store,"+
" could not parse config[mossLowerLevelMaxBatchSize]: %v", v)
}
mossLowerLevelMaxBatchSize = uint64(mossLowerLevelMaxBatchSizeF)
}
// --------------------------------------------------
if options.MergeOperator == nil {
options.MergeOperator = mo
}
if options.MinMergePercentage <= 0 {
options.MinMergePercentage = minMergePercentage
}
if options.MaxStackDirtyTopHeight <= 0 {
options.MaxStackDirtyTopHeight = maxStackDirtyTopHeight
}
if options.Debug <= 0 {
options.Debug = debug
}
if options.Log == nil {
options.Log = func(format string, a ...interface{}) {}
}
if options.LowerLevelInit == nil &&
options.LowerLevelUpdate == nil &&
mossLowerLevelStoreName != "" {
mossLowerLevelStoreConfig := map[string]interface{}{}
v, ok := config["mossLowerLevelStoreConfig"]
if ok {
mossLowerLevelStoreConfig, ok = v.(map[string]interface{})
if !ok {
return nil, fmt.Errorf("moss store, initLowerLevelStore,"+
" could parse mossLowerLevelStoreConfig: %v", v)
}
}
lowerLevelInit, lowerLevelUpdate, err :=
initLowerLevelStore(mo, config,
mossLowerLevelStoreName,
mossLowerLevelStoreConfig,
mossLowerLevelMaxBatchSize,
options.Log)
if err != nil {
return nil, err
}
options.LowerLevelInit = lowerLevelInit
options.LowerLevelUpdate = lowerLevelUpdate
}
// --------------------------------------------------
ms, err := moss.NewCollection(options)
if err != nil {
return nil, err
}
err = ms.Start()
if err != nil {
return nil, err
}
rv := Store{
ms: ms,
mo: mo,
}
return &rv, nil
}
func (s *Store) Close() error {
return s.ms.Close()
}
func (s *Store) Reader() (store.KVReader, error) {
ss, err := s.ms.Snapshot()
if err != nil {
return nil, err
}
return &Reader{ss: ss}, nil
}
func (s *Store) Writer() (store.KVWriter, error) {
return &Writer{s: s}, nil
}
func init() {
registry.RegisterKVStore(Name, New)
}

View File

@ -0,0 +1,88 @@
// Copyright (c) 2016 Couchbase, Inc.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the
// License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an "AS
// IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
// express or implied. See the License for the specific language
// governing permissions and limitations under the License.
package moss
import (
"testing"
"github.com/blevesearch/bleve/index/store"
"github.com/blevesearch/bleve/index/store/test"
)
func open(t *testing.T, mo store.MergeOperator) store.KVStore {
rv, err := New(mo, nil)
if err != nil {
t.Fatal(err)
}
return rv
}
func cleanup(t *testing.T, s store.KVStore) {
err := s.Close()
if err != nil {
t.Fatal(err)
}
}
func TestMossKVCrud(t *testing.T) {
s := open(t, nil)
defer cleanup(t, s)
test.CommonTestKVCrud(t, s)
}
func TestMossReaderIsolation(t *testing.T) {
s := open(t, nil)
defer cleanup(t, s)
test.CommonTestReaderIsolation(t, s)
}
func TestMossReaderOwnsGetBytes(t *testing.T) {
s := open(t, nil)
defer cleanup(t, s)
test.CommonTestReaderOwnsGetBytes(t, s)
}
func TestMossWriterOwnsBytes(t *testing.T) {
s := open(t, nil)
defer cleanup(t, s)
test.CommonTestWriterOwnsBytes(t, s)
}
func TestMossPrefixIterator(t *testing.T) {
s := open(t, nil)
defer cleanup(t, s)
test.CommonTestPrefixIterator(t, s)
}
func TestMossPrefixIteratorSeek(t *testing.T) {
s := open(t, nil)
defer cleanup(t, s)
test.CommonTestPrefixIteratorSeek(t, s)
}
func TestMossRangeIterator(t *testing.T) {
s := open(t, nil)
defer cleanup(t, s)
test.CommonTestRangeIterator(t, s)
}
func TestMossRangeIteratorSeek(t *testing.T) {
s := open(t, nil)
defer cleanup(t, s)
test.CommonTestRangeIteratorSeek(t, s)
}
func TestMossMerge(t *testing.T) {
s := open(t, &test.TestMergeCounter{})
defer cleanup(t, s)
test.CommonTestMerge(t, s)
}

View File

@ -0,0 +1,85 @@
// Copyright (c) 2016 Couchbase, Inc.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the
// License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an "AS
// IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
// express or implied. See the License for the specific language
// governing permissions and limitations under the License.
package moss
import (
"fmt"
"github.com/blevesearch/bleve/index/store"
"github.com/couchbase/moss"
)
type Writer struct {
s *Store
}
func (w *Writer) NewBatch() store.KVBatch {
b, err := w.s.ms.NewBatch(0, 0)
if err != nil {
return nil
}
return &Batch{
store: w.s,
merge: store.NewEmulatedMerge(w.s.mo),
batch: b,
alloced: false,
}
}
func (w *Writer) NewBatchEx(options store.KVBatchOptions) (
[]byte, store.KVBatch, error) {
numOps := options.NumSets + options.NumDeletes + options.NumMerges
b, err := w.s.ms.NewBatch(numOps, options.TotalBytes)
if err != nil {
return nil, nil, err
}
buf, err := b.Alloc(options.TotalBytes)
if err != nil {
return nil, nil, err
}
return buf, &Batch{
store: w.s,
merge: store.NewEmulatedMerge(w.s.mo),
batch: b,
alloced: true,
}, nil
}
func (w *Writer) ExecuteBatch(b store.KVBatch) error {
batch, ok := b.(*Batch)
if !ok {
return fmt.Errorf("wrong type of batch")
}
for kStr, mergeOps := range batch.merge.Merges {
k := []byte(kStr)
for _, v := range mergeOps {
err := batch.batch.Merge(k, v)
if err != nil {
return err
}
}
}
return w.s.ms.ExecuteBatch(batch.batch, moss.WriteOptions{})
}
func (w *Writer) Close() error {
w.s = nil
return nil
}