From caa19e6c36771172175c0f35e15a7b192704c3fa Mon Sep 17 00:00:00 2001
From: indraniel
Date: Fri, 20 Mar 2015 17:40:56 -0500
Subject: [PATCH] + initial stub of goleveldb package

- This is a first-pass introduction. Things may not be working correctly yet.
---
 index/store/goleveldb/batch.go      | 123 +++++++++++++
 index/store/goleveldb/iterator.go   |  75 +++++++++
 index/store/goleveldb/reader.go     |  43 ++++++
 index/store/goleveldb/store.go      | 160 ++++++++++++++++++++
 index/store/goleveldb/store_test.go | 226 ++++++++++++++++++++++++++++
 index/store/goleveldb/util.go       |  26 ++++
 index/store/goleveldb/writer.go     |  53 +++++++
 7 files changed, 706 insertions(+)
 create mode 100644 index/store/goleveldb/batch.go
 create mode 100644 index/store/goleveldb/iterator.go
 create mode 100644 index/store/goleveldb/reader.go
 create mode 100644 index/store/goleveldb/store.go
 create mode 100644 index/store/goleveldb/store_test.go
 create mode 100644 index/store/goleveldb/util.go
 create mode 100644 index/store/goleveldb/writer.go

diff --git a/index/store/goleveldb/batch.go b/index/store/goleveldb/batch.go
new file mode 100644
index 00000000..455cfe3f
--- /dev/null
+++ b/index/store/goleveldb/batch.go
@@ -0,0 +1,123 @@
+// Copyright (c) 2014 Couchbase, Inc.
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+// except in compliance with the License. You may obtain a copy of the License at
+// http://www.apache.org/licenses/LICENSE-2.0
+// Unless required by applicable law or agreed to in writing, software distributed under the
+// License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+// either express or implied. See the License for the specific language governing permissions
+// and limitations under the License.
+
+package goleveldb
+
+import (
+	indexStore "github.com/blevesearch/bleve/index/store"
+	"github.com/syndtr/goleveldb/leveldb"
+)
+
+// op is a single buffered mutation; a nil v marks a delete.
+type op struct {
+	k []byte
+	v []byte
+}
+
+// Batch buffers sets, deletes and merges and applies them to the
+// underlying goleveldb database in a single write when Execute is called.
+type Batch struct {
+	store         *Store
+	ops           []op
+	alreadyLocked bool
+	merges        map[string]indexStore.AssociativeMergeChain
+}
+
+// newBatch returns an empty batch whose Execute takes the store's writer
+// lock itself.
+func newBatch(store *Store) *Batch {
+	return &Batch{
+		store:  store,
+		ops:    make([]op, 0, 1000),
+		merges: make(map[string]indexStore.AssociativeMergeChain),
+	}
+}
+
+// newBatchAlreadyLocked returns an empty batch for a caller that already
+// holds the store's writer lock, so Execute must not lock again.
+func newBatchAlreadyLocked(store *Store) *Batch {
+	rv := newBatch(store)
+	rv.alreadyLocked = true
+	return rv
+}
+
+// Set queues a write of val under key. A nil val is recorded the same
+// way as a Delete, because Execute treats a nil value as a removal.
+func (ldb *Batch) Set(key, val []byte) {
+	ldb.ops = append(ldb.ops, op{key, val})
+}
+
+// Delete queues the removal of key.
+func (ldb *Batch) Delete(key []byte) {
+	ldb.ops = append(ldb.ops, op{key, nil})
+}
+
+// Merge queues oper to be applied to the stored value of key when the
+// batch executes. Operators queued for the same key are chained in call
+// order.
+func (ldb *Batch) Merge(key []byte, oper indexStore.AssociativeMerge) {
+	opers, ok := ldb.merges[string(key)]
+	if !ok {
+		opers = make(indexStore.AssociativeMergeChain, 0, 1)
+	}
+	ldb.merges[string(key)] = append(opers, oper)
+}
+
+// Execute resolves the queued merges against the values currently in the
+// store and then writes the results, together with the queued sets and
+// deletes, as one leveldb batch. It takes the store's writer lock unless
+// the batch was created with newBatchAlreadyLocked.
+func (ldb *Batch) Execute() error {
+	if !ldb.alreadyLocked {
+		ldb.store.writer.Lock()
+		defer ldb.store.writer.Unlock()
+	}
+
+	batch := new(leveldb.Batch)
+
+	// first process the merges
+	for k, mc := range ldb.merges {
+		key := []byte(k)
+		val, err := ldb.store.get(key)
+		// goleveldb reports an absent key as ErrNotFound; merging into a
+		// key that does not exist yet must start from a nil value rather
+		// than abort the whole batch
+		if err == leveldb.ErrNotFound {
+			val, err = nil, nil
+		}
+		if err != nil {
+			return err
+		}
+		val, err = mc.Merge(key, val)
+		if err != nil {
+			return err
+		}
+		if val == nil {
+			batch.Delete(key)
+		} else {
+			batch.Put(key, val)
+		}
+	}
+
+	// now add all the other ops to the batch
+	for _, op := range ldb.ops {
+		if op.v == nil {
+			batch.Delete(op.k)
+		} else {
+			batch.Put(op.k, op.v)
+		}
+	}
+
+	return ldb.store.db.Write(batch, defaultWriteOptions())
+}
+
+// Close is a no-op; a batch holds no resources that need releasing.
+func (ldb *Batch) Close() error {
+	return nil
+}
diff --git a/index/store/goleveldb/iterator.go b/index/store/goleveldb/iterator.go
new file mode
100644
index 00000000..f067aeb5
--- /dev/null
+++ b/index/store/goleveldb/iterator.go
@@ -0,0 +1,85 @@
+// Copyright (c) 2014 Couchbase, Inc.
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+// except in compliance with the License. You may obtain a copy of the License at
+// http://www.apache.org/licenses/LICENSE-2.0
+// Unless required by applicable law or agreed to in writing, software distributed under the
+// License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+// either express or implied. See the License for the specific language governing permissions
+// and limitations under the License.
+
+package goleveldb
+
+import (
+	"github.com/syndtr/goleveldb/leveldb"
+	"github.com/syndtr/goleveldb/leveldb/iterator"
+)
+
+// Iterator adapts a goleveldb iterator, taken either from the live
+// database or from a snapshot, to the key/value iterator used by a Store.
+type Iterator struct {
+	store    *Store
+	iterator iterator.Iterator
+}
+
+// newIterator returns an iterator over the current contents of store.
+func newIterator(store *Store) *Iterator {
+	return &Iterator{
+		store:    store,
+		iterator: store.db.NewIterator(nil, defaultReadOptions()),
+	}
+}
+
+// newIteratorWithSnapshot returns an iterator over the contents of snapshot.
+func newIteratorWithSnapshot(store *Store, snapshot *leveldb.Snapshot) *Iterator {
+	return &Iterator{
+		store:    store,
+		iterator: snapshot.NewIterator(nil, defaultReadOptions()),
+	}
+}
+
+// SeekFirst positions the iterator on the first entry.
+func (ldi *Iterator) SeekFirst() {
+	ldi.iterator.First()
+}
+
+// Seek positions the iterator using the underlying iterator's Seek on key.
+func (ldi *Iterator) Seek(key []byte) {
+	ldi.iterator.Seek(key)
+}
+
+// Next advances the iterator by one entry.
+func (ldi *Iterator) Next() {
+	ldi.iterator.Next()
+}
+
+// Current returns the key and value at the iterator's position and true,
+// or nil, nil and false when the iterator is not on a valid entry.
+func (ldi *Iterator) Current() ([]byte, []byte, bool) {
+	if !ldi.Valid() {
+		return nil, nil, false
+	}
+	return ldi.Key(), ldi.Value(), true
+}
+
+// Key returns the key reported by the underlying iterator.
+func (ldi *Iterator) Key() []byte {
+	return ldi.iterator.Key()
+}
+
+// Value returns the value reported by the underlying iterator.
+func (ldi *Iterator) Value() []byte {
+	return ldi.iterator.Value()
+}
+
+// Valid reports whether the iterator is positioned on an entry.
+func (ldi *Iterator) Valid() bool {
+	return ldi.iterator.Valid()
+}
+
+// Close releases the underlying goleveldb iterator. Without the release
+// the iterator, and whatever database state it pins, is never freed
+// explicitly. The Iterator must not be used after Close.
+func (ldi *Iterator) Close() error {
+	ldi.iterator.Release()
+	return nil
+}
diff --git a/index/store/goleveldb/reader.go
b/index/store/goleveldb/reader.go
new file mode 100644
index 00000000..49ab314f
--- /dev/null
+++ b/index/store/goleveldb/reader.go
@@ -0,0 +1,55 @@
+// Copyright (c) 2014 Couchbase, Inc.
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+// except in compliance with the License. You may obtain a copy of the License at
+// http://www.apache.org/licenses/LICENSE-2.0
+// Unless required by applicable law or agreed to in writing, software distributed under the
+// License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+// either express or implied. See the License for the specific language governing permissions
+// and limitations under the License.
+
+package goleveldb
+
+import (
+	"github.com/blevesearch/bleve/index/store"
+	"github.com/syndtr/goleveldb/leveldb"
+)
+
+// Reader serves reads from a goleveldb snapshot taken when the Reader was
+// created, so writes made afterwards are not visible through it.
+type Reader struct {
+	store    *Store
+	snapshot *leveldb.Snapshot
+}
+
+// newReader takes a snapshot of store's database and wraps it in a Reader.
+// A failure to obtain the snapshot is returned to the caller instead of
+// yielding a Reader with a nil snapshot.
+func newReader(store *Store) (*Reader, error) {
+	snapshot, err := store.db.GetSnapshot()
+	if err != nil {
+		return nil, err
+	}
+	return &Reader{
+		store:    store,
+		snapshot: snapshot,
+	}, nil
+}
+
+// Get returns the value stored under key in the reader's snapshot.
+func (r *Reader) Get(key []byte) ([]byte, error) {
+	return r.store.getWithSnapshot(key, r.snapshot)
+}
+
+// Iterator returns an iterator over the reader's snapshot, positioned by
+// seeking to key.
+func (r *Reader) Iterator(key []byte) store.KVIterator {
+	rv := newIteratorWithSnapshot(r.store, r.snapshot)
+	rv.Seek(key)
+	return rv
+}
+
+// Close releases the snapshot held by the reader.
+func (r *Reader) Close() error {
+	r.snapshot.Release()
+	return nil
+}
diff --git a/index/store/goleveldb/store.go b/index/store/goleveldb/store.go
new file mode 100644
index 00000000..4d85d616
--- /dev/null
+++ b/index/store/goleveldb/store.go
@@ -0,0 +1,191 @@
+// Copyright (c) 2014 Couchbase, Inc.
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+// except in compliance with the License. You may obtain a copy of the License at
+// http://www.apache.org/licenses/LICENSE-2.0
+// Unless required by applicable law or agreed to in writing, software distributed under the
+// License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+// either express or implied. See the License for the specific language governing permissions
+// and limitations under the License.
+
+package goleveldb
+
+import (
+	"fmt"
+	"sync"
+
+	"github.com/blevesearch/bleve/index/store"
+	"github.com/blevesearch/bleve/registry"
+	"github.com/syndtr/goleveldb/leveldb"
+	"github.com/syndtr/goleveldb/leveldb/filter"
+	"github.com/syndtr/goleveldb/leveldb/opt"
+)
+
+// Name is the identifier under which this store is registered.
+const Name = "goleveldb"
+
+// Store is a key/value store backed by a goleveldb database on disk.
+// Its writer mutex serialises the mutations made through the store.
+type Store struct {
+	path   string
+	opts   *opt.Options
+	db     *leveldb.DB
+	writer sync.Mutex
+}
+
+// Open opens the goleveldb database at path after applying the settings
+// found in config (see applyConfig for the recognised keys).
+func Open(path string, config map[string]interface{}) (*Store, error) {
+	rv := Store{
+		path: path,
+		opts: &opt.Options{},
+	}
+
+	if _, err := applyConfig(rv.opts, config); err != nil {
+		return nil, err
+	}
+
+	var err error
+	rv.db, err = leveldb.OpenFile(rv.path, rv.opts)
+	if err != nil {
+		return nil, err
+	}
+
+	return &rv, nil
+}
+
+// get reads key from the live database. goleveldb reports an absent key
+// as ErrNotFound; that is translated to a nil value and a nil error so
+// that a missing key is not mistaken for a read failure.
+func (ldbs *Store) get(key []byte) ([]byte, error) {
+	b, err := ldbs.db.Get(key, defaultReadOptions())
+	if err == leveldb.ErrNotFound {
+		return nil, nil
+	}
+	return b, err
+}
+
+// getWithSnapshot reads key from snapshot, translating ErrNotFound to a
+// nil value and a nil error in the same way as get.
+func (ldbs *Store) getWithSnapshot(key []byte, snapshot *leveldb.Snapshot) ([]byte, error) {
+	b, err := snapshot.Get(key, defaultReadOptions())
+	if err == leveldb.ErrNotFound {
+		return nil, nil
+	}
+	return b, err
+}
+
+// set stores val under key while holding the writer lock.
+func (ldbs *Store) set(key, val []byte) error {
+	ldbs.writer.Lock()
+	defer ldbs.writer.Unlock()
+	return ldbs.setlocked(key, val)
+}
+
+// setlocked stores val under key; the caller must hold the writer lock.
+func (ldbs *Store) setlocked(key, val []byte) error {
+	return ldbs.db.Put(key, val, defaultWriteOptions())
+}
+
+// delete removes key while holding the writer lock.
+func (ldbs *Store) delete(key []byte) error {
+	ldbs.writer.Lock()
+	defer ldbs.writer.Unlock()
+	return ldbs.deletelocked(key)
+}
+
+// deletelocked removes key; the caller must hold the writer lock.
+func (ldbs *Store) deletelocked(key []byte) error {
+	return ldbs.db.Delete(key, defaultWriteOptions())
+}
+
+// Close closes the underlying database and reports any error from doing so.
+func (ldbs *Store) Close() error {
+	return ldbs.db.Close()
+}
+
+// iterator returns an iterator over the live database, positioned by
+// seeking to key.
+func (ldbs *Store) iterator(key []byte) store.KVIterator {
+	rv := newIterator(ldbs)
+	rv.Seek(key)
+	return rv
+}
+
+// Reader returns a reader over a snapshot of the store. On failure the
+// returned KVReader is a nil interface rather than a wrapped nil *Reader.
+func (ldbs *Store) Reader() (store.KVReader, error) {
+	r, err := newReader(ldbs)
+	if err != nil {
+		return nil, err
+	}
+	return r, nil
+}
+
+// Writer returns a writer for the store, as built by newWriter.
+func (ldbs *Store) Writer() (store.KVWriter, error) {
+	return newWriter(ldbs)
+}
+
+// newBatch returns a batch that takes the writer lock when executed.
+func (ldbs *Store) newBatch() store.KVBatch {
+	return newBatch(ldbs)
+}
+
+// StoreConstructor opens a Store from a registry configuration. The
+// config must hold a string "path"; the whole map is passed on to Open.
+func StoreConstructor(config map[string]interface{}) (store.KVStore, error) {
+	path, ok := config["path"].(string)
+	if !ok {
+		return nil, fmt.Errorf("must specify path")
+	}
+	return Open(path, config)
+}
+
+// init registers StoreConstructor with the registry under Name.
+func init() {
+	registry.RegisterKVStore(Name, StoreConstructor)
+}
+
+// applyConfig copies the recognised settings from config onto o and
+// returns o. Booleans are read as bool and numeric settings as float64;
+// keys that are missing or hold another type are ignored.
+func applyConfig(o *opt.Options, config map[string]interface{}) (
+	*opt.Options, error) {
+
+	cim, ok := config["create_if_missing"].(bool)
+	if ok {
+		o.ErrorIfMissing = !cim
+	}
+
+	eie, ok := config["error_if_exists"].(bool)
+	if ok {
+		o.ErrorIfExist = eie
+	}
+
+	wbs, ok := config["write_buffer_size"].(float64)
+	if ok {
+		o.WriteBuffer = int(wbs)
+	}
+
+	bs, ok := config["block_size"].(float64)
+	if ok {
+		o.BlockSize = int(bs)
+	}
+
+	bri, ok := config["block_restart_interval"].(float64)
+	if ok {
+		o.BlockRestartInterval = int(bri)
+	}
+
+	lcc, ok := config["lru_cache_capacity"].(float64)
+	if ok {
+		o.BlockCacheCapacity = int(lcc)
+	}
+
+	bfbpk, ok := config["bloom_filter_bits_per_key"].(float64)
+	if ok {
+		o.Filter = filter.NewBloomFilter(int(bfbpk))
+	}
+
+	return o, nil
+}
diff --git a/index/store/goleveldb/store_test.go b/index/store/goleveldb/store_test.go
new file mode 100644
index 00000000..d2dc2631
--- /dev/null
+++ b/index/store/goleveldb/store_test.go
@@ -0,0 +1,226 @@
+// Copyright (c) 2014 Couchbase, Inc.
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+// except in compliance with the License.
You may obtain a copy of the License at
+// http://www.apache.org/licenses/LICENSE-2.0
+// Unless required by applicable law or agreed to in writing, software distributed under the
+// License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+// either express or implied. See the License for the specific language governing permissions
+// and limitations under the License.
+
+package goleveldb
+
+import (
+	"os"
+	"reflect"
+	"testing"
+
+	"github.com/blevesearch/bleve/index/store"
+)
+
+var leveldbTestOptions = map[string]interface{}{
+	"create_if_missing": true,
+}
+
+func TestLevelDBStore(t *testing.T) {
+	defer os.RemoveAll("test")
+
+	s, err := Open("test", leveldbTestOptions)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer s.Close()
+
+	CommonTestKVStore(t, s)
+}
+
+func TestReaderIsolation(t *testing.T) {
+	defer os.RemoveAll("test")
+
+	s, err := Open("test", leveldbTestOptions)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer s.Close()
+
+	CommonTestReaderIsolation(t, s)
+}
+
+// CommonTestKVStore writes through a writer and a batch, then checks what
+// an iterator obtained from a fresh reader returns for those keys.
+func CommonTestKVStore(t *testing.T, s store.KVStore) {
+	writer, err := s.Writer()
+	if err != nil {
+		t.Fatal(err) // a nil writer cannot be used below
+	}
+	err = writer.Set([]byte("a"), []byte("val-a"))
+	if err != nil {
+		t.Fatal(err)
+	}
+	err = writer.Set([]byte("z"), []byte("val-z"))
+	if err != nil {
+		t.Fatal(err)
+	}
+	err = writer.Delete([]byte("z"))
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	batch := writer.NewBatch()
+	batch.Set([]byte("b"), []byte("val-b"))
+	batch.Set([]byte("c"), []byte("val-c"))
+	batch.Set([]byte("d"), []byte("val-d"))
+	batch.Set([]byte("e"), []byte("val-e"))
+	batch.Set([]byte("f"), []byte("val-f"))
+	batch.Set([]byte("g"), []byte("val-g"))
+	batch.Set([]byte("h"), []byte("val-h"))
+	batch.Set([]byte("i"), []byte("val-i"))
+	batch.Set([]byte("j"), []byte("val-j"))
+
+	err = batch.Execute()
+	if err != nil {
+		t.Fatal(err)
+	}
+	writer.Close()
+
+	reader, err := s.Reader()
+	if err != nil {
+		t.Fatal(err) // a nil reader cannot be used below
+	}
+	defer reader.Close()
+	it := reader.Iterator([]byte("b"))
+	defer it.Close() // deferred so the Fatalf paths below also close it
+	key, val, valid := it.Current()
+	if !valid {
+		t.Fatalf("valid false, expected true")
+	}
+	if string(key) != "b" {
+		t.Fatalf("expected key b, got %s", key)
+	}
+	if string(val) != "val-b" {
+		t.Fatalf("expected value val-b, got %s", val)
+	}
+
+	it.Next()
+	key, val, valid = it.Current()
+	if !valid {
+		t.Fatalf("valid false, expected true")
+	}
+	if string(key) != "c" {
+		t.Fatalf("expected key c, got %s", key)
+	}
+	if string(val) != "val-c" {
+		t.Fatalf("expected value val-c, got %s", val)
+	}
+
+	it.Seek([]byte("i"))
+	key, val, valid = it.Current()
+	if !valid {
+		t.Fatalf("valid false, expected true")
+	}
+	if string(key) != "i" {
+		t.Fatalf("expected key i, got %s", key)
+	}
+	if string(val) != "val-i" {
+		t.Fatalf("expected value val-i, got %s", val)
+	}
+}
+
+// CommonTestReaderIsolation checks that a reader does not see later writes.
+func CommonTestReaderIsolation(t *testing.T, s store.KVStore) {
+	// insert a kv pair
+	writer, err := s.Writer()
+	if err != nil {
+		t.Fatal(err)
+	}
+	err = writer.Set([]byte("a"), []byte("val-a"))
+	if err != nil {
+		t.Fatal(err)
+	}
+	writer.Close()
+
+	// create an isolated reader
+	reader, err := s.Reader()
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer reader.Close()
+
+	// verify that we see the value already inserted
+	val, err := reader.Get([]byte("a"))
+	if err != nil {
+		t.Error(err)
+	}
+	if !reflect.DeepEqual(val, []byte("val-a")) {
+		t.Errorf("expected val-a, got %s", val)
+	}
+
+	// verify that an iterator sees it
+	count := 0
+	it := reader.Iterator([]byte{0})
+	defer it.Close()
+	for it.Valid() {
+		it.Next()
+		count++
+	}
+	if count != 1 {
+		t.Errorf("expected iterator to see 1, saw %d", count)
+	}
+
+	// add something after the reader was created
+	writer, err = s.Writer()
+	if err != nil {
+		t.Fatal(err)
+	}
+	err = writer.Set([]byte("b"), []byte("val-b"))
+	if err != nil {
+		t.Fatal(err)
+	}
+	writer.Close()
+
+	// ensure that a newer reader sees it
+	newReader, err := s.Reader()
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer newReader.Close()
+	val, err = newReader.Get([]byte("b"))
+	if err != nil {
+		t.Error(err)
+	}
+	if !reflect.DeepEqual(val, []byte("val-b")) {
+		t.Errorf("expected val-b, got %s", val)
+	}
+
+	// ensure that an iterator on the newer reader sees it
+	count = 0
+	it = newReader.Iterator([]byte{0})
+	defer it.Close()
+	for it.Valid() {
+		it.Next()
+		count++
+	}
+	if count != 2 {
+		t.Errorf("expected iterator to see 2, saw %d", count)
+	}
+
+	// but that the isolated reader does not
+	val, err = reader.Get([]byte("b"))
+	if err != nil && err.Error() != "leveldb: not found" {
+		t.Error(err)
+	}
+	if val != nil {
+		t.Errorf("expected nil, got %v", val)
+	}
+
+	// and ensure that the iterator on the isolated reader also does not
+	count = 0
+	it = reader.Iterator([]byte{0})
+	defer it.Close()
+	for it.Valid() {
+		it.Next()
+		count++
+	}
+	if count != 1 {
+		t.Errorf("expected iterator to see 1, saw %d", count)
+	}
+}
diff --git a/index/store/goleveldb/util.go b/index/store/goleveldb/util.go
new file mode 100644
index 00000000..4e22808b
--- /dev/null
+++ b/index/store/goleveldb/util.go
@@ -0,0 +1,26 @@
+// Copyright (c) 2014 Couchbase, Inc.
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+// except in compliance with the License. You may obtain a copy of the License at
+// http://www.apache.org/licenses/LICENSE-2.0
+// Unless required by applicable law or agreed to in writing, software distributed under the
+// License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+// either express or implied. See the License for the specific language governing permissions
+// and limitations under the License.
+
+package goleveldb
+
+import (
+	"github.com/syndtr/goleveldb/leveldb/opt"
+)
+
+// defaultWriteOptions returns the write options used for every write.
+func defaultWriteOptions() *opt.WriteOptions {
+	// request fsync on write for safety
+	wopts := &opt.WriteOptions{Sync: true}
+	return wopts
+}
+
+// defaultReadOptions returns zero-value read options.
+func defaultReadOptions() *opt.ReadOptions {
+	return &opt.ReadOptions{}
+}
diff --git a/index/store/goleveldb/writer.go b/index/store/goleveldb/writer.go
new file mode 100644
index 00000000..946c1187
--- /dev/null
+++ b/index/store/goleveldb/writer.go
@@ -0,0 +1,61 @@
+// Copyright (c) 2014 Couchbase, Inc.
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+// except in compliance with the License. You may obtain a copy of the License at
+// http://www.apache.org/licenses/LICENSE-2.0
+// Unless required by applicable law or agreed to in writing, software distributed under the
+// License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+// either express or implied. See the License for the specific language governing permissions
+// and limitations under the License.
+
+package goleveldb
+
+import (
+	"github.com/blevesearch/bleve/index/store"
+)
+
+// Writer gives exclusive write access to a Store. It holds the store's
+// writer lock from creation until Close.
+type Writer struct {
+	store *Store
+}
+
+// newWriter blocks until the store's writer lock is acquired and returns
+// a Writer that owns it. The returned error is always nil.
+func newWriter(store *Store) (*Writer, error) {
+	w := &Writer{store: store}
+	w.store.writer.Lock()
+	return w, nil
+}
+
+// Set stores val under key; the lock is already held by the Writer.
+func (w *Writer) Set(key, val []byte) error {
+	return w.store.setlocked(key, val)
+}
+
+// Delete removes key; the lock is already held by the Writer.
+func (w *Writer) Delete(key []byte) error {
+	return w.store.deletelocked(key)
+}
+
+// NewBatch returns a batch that executes under the Writer's lock.
+func (w *Writer) NewBatch() store.KVBatch {
+	return newBatchAlreadyLocked(w.store)
+}
+
+// Close releases the store's writer lock taken by newWriter.
+func (w *Writer) Close() error {
+	w.store.writer.Unlock()
+	return nil
+}
+
+// Get reads key with the store's regular, non-snapshot read. No read
+// transaction is needed because this Writer is the only one writing.
+func (w *Writer) Get(key []byte) ([]byte, error) {
+	return w.store.get(key)
+}
+
+// Iterator returns a regular, non-snapshot iterator positioned by seeking
+// to key; as with Get, the Writer is the only one writing.
+func (w *Writer) Iterator(key []byte) store.KVIterator {
+	return w.store.iterator(key)
+}