enable mossStore as configurable lower-level store
Also, bumped moss vendor SHA to latest moss with mossStore.
This commit is contained in:
parent
d8ccda94f1
commit
bf318b489b
|
@ -16,6 +16,7 @@ package moss
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"sync"
|
||||
|
||||
"github.com/couchbase/moss"
|
||||
|
@ -32,12 +33,6 @@ func initLowerLevelStore(
|
|||
lowerLevelMaxBatchSize uint64,
|
||||
logf func(format string, a ...interface{}),
|
||||
) (moss.Snapshot, moss.LowerLevelUpdate, store.KVStore, error) {
|
||||
constructor := registry.KVStoreConstructorByName(lowerLevelStoreName)
|
||||
if constructor == nil {
|
||||
return nil, nil, nil, fmt.Errorf("moss store, initLowerLevelStore,"+
|
||||
" could not find lower level store: %s", lowerLevelStoreName)
|
||||
}
|
||||
|
||||
if lowerLevelStoreConfig == nil {
|
||||
lowerLevelStoreConfig = map[string]interface{}{}
|
||||
}
|
||||
|
@ -49,6 +44,16 @@ func initLowerLevelStore(
|
|||
}
|
||||
}
|
||||
|
||||
if lowerLevelStoreName == "mossStore" {
|
||||
return InitMossStore(mo, lowerLevelStoreConfig)
|
||||
}
|
||||
|
||||
constructor := registry.KVStoreConstructorByName(lowerLevelStoreName)
|
||||
if constructor == nil {
|
||||
return nil, nil, nil, fmt.Errorf("moss store, initLowerLevelStore,"+
|
||||
" could not find lower level store: %s", lowerLevelStoreName)
|
||||
}
|
||||
|
||||
kvStore, err := constructor(mo, lowerLevelStoreConfig)
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
|
@ -400,5 +405,76 @@ func (lli *llIterator) Current() (key, val []byte, err error) {
|
|||
func (lli *llIterator) CurrentEx() (
|
||||
entryEx moss.EntryEx, key, val []byte, err error) {
|
||||
return moss.EntryEx{}, nil, nil, moss.ErrUnimplemented
|
||||
|
||||
}
|
||||
|
||||
// ------------------------------------------------
|
||||
|
||||
func InitMossStore(mo store.MergeOperator, config map[string]interface{}) (
|
||||
moss.Snapshot, moss.LowerLevelUpdate, store.KVStore, error) {
|
||||
path, ok := config["path"].(string)
|
||||
if !ok {
|
||||
return nil, nil, nil, fmt.Errorf("lower: missing path for InitMossStore config")
|
||||
}
|
||||
|
||||
err := os.MkdirAll(path, 0700)
|
||||
if err != nil {
|
||||
return nil, nil, nil, fmt.Errorf("lower: InitMossStore mkdir, path: %s, err: %v",
|
||||
path, err)
|
||||
}
|
||||
|
||||
s, err := moss.OpenStore(path, moss.StoreOptions{ // TODO: more options.
|
||||
CollectionOptions: moss.CollectionOptions{
|
||||
MergeOperator: mo,
|
||||
},
|
||||
CompactionPercentage: 0.0,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, nil, nil, fmt.Errorf("lower: moss.OpenStore, path: %s, err: %v",
|
||||
path, err)
|
||||
}
|
||||
|
||||
sw := &mossStoreWrapper{s: s}
|
||||
|
||||
llUpdate := func(ssHigher moss.Snapshot) (moss.Snapshot, error) {
|
||||
ss, err := sw.s.Persist(ssHigher, moss.StorePersistOptions{
|
||||
CompactionConcern: moss.CompactionAllow,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
sw.AddRef() // Ref-count to be owned bysnapshot wrapper.
|
||||
|
||||
return moss.NewSnapshotWrapper(ss, sw), nil
|
||||
}
|
||||
|
||||
llSnapshot, err := llUpdate(nil)
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
|
||||
return llSnapshot, llUpdate, nil, nil
|
||||
}
|
||||
|
||||
type mossStoreWrapper struct {
|
||||
m sync.Mutex
|
||||
refs int
|
||||
s *moss.Store
|
||||
}
|
||||
|
||||
func (w *mossStoreWrapper) AddRef() {
|
||||
w.m.Lock()
|
||||
w.refs++
|
||||
w.m.Unlock()
|
||||
}
|
||||
|
||||
func (w *mossStoreWrapper) Close() (err error) {
|
||||
w.m.Lock()
|
||||
w.refs--
|
||||
if w.refs <= 0 {
|
||||
err = w.s.Close()
|
||||
w.s = nil
|
||||
}
|
||||
w.m.Unlock()
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -0,0 +1,101 @@
|
|||
// Copyright (c) 2016 Couchbase, Inc.
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the
|
||||
// License. You may obtain a copy of the License at
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an "AS
|
||||
// IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
||||
// express or implied. See the License for the specific language
|
||||
// governing permissions and limitations under the License.
|
||||
|
||||
package moss
|
||||
|
||||
import (
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/blevesearch/bleve/index/store"
|
||||
"github.com/blevesearch/bleve/index/store/test"
|
||||
)
|
||||
|
||||
func openWithLower(t *testing.T, mo store.MergeOperator) (string, store.KVStore) {
|
||||
tmpDir, _ := ioutil.TempDir("", "mossStore")
|
||||
|
||||
config := map[string]interface{}{
|
||||
"path": tmpDir,
|
||||
"mossLowerLevelStoreName": "mossStore",
|
||||
}
|
||||
|
||||
rv, err := New(mo, config)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
return tmpDir, rv
|
||||
}
|
||||
|
||||
func cleanupWithLower(t *testing.T, s store.KVStore, tmpDir string) {
|
||||
err := s.Close()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
err = os.RemoveAll(tmpDir)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMossWithLowerKVCrud(t *testing.T) {
|
||||
tmpDir, s := openWithLower(t, nil)
|
||||
defer cleanupWithLower(t, s, tmpDir)
|
||||
test.CommonTestKVCrud(t, s)
|
||||
}
|
||||
|
||||
func TestMossWithLowerReaderIsolation(t *testing.T) {
|
||||
tmpDir, s := openWithLower(t, nil)
|
||||
defer cleanupWithLower(t, s, tmpDir)
|
||||
test.CommonTestReaderIsolation(t, s)
|
||||
}
|
||||
|
||||
func TestMossWithLowerReaderOwnsGetBytes(t *testing.T) {
|
||||
tmpDir, s := openWithLower(t, nil)
|
||||
defer cleanupWithLower(t, s, tmpDir)
|
||||
test.CommonTestReaderOwnsGetBytes(t, s)
|
||||
}
|
||||
|
||||
func TestMossWithLowerWriterOwnsBytes(t *testing.T) {
|
||||
tmpDir, s := openWithLower(t, nil)
|
||||
defer cleanupWithLower(t, s, tmpDir)
|
||||
test.CommonTestWriterOwnsBytes(t, s)
|
||||
}
|
||||
|
||||
func TestMossWithLowerPrefixIterator(t *testing.T) {
|
||||
tmpDir, s := openWithLower(t, nil)
|
||||
defer cleanupWithLower(t, s, tmpDir)
|
||||
test.CommonTestPrefixIterator(t, s)
|
||||
}
|
||||
|
||||
func TestMossWithLowerPrefixIteratorSeek(t *testing.T) {
|
||||
tmpDir, s := openWithLower(t, nil)
|
||||
defer cleanupWithLower(t, s, tmpDir)
|
||||
test.CommonTestPrefixIteratorSeek(t, s)
|
||||
}
|
||||
|
||||
func TestMossWithLowerRangeIterator(t *testing.T) {
|
||||
tmpDir, s := openWithLower(t, nil)
|
||||
defer cleanupWithLower(t, s, tmpDir)
|
||||
test.CommonTestRangeIterator(t, s)
|
||||
}
|
||||
|
||||
func TestMossWithLowerRangeIteratorSeek(t *testing.T) {
|
||||
tmpDir, s := openWithLower(t, nil)
|
||||
defer cleanupWithLower(t, s, tmpDir)
|
||||
test.CommonTestRangeIteratorSeek(t, s)
|
||||
}
|
||||
|
||||
func TestMossWithLowerMerge(t *testing.T) {
|
||||
tmpDir, s := openWithLower(t, &test.TestMergeCounter{})
|
||||
defer cleanupWithLower(t, s, tmpDir)
|
||||
test.CommonTestMerge(t, s)
|
||||
}
|
|
@ -70,13 +70,13 @@ func New(mo store.MergeOperator, config map[string]interface{}) (
|
|||
b, err := json.Marshal(v) // Convert from map[string]interface{}.
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("moss store,"+
|
||||
" could not marshal config[mossCollectionOptions]: %v", v)
|
||||
" could not marshal config[mossCollectionOptions]: %v, err: %v", v, err)
|
||||
}
|
||||
|
||||
err = json.Unmarshal(b, &options)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("moss store,"+
|
||||
" could not unmarshal config[mossCollectionOptions]: %v", v)
|
||||
" could not unmarshal config[mossCollectionOptions]: %v, err: %v", v, err)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -25,7 +25,7 @@
|
|||
{
|
||||
"importpath": "github.com/couchbase/moss",
|
||||
"repository": "https://github.com/couchbase/moss",
|
||||
"revision": "ffa548e939e2458ad3fd98809aa9caa5325635a0",
|
||||
"revision": "e013f5f973e5b094ecf61e08ae9aa3754bd22d15",
|
||||
"branch": "master",
|
||||
"notests": true
|
||||
},
|
||||
|
|
Loading…
Reference in New Issue