0
0

+ initial stub of goleveldb package

- This is a first-pass introduction. Things may not be working
    correctly yet.
This commit is contained in:
indraniel 2015-03-20 17:40:56 -05:00
parent 8ed7dc1505
commit caa19e6c36
7 changed files with 689 additions and 0 deletions

View File

@ -0,0 +1,106 @@
// Copyright (c) 2014 Couchbase, Inc.
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
// except in compliance with the License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed under the
// License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
// either express or implied. See the License for the specific language governing permissions
// and limitations under the License.
package goleveldb
import (
indexStore "github.com/blevesearch/bleve/index/store"
"github.com/syndtr/goleveldb/leveldb"
)
type op struct {
k []byte
v []byte
}
type Batch struct {
store *Store
ops []op
alreadyLocked bool
merges map[string]indexStore.AssociativeMergeChain
}
func newBatch(store *Store) *Batch {
rv := Batch{
store: store,
ops: make([]op, 0, 1000),
merges: make(map[string]indexStore.AssociativeMergeChain),
}
return &rv
}
func newBatchAlreadyLocked(store *Store) *Batch {
rv := Batch{
store: store,
ops: make([]op, 0, 1000),
alreadyLocked: true,
merges: make(map[string]indexStore.AssociativeMergeChain),
}
return &rv
}
func (ldb *Batch) Set(key, val []byte) {
ldb.ops = append(ldb.ops, op{key, val})
}
func (ldb *Batch) Delete(key []byte) {
ldb.ops = append(ldb.ops, op{key, nil})
}
func (ldb *Batch) Merge(key []byte, oper indexStore.AssociativeMerge) {
opers, ok := ldb.merges[string(key)]
if !ok {
opers = make(indexStore.AssociativeMergeChain, 0, 1)
}
opers = append(opers, oper)
ldb.merges[string(key)] = opers
}
func (ldb *Batch) Execute() error {
if !ldb.alreadyLocked {
ldb.store.writer.Lock()
defer ldb.store.writer.Unlock()
}
batch := new(leveldb.Batch)
// first process the merges
for k, mc := range ldb.merges {
val, err := ldb.store.get([]byte(k))
if err != nil {
return err
}
val, err = mc.Merge([]byte(k), val)
if err != nil {
return err
}
if val == nil {
batch.Delete([]byte(k))
} else {
batch.Put([]byte(k), val)
}
}
// now add all the other ops to the batch
for _, op := range ldb.ops {
if op.v == nil {
batch.Delete(op.k)
} else {
batch.Put(op.k, op.v)
}
}
wopts := defaultWriteOptions()
err := ldb.store.db.Write(batch, wopts)
return err
}
func (ldb *Batch) Close() error {
return nil
}

View File

@ -0,0 +1,75 @@
// Copyright (c) 2014 Couchbase, Inc.
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
// except in compliance with the License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed under the
// License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
// either express or implied. See the License for the specific language governing permissions
// and limitations under the License.
package goleveldb
import (
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/iterator"
)
type Iterator struct {
store *Store
iterator iterator.Iterator
}
func newIterator(store *Store) *Iterator {
ropts := defaultReadOptions()
iter := store.db.NewIterator(nil, ropts)
rv := Iterator{
store: store,
iterator: iter,
}
return &rv
}
func newIteratorWithSnapshot(store *Store, snapshot *leveldb.Snapshot) *Iterator {
options := defaultReadOptions()
iter := snapshot.NewIterator(nil, options)
rv := Iterator{
store: store,
iterator: iter,
}
return &rv
}
func (ldi *Iterator) SeekFirst() {
ldi.iterator.First()
}
func (ldi *Iterator) Seek(key []byte) {
ldi.iterator.Seek(key)
}
func (ldi *Iterator) Next() {
ldi.iterator.Next()
}
func (ldi *Iterator) Current() ([]byte, []byte, bool) {
if ldi.Valid() {
return ldi.Key(), ldi.Value(), true
}
return nil, nil, false
}
func (ldi *Iterator) Key() []byte {
return ldi.iterator.Key()
}
func (ldi *Iterator) Value() []byte {
return ldi.iterator.Value()
}
func (ldi *Iterator) Valid() bool {
return ldi.iterator.Valid()
}
func (ldi *Iterator) Close() error {
return nil
}

View File

@ -0,0 +1,43 @@
// Copyright (c) 2014 Couchbase, Inc.
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
// except in compliance with the License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed under the
// License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
// either express or implied. See the License for the specific language governing permissions
// and limitations under the License.
package goleveldb
import (
"github.com/blevesearch/bleve/index/store"
"github.com/syndtr/goleveldb/leveldb"
)
type Reader struct {
store *Store
snapshot *leveldb.Snapshot
}
func newReader(store *Store) (*Reader, error) {
snapshot, _ := store.db.GetSnapshot()
return &Reader{
store: store,
snapshot: snapshot,
}, nil
}
func (r *Reader) Get(key []byte) ([]byte, error) {
return r.store.getWithSnapshot(key, r.snapshot)
}
func (r *Reader) Iterator(key []byte) store.KVIterator {
rv := newIteratorWithSnapshot(r.store, r.snapshot)
rv.Seek(key)
return rv
}
func (r *Reader) Close() error {
r.snapshot.Release()
return nil
}

View File

@ -0,0 +1,160 @@
// Copyright (c) 2014 Couchbase, Inc.
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
// except in compliance with the License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed under the
// License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
// either express or implied. See the License for the specific language governing permissions
// and limitations under the License.
package goleveldb
import (
"fmt"
"sync"
"github.com/blevesearch/bleve/index/store"
"github.com/blevesearch/bleve/registry"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/filter"
"github.com/syndtr/goleveldb/leveldb/opt"
)
const Name = "goleveldb"
type Store struct {
path string
opts *opt.Options
db *leveldb.DB
writer sync.Mutex
}
func Open(path string, config map[string]interface{}) (*Store, error) {
rv := Store{
path: path,
opts: &opt.Options{},
}
applyConfig(rv.opts, config)
var err error
rv.db, err = leveldb.OpenFile(rv.path, rv.opts)
if err != nil {
return nil, err
}
return &rv, nil
}
func (ldbs *Store) get(key []byte) ([]byte, error) {
options := defaultReadOptions()
b, err := ldbs.db.Get(key, options)
return b, err
}
func (ldbs *Store) getWithSnapshot(key []byte, snapshot *leveldb.Snapshot) ([]byte, error) {
options := defaultReadOptions()
b, err := snapshot.Get(key, options)
return b, err
}
func (ldbs *Store) set(key, val []byte) error {
ldbs.writer.Lock()
defer ldbs.writer.Unlock()
return ldbs.setlocked(key, val)
}
func (ldbs *Store) setlocked(key, val []byte) error {
options := defaultWriteOptions()
err := ldbs.db.Put(key, val, options)
return err
}
func (ldbs *Store) delete(key []byte) error {
ldbs.writer.Lock()
defer ldbs.writer.Unlock()
return ldbs.deletelocked(key)
}
func (ldbs *Store) deletelocked(key []byte) error {
options := defaultWriteOptions()
err := ldbs.db.Delete(key, options)
return err
}
func (ldbs *Store) Close() error {
ldbs.db.Close()
return nil
}
func (ldbs *Store) iterator(key []byte) store.KVIterator {
rv := newIterator(ldbs)
rv.Seek(key)
return rv
}
func (ldbs *Store) Reader() (store.KVReader, error) {
return newReader(ldbs)
}
func (ldbs *Store) Writer() (store.KVWriter, error) {
return newWriter(ldbs)
}
func (ldbs *Store) newBatch() store.KVBatch {
return newBatch(ldbs)
}
func StoreConstructor(config map[string]interface{}) (store.KVStore, error) {
path, ok := config["path"].(string)
if !ok {
return nil, fmt.Errorf("must specify path")
}
return Open(path, config)
}
func init() {
registry.RegisterKVStore(Name, StoreConstructor)
}
func applyConfig(o *opt.Options, config map[string]interface{}) (
*opt.Options, error) {
cim, ok := config["create_if_missing"].(bool)
if ok {
o.ErrorIfMissing = !cim
}
eie, ok := config["error_if_exists"].(bool)
if ok {
o.ErrorIfExist = eie
}
wbs, ok := config["write_buffer_size"].(float64)
if ok {
o.WriteBuffer = int(wbs)
}
bs, ok := config["block_size"].(float64)
if ok {
o.BlockSize = int(bs)
}
bri, ok := config["block_restart_interval"].(float64)
if ok {
o.BlockRestartInterval = int(bri)
}
lcc, ok := config["lru_cache_capacity"].(float64)
if ok {
o.BlockCacheCapacity = int(lcc)
}
bfbpk, ok := config["bloom_filter_bits_per_key"].(float64)
if ok {
bf := filter.NewBloomFilter(int(bfbpk))
o.Filter = bf
}
return o, nil
}

View File

@ -0,0 +1,226 @@
// Copyright (c) 2014 Couchbase, Inc.
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
// except in compliance with the License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed under the
// License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
// either express or implied. See the License for the specific language governing permissions
// and limitations under the License.
package goleveldb
import (
"os"
"reflect"
"testing"
"github.com/blevesearch/bleve/index/store"
)
var leveldbTestOptions = map[string]interface{}{
"create_if_missing": true,
}
func TestLevelDBStore(t *testing.T) {
defer os.RemoveAll("test")
s, err := Open("test", leveldbTestOptions)
if err != nil {
t.Fatal(err)
}
defer s.Close()
CommonTestKVStore(t, s)
}
func TestReaderIsolation(t *testing.T) {
defer os.RemoveAll("test")
s, err := Open("test", leveldbTestOptions)
if err != nil {
t.Fatal(err)
}
defer s.Close()
CommonTestReaderIsolation(t, s)
}
func CommonTestKVStore(t *testing.T, s store.KVStore) {
writer, err := s.Writer()
if err != nil {
t.Error(err)
}
err = writer.Set([]byte("a"), []byte("val-a"))
if err != nil {
t.Fatal(err)
}
err = writer.Set([]byte("z"), []byte("val-z"))
if err != nil {
t.Fatal(err)
}
err = writer.Delete([]byte("z"))
if err != nil {
t.Fatal(err)
}
batch := writer.NewBatch()
batch.Set([]byte("b"), []byte("val-b"))
batch.Set([]byte("c"), []byte("val-c"))
batch.Set([]byte("d"), []byte("val-d"))
batch.Set([]byte("e"), []byte("val-e"))
batch.Set([]byte("f"), []byte("val-f"))
batch.Set([]byte("g"), []byte("val-g"))
batch.Set([]byte("h"), []byte("val-h"))
batch.Set([]byte("i"), []byte("val-i"))
batch.Set([]byte("j"), []byte("val-j"))
err = batch.Execute()
if err != nil {
t.Fatal(err)
}
writer.Close()
reader, err := s.Reader()
if err != nil {
t.Error(err)
}
defer reader.Close()
it := reader.Iterator([]byte("b"))
key, val, valid := it.Current()
if !valid {
t.Fatalf("valid false, expected true")
}
if string(key) != "b" {
t.Fatalf("expected key b, got %s", key)
}
if string(val) != "val-b" {
t.Fatalf("expected value val-b, got %s", val)
}
it.Next()
key, val, valid = it.Current()
if !valid {
t.Fatalf("valid false, expected true")
}
if string(key) != "c" {
t.Fatalf("expected key c, got %s", key)
}
if string(val) != "val-c" {
t.Fatalf("expected value val-c, got %s", val)
}
it.Seek([]byte("i"))
key, val, valid = it.Current()
if !valid {
t.Fatalf("valid false, expected true")
}
if string(key) != "i" {
t.Fatalf("expected key i, got %s", key)
}
if string(val) != "val-i" {
t.Fatalf("expected value val-i, got %s", val)
}
it.Close()
}
func CommonTestReaderIsolation(t *testing.T, s store.KVStore) {
// insert a kv pair
writer, err := s.Writer()
if err != nil {
t.Error(err)
}
err = writer.Set([]byte("a"), []byte("val-a"))
if err != nil {
t.Fatal(err)
}
writer.Close()
// create an isolated reader
reader, err := s.Reader()
if err != nil {
t.Error(err)
}
defer reader.Close()
// verify that we see the value already inserted
val, err := reader.Get([]byte("a"))
if err != nil {
t.Error(err)
}
if !reflect.DeepEqual(val, []byte("val-a")) {
t.Errorf("expected val-a, got nil")
}
// verify that an iterator sees it
count := 0
it := reader.Iterator([]byte{0})
defer it.Close()
for it.Valid() {
it.Next()
count++
}
if count != 1 {
t.Errorf("expected iterator to see 1, saw %d", count)
}
// add something after the reader was created
writer, err = s.Writer()
if err != nil {
t.Error(err)
}
err = writer.Set([]byte("b"), []byte("val-b"))
if err != nil {
t.Fatal(err)
}
writer.Close()
// ensure that a newer reader sees it
newReader, err := s.Reader()
if err != nil {
t.Error(err)
}
defer newReader.Close()
val, err = newReader.Get([]byte("b"))
if err != nil {
t.Error(err)
}
if !reflect.DeepEqual(val, []byte("val-b")) {
t.Errorf("expected val-b, got nil")
}
// ensure that the director iterator sees it
count = 0
it = newReader.Iterator([]byte{0})
defer it.Close()
for it.Valid() {
it.Next()
count++
}
if count != 2 {
t.Errorf("expected iterator to see 2, saw %d", count)
}
// but that the isolated reader does not
val, err = reader.Get([]byte("b"))
if err != nil && err.Error() != "leveldb: not found" {
t.Error(err)
}
if val != nil {
t.Errorf("expected nil, got %v", val)
}
// and ensure that the iterator on the isolated reader also does not
count = 0
it = reader.Iterator([]byte{0})
defer it.Close()
for it.Valid() {
it.Next()
count++
}
if count != 1 {
t.Errorf("expected iterator to see 1, saw %d", count)
}
}

View File

@ -0,0 +1,26 @@
// Copyright (c) 2014 Couchbase, Inc.
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
// except in compliance with the License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed under the
// License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
// either express or implied. See the License for the specific language governing permissions
// and limitations under the License.
package goleveldb
import (
"github.com/syndtr/goleveldb/leveldb/opt"
)
func defaultWriteOptions() *opt.WriteOptions {
wo := &opt.WriteOptions{}
// request fsync on write for safety
wo.Sync = true
return wo
}
func defaultReadOptions() *opt.ReadOptions {
ro := &opt.ReadOptions{}
return ro
}

View File

@ -0,0 +1,53 @@
// Copyright (c) 2014 Couchbase, Inc.
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
// except in compliance with the License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed under the
// License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
// either express or implied. See the License for the specific language governing permissions
// and limitations under the License.
package goleveldb
import (
"github.com/blevesearch/bleve/index/store"
)
type Writer struct {
store *Store
}
func newWriter(store *Store) (*Writer, error) {
store.writer.Lock()
return &Writer{
store: store,
}, nil
}
func (w *Writer) Set(key, val []byte) error {
return w.store.setlocked(key, val)
}
func (w *Writer) Delete(key []byte) error {
return w.store.deletelocked(key)
}
func (w *Writer) NewBatch() store.KVBatch {
return newBatchAlreadyLocked(w.store)
}
func (w *Writer) Close() error {
w.store.writer.Unlock()
return nil
}
// these two methods can safely read using the regular
// methods without a read transaction, because we know
// that no one else is writing but us
func (w *Writer) Get(key []byte) ([]byte, error) {
return w.store.get(key)
}
func (w *Writer) Iterator(key []byte) store.KVIterator {
return w.store.iterator(key)
}