adding initial impl of rocksdb kv store
This commit is contained in:
parent
a9c07acbfa
commit
452fea6a24
16
config_rocksdb.go
Normal file
16
config_rocksdb.go
Normal file
@ -0,0 +1,16 @@
|
|||||||
|
// Copyright (c) 2014 Couchbase, Inc.
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
|
||||||
|
// except in compliance with the License. You may obtain a copy of the License at
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
// Unless required by applicable law or agreed to in writing, software distributed under the
|
||||||
|
// License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
|
||||||
|
// either express or implied. See the License for the specific language governing permissions
|
||||||
|
// and limitations under the License.
|
||||||
|
|
||||||
|
// +build rocksdb
|
||||||
|
|
||||||
|
package bleve
|
||||||
|
|
||||||
|
import (
|
||||||
|
_ "github.com/blevesearch/bleve/index/store/gorocksdb"
|
||||||
|
)
|
43
index/store/gorocksdb/batch.go
Normal file
43
index/store/gorocksdb/batch.go
Normal file
@ -0,0 +1,43 @@
|
|||||||
|
// Copyright (c) 2014 Couchbase, Inc.
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
|
||||||
|
// except in compliance with the License. You may obtain a copy of the License at
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
// Unless required by applicable law or agreed to in writing, software distributed under the
|
||||||
|
// License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
|
||||||
|
// either express or implied. See the License for the specific language governing permissions
|
||||||
|
// and limitations under the License.
|
||||||
|
|
||||||
|
// +build rocksdb
|
||||||
|
|
||||||
|
package rocksdb
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/tecbot/gorocksdb"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Batch struct {
|
||||||
|
w *Writer
|
||||||
|
batch *gorocksdb.WriteBatch
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *Batch) Set(key, val []byte) {
|
||||||
|
b.batch.Put(key, val)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *Batch) Delete(key []byte) {
|
||||||
|
b.batch.Delete(key)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *Batch) Merge(key, val []byte) {
|
||||||
|
b.batch.Merge(key, val)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *Batch) Execute() error {
|
||||||
|
wopts := defaultWriteOptions()
|
||||||
|
err := b.w.store.db.Write(wopts, b.batch)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *Batch) Close() error {
|
||||||
|
return nil
|
||||||
|
}
|
76
index/store/gorocksdb/iterator.go
Normal file
76
index/store/gorocksdb/iterator.go
Normal file
@ -0,0 +1,76 @@
|
|||||||
|
// Copyright (c) 2014 Couchbase, Inc.
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
|
||||||
|
// except in compliance with the License. You may obtain a copy of the License at
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
// Unless required by applicable law or agreed to in writing, software distributed under the
|
||||||
|
// License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
|
||||||
|
// either express or implied. See the License for the specific language governing permissions
|
||||||
|
// and limitations under the License.
|
||||||
|
|
||||||
|
// +build rocksdb
|
||||||
|
|
||||||
|
package rocksdb
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/tecbot/gorocksdb"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Iterator struct {
|
||||||
|
store *Store
|
||||||
|
iterator *gorocksdb.Iterator
|
||||||
|
}
|
||||||
|
|
||||||
|
func newIterator(store *Store) *Iterator {
|
||||||
|
ropts := defaultReadOptions()
|
||||||
|
rv := Iterator{
|
||||||
|
store: store,
|
||||||
|
iterator: store.db.NewIterator(ropts),
|
||||||
|
}
|
||||||
|
return &rv
|
||||||
|
}
|
||||||
|
|
||||||
|
func newIteratorWithSnapshot(store *Store, snapshot *gorocksdb.Snapshot) *Iterator {
|
||||||
|
options := defaultReadOptions()
|
||||||
|
options.SetSnapshot(snapshot)
|
||||||
|
rv := Iterator{
|
||||||
|
store: store,
|
||||||
|
iterator: store.db.NewIterator(options),
|
||||||
|
}
|
||||||
|
return &rv
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ldi *Iterator) SeekFirst() {
|
||||||
|
ldi.iterator.SeekToFirst()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ldi *Iterator) Seek(key []byte) {
|
||||||
|
ldi.iterator.Seek(key)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ldi *Iterator) Next() {
|
||||||
|
ldi.iterator.Next()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ldi *Iterator) Current() ([]byte, []byte, bool) {
|
||||||
|
if ldi.Valid() {
|
||||||
|
return ldi.Key(), ldi.Value(), true
|
||||||
|
}
|
||||||
|
return nil, nil, false
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ldi *Iterator) Key() []byte {
|
||||||
|
return ldi.iterator.Key().Data()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ldi *Iterator) Value() []byte {
|
||||||
|
return ldi.iterator.Value().Data()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ldi *Iterator) Valid() bool {
|
||||||
|
return ldi.iterator.Valid()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ldi *Iterator) Close() error {
|
||||||
|
ldi.iterator.Close()
|
||||||
|
return nil
|
||||||
|
}
|
48
index/store/gorocksdb/reader.go
Normal file
48
index/store/gorocksdb/reader.go
Normal file
@ -0,0 +1,48 @@
|
|||||||
|
// Copyright (c) 2014 Couchbase, Inc.
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
|
||||||
|
// except in compliance with the License. You may obtain a copy of the License at
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
// Unless required by applicable law or agreed to in writing, software distributed under the
|
||||||
|
// License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
|
||||||
|
// either express or implied. See the License for the specific language governing permissions
|
||||||
|
// and limitations under the License.
|
||||||
|
|
||||||
|
// +build rocksdb
|
||||||
|
|
||||||
|
package rocksdb
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/blevesearch/bleve/index/store"
|
||||||
|
"github.com/tecbot/gorocksdb"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Reader struct {
|
||||||
|
store *Store
|
||||||
|
snapshot *gorocksdb.Snapshot
|
||||||
|
}
|
||||||
|
|
||||||
|
func newReader(store *Store) (*Reader, error) {
|
||||||
|
return &Reader{
|
||||||
|
store: store,
|
||||||
|
snapshot: store.db.NewSnapshot(),
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *Reader) BytesSafeAfterClose() bool {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *Reader) Get(key []byte) ([]byte, error) {
|
||||||
|
return r.store.getWithSnapshot(key, r.snapshot)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *Reader) Iterator(key []byte) store.KVIterator {
|
||||||
|
rv := newIteratorWithSnapshot(r.store, r.snapshot)
|
||||||
|
rv.Seek(key)
|
||||||
|
return rv
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *Reader) Close() error {
|
||||||
|
r.snapshot.Release()
|
||||||
|
return nil
|
||||||
|
}
|
146
index/store/gorocksdb/store.go
Normal file
146
index/store/gorocksdb/store.go
Normal file
@ -0,0 +1,146 @@
|
|||||||
|
// Copyright (c) 2014 Couchbase, Inc.
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
|
||||||
|
// except in compliance with the License. You may obtain a copy of the License at
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
// Unless required by applicable law or agreed to in writing, software distributed under the
|
||||||
|
// License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
|
||||||
|
// either express or implied. See the License for the specific language governing permissions
|
||||||
|
// and limitations under the License.
|
||||||
|
|
||||||
|
// +build rocksdb
|
||||||
|
|
||||||
|
package rocksdb
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/blevesearch/bleve/index/store"
|
||||||
|
"github.com/blevesearch/bleve/registry"
|
||||||
|
"github.com/tecbot/gorocksdb"
|
||||||
|
)
|
||||||
|
|
||||||
|
const Name = "rocksdb"
|
||||||
|
|
||||||
|
type Store struct {
|
||||||
|
path string
|
||||||
|
opts *gorocksdb.Options
|
||||||
|
db *gorocksdb.DB
|
||||||
|
writer sync.Mutex
|
||||||
|
}
|
||||||
|
|
||||||
|
func New(path string, config map[string]interface{}) (*Store, error) {
|
||||||
|
rv := Store{
|
||||||
|
path: path,
|
||||||
|
opts: gorocksdb.NewDefaultOptions(),
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err := applyConfig(rv.opts, config)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &rv, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ldbs *Store) Open() error {
|
||||||
|
var err error
|
||||||
|
ldbs.db, err = gorocksdb.OpenDb(ldbs.opts, ldbs.path)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ldbs *Store) SetMergeOperator(mo store.MergeOperator) {
|
||||||
|
ldbs.opts.SetMergeOperator(mo)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ldbs *Store) get(key []byte) ([]byte, error) {
|
||||||
|
options := defaultReadOptions()
|
||||||
|
b, err := ldbs.db.Get(options, key)
|
||||||
|
return b.Data(), err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ldbs *Store) getWithSnapshot(key []byte, snapshot *gorocksdb.Snapshot) ([]byte, error) {
|
||||||
|
options := defaultReadOptions()
|
||||||
|
options.SetSnapshot(snapshot)
|
||||||
|
b, err := ldbs.db.Get(options, key)
|
||||||
|
return b.Data(), err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ldbs *Store) set(key, val []byte) error {
|
||||||
|
ldbs.writer.Lock()
|
||||||
|
defer ldbs.writer.Unlock()
|
||||||
|
return ldbs.setlocked(key, val)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ldbs *Store) setlocked(key, val []byte) error {
|
||||||
|
options := defaultWriteOptions()
|
||||||
|
err := ldbs.db.Put(options, key, val)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ldbs *Store) delete(key []byte) error {
|
||||||
|
ldbs.writer.Lock()
|
||||||
|
defer ldbs.writer.Unlock()
|
||||||
|
return ldbs.deletelocked(key)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ldbs *Store) deletelocked(key []byte) error {
|
||||||
|
options := defaultWriteOptions()
|
||||||
|
err := ldbs.db.Delete(options, key)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ldbs *Store) Close() error {
|
||||||
|
ldbs.db.Close()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ldbs *Store) iterator(key []byte) store.KVIterator {
|
||||||
|
rv := newIterator(ldbs)
|
||||||
|
rv.Seek(key)
|
||||||
|
return rv
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ldbs *Store) Reader() (store.KVReader, error) {
|
||||||
|
return newReader(ldbs)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ldbs *Store) Writer() (store.KVWriter, error) {
|
||||||
|
return newWriter(ldbs)
|
||||||
|
}
|
||||||
|
|
||||||
|
func StoreConstructor(config map[string]interface{}) (store.KVStore, error) {
|
||||||
|
path, ok := config["path"].(string)
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("must specify path")
|
||||||
|
}
|
||||||
|
return New(path, config)
|
||||||
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
registry.RegisterKVStore(Name, StoreConstructor)
|
||||||
|
}
|
||||||
|
|
||||||
|
func applyConfig(o *gorocksdb.Options, config map[string]interface{}) (
|
||||||
|
*gorocksdb.Options, error) {
|
||||||
|
|
||||||
|
cim, ok := config["create_if_missing"].(bool)
|
||||||
|
if ok {
|
||||||
|
o.SetCreateIfMissing(cim)
|
||||||
|
}
|
||||||
|
|
||||||
|
eie, ok := config["error_if_exists"].(bool)
|
||||||
|
if ok {
|
||||||
|
o.SetErrorIfExists(eie)
|
||||||
|
}
|
||||||
|
|
||||||
|
wbs, ok := config["write_buffer_size"].(float64)
|
||||||
|
if ok {
|
||||||
|
o.SetWriteBufferSize(int(wbs))
|
||||||
|
}
|
||||||
|
|
||||||
|
return o, nil
|
||||||
|
}
|
298
index/store/gorocksdb/store_test.go
Normal file
298
index/store/gorocksdb/store_test.go
Normal file
@ -0,0 +1,298 @@
|
|||||||
|
// Copyright (c) 2014 Couchbase, Inc.
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
|
||||||
|
// except in compliance with the License. You may obtain a copy of the License at
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
// Unless required by applicable law or agreed to in writing, software distributed under the
|
||||||
|
// License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
|
||||||
|
// either express or implied. See the License for the specific language governing permissions
|
||||||
|
// and limitations under the License.
|
||||||
|
|
||||||
|
// +build rocksdb
|
||||||
|
|
||||||
|
package rocksdb
|
||||||
|
|
||||||
|
import (
|
||||||
|
"os"
|
||||||
|
"reflect"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/blevesearch/bleve/index/store"
|
||||||
|
)
|
||||||
|
|
||||||
|
var rocksdbTestOptions = map[string]interface{}{
|
||||||
|
"create_if_missing": true,
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGoRocksDBStore(t *testing.T) {
|
||||||
|
defer func() {
|
||||||
|
err := os.RemoveAll("test")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
s, err := New("test", rocksdbTestOptions)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
err = s.Open()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
err := s.Close()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
CommonTestKVStore(t, s)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestReaderIsolation(t *testing.T) {
|
||||||
|
defer func() {
|
||||||
|
err := os.RemoveAll("test")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
s, err := New("test", rocksdbTestOptions)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
err = s.Open()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
err := s.Close()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
CommonTestReaderIsolation(t, s)
|
||||||
|
}
|
||||||
|
|
||||||
|
func CommonTestKVStore(t *testing.T, s store.KVStore) {
|
||||||
|
|
||||||
|
writer, err := s.Writer()
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
err = writer.Set([]byte("a"), []byte("val-a"))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
err = writer.Set([]byte("z"), []byte("val-z"))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
err = writer.Delete([]byte("z"))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
batch := writer.NewBatch()
|
||||||
|
batch.Set([]byte("b"), []byte("val-b"))
|
||||||
|
batch.Set([]byte("c"), []byte("val-c"))
|
||||||
|
batch.Set([]byte("d"), []byte("val-d"))
|
||||||
|
batch.Set([]byte("e"), []byte("val-e"))
|
||||||
|
batch.Set([]byte("f"), []byte("val-f"))
|
||||||
|
batch.Set([]byte("g"), []byte("val-g"))
|
||||||
|
batch.Set([]byte("h"), []byte("val-h"))
|
||||||
|
batch.Set([]byte("i"), []byte("val-i"))
|
||||||
|
batch.Set([]byte("j"), []byte("val-j"))
|
||||||
|
|
||||||
|
err = batch.Execute()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
err = writer.Close()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
reader, err := s.Reader()
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
err := reader.Close()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
it := reader.Iterator([]byte("b"))
|
||||||
|
key, val, valid := it.Current()
|
||||||
|
if !valid {
|
||||||
|
t.Fatalf("valid false, expected true")
|
||||||
|
}
|
||||||
|
if string(key) != "b" {
|
||||||
|
t.Fatalf("expected key b, got %s", key)
|
||||||
|
}
|
||||||
|
if string(val) != "val-b" {
|
||||||
|
t.Fatalf("expected value val-b, got %s", val)
|
||||||
|
}
|
||||||
|
|
||||||
|
it.Next()
|
||||||
|
key, val, valid = it.Current()
|
||||||
|
if !valid {
|
||||||
|
t.Fatalf("valid false, expected true")
|
||||||
|
}
|
||||||
|
if string(key) != "c" {
|
||||||
|
t.Fatalf("expected key c, got %s", key)
|
||||||
|
}
|
||||||
|
if string(val) != "val-c" {
|
||||||
|
t.Fatalf("expected value val-c, got %s", val)
|
||||||
|
}
|
||||||
|
|
||||||
|
it.Seek([]byte("i"))
|
||||||
|
key, val, valid = it.Current()
|
||||||
|
if !valid {
|
||||||
|
t.Fatalf("valid false, expected true")
|
||||||
|
}
|
||||||
|
if string(key) != "i" {
|
||||||
|
t.Fatalf("expected key i, got %s", key)
|
||||||
|
}
|
||||||
|
if string(val) != "val-i" {
|
||||||
|
t.Fatalf("expected value val-i, got %s", val)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = it.Close()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func CommonTestReaderIsolation(t *testing.T, s store.KVStore) {
|
||||||
|
// insert a kv pair
|
||||||
|
writer, err := s.Writer()
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
err = writer.Set([]byte("a"), []byte("val-a"))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
err = writer.Close()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// create an isolated reader
|
||||||
|
reader, err := s.Reader()
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
err := reader.Close()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// verify that we see the value already inserted
|
||||||
|
val, err := reader.Get([]byte("a"))
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(val, []byte("val-a")) {
|
||||||
|
t.Errorf("expected val-a, got nil")
|
||||||
|
}
|
||||||
|
|
||||||
|
// verify that an iterator sees it
|
||||||
|
count := 0
|
||||||
|
it := reader.Iterator([]byte{0})
|
||||||
|
defer func() {
|
||||||
|
err := it.Close()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
for it.Valid() {
|
||||||
|
it.Next()
|
||||||
|
count++
|
||||||
|
}
|
||||||
|
if count != 1 {
|
||||||
|
t.Errorf("expected iterator to see 1, saw %d", count)
|
||||||
|
}
|
||||||
|
|
||||||
|
// add something after the reader was created
|
||||||
|
writer, err = s.Writer()
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
err = writer.Set([]byte("b"), []byte("val-b"))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
err = writer.Close()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ensure that a newer reader sees it
|
||||||
|
newReader, err := s.Reader()
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
err := newReader.Close()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
val, err = newReader.Get([]byte("b"))
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(val, []byte("val-b")) {
|
||||||
|
t.Errorf("expected val-b, got nil")
|
||||||
|
}
|
||||||
|
|
||||||
|
// ensure that the director iterator sees it
|
||||||
|
count = 0
|
||||||
|
it2 := newReader.Iterator([]byte{0})
|
||||||
|
defer func() {
|
||||||
|
err := it2.Close()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
for it2.Valid() {
|
||||||
|
it2.Next()
|
||||||
|
count++
|
||||||
|
}
|
||||||
|
if count != 2 {
|
||||||
|
t.Errorf("expected iterator to see 2, saw %d", count)
|
||||||
|
}
|
||||||
|
|
||||||
|
// but that the isolated reader does not
|
||||||
|
val, err = reader.Get([]byte("b"))
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
if val != nil {
|
||||||
|
t.Errorf("expected nil, got %v", val)
|
||||||
|
}
|
||||||
|
|
||||||
|
// and ensure that the iterator on the isolated reader also does not
|
||||||
|
count = 0
|
||||||
|
it3 := reader.Iterator([]byte{0})
|
||||||
|
defer func() {
|
||||||
|
err := it3.Close()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
for it3.Valid() {
|
||||||
|
it3.Next()
|
||||||
|
count++
|
||||||
|
}
|
||||||
|
if count != 1 {
|
||||||
|
t.Errorf("expected iterator to see 1, saw %d", count)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
28
index/store/gorocksdb/util.go
Normal file
28
index/store/gorocksdb/util.go
Normal file
@ -0,0 +1,28 @@
|
|||||||
|
// Copyright (c) 2014 Couchbase, Inc.
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
|
||||||
|
// except in compliance with the License. You may obtain a copy of the License at
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
// Unless required by applicable law or agreed to in writing, software distributed under the
|
||||||
|
// License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
|
||||||
|
// either express or implied. See the License for the specific language governing permissions
|
||||||
|
// and limitations under the License.
|
||||||
|
|
||||||
|
// +build rocksdb
|
||||||
|
|
||||||
|
package rocksdb
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/tecbot/gorocksdb"
|
||||||
|
)
|
||||||
|
|
||||||
|
func defaultWriteOptions() *gorocksdb.WriteOptions {
|
||||||
|
wo := gorocksdb.NewDefaultWriteOptions()
|
||||||
|
// request fsync on write for safety
|
||||||
|
wo.SetSync(true)
|
||||||
|
return wo
|
||||||
|
}
|
||||||
|
|
||||||
|
func defaultReadOptions() *gorocksdb.ReadOptions {
|
||||||
|
ro := gorocksdb.NewDefaultReadOptions()
|
||||||
|
return ro
|
||||||
|
}
|
64
index/store/gorocksdb/writer.go
Normal file
64
index/store/gorocksdb/writer.go
Normal file
@ -0,0 +1,64 @@
|
|||||||
|
// Copyright (c) 2014 Couchbase, Inc.
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
|
||||||
|
// except in compliance with the License. You may obtain a copy of the License at
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
// Unless required by applicable law or agreed to in writing, software distributed under the
|
||||||
|
// License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
|
||||||
|
// either express or implied. See the License for the specific language governing permissions
|
||||||
|
// and limitations under the License.
|
||||||
|
|
||||||
|
// +build rocksdb
|
||||||
|
|
||||||
|
package rocksdb
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/blevesearch/bleve/index/store"
|
||||||
|
"github.com/tecbot/gorocksdb"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Writer struct {
|
||||||
|
store *Store
|
||||||
|
}
|
||||||
|
|
||||||
|
func newWriter(store *Store) (*Writer, error) {
|
||||||
|
store.writer.Lock()
|
||||||
|
return &Writer{
|
||||||
|
store: store,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *Writer) BytesSafeAfterClose() bool {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *Writer) Set(key, val []byte) error {
|
||||||
|
return w.store.setlocked(key, val)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *Writer) Delete(key []byte) error {
|
||||||
|
return w.store.deletelocked(key)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *Writer) NewBatch() store.KVBatch {
|
||||||
|
rv := Batch{
|
||||||
|
w: w,
|
||||||
|
batch: gorocksdb.NewWriteBatch(),
|
||||||
|
}
|
||||||
|
return &rv
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *Writer) Close() error {
|
||||||
|
w.store.writer.Unlock()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// these two methods can safely read using the regular
|
||||||
|
// methods without a read transaction, because we know
|
||||||
|
// that no one else is writing but us
|
||||||
|
func (w *Writer) Get(key []byte) ([]byte, error) {
|
||||||
|
return w.store.get(key)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *Writer) Iterator(key []byte) store.KVIterator {
|
||||||
|
return w.store.iterator(key)
|
||||||
|
}
|
82
index/upside_down/benchmark_gorocksdb_test.go
Normal file
82
index/upside_down/benchmark_gorocksdb_test.go
Normal file
@ -0,0 +1,82 @@
|
|||||||
|
// Copyright (c) 2014 Couchbase, Inc.
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
|
||||||
|
// except in compliance with the License. You may obtain a copy of the License at
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
// Unless required by applicable law or agreed to in writing, software distributed under the
|
||||||
|
// License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
|
||||||
|
// either express or implied. See the License for the specific language governing permissions
|
||||||
|
// and limitations under the License.
|
||||||
|
|
||||||
|
// +build rocksdb
|
||||||
|
|
||||||
|
package upside_down
|
||||||
|
|
||||||
|
import (
|
||||||
|
"os"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/blevesearch/bleve/index/store"
|
||||||
|
"github.com/blevesearch/bleve/index/store/gorocksdb"
|
||||||
|
)
|
||||||
|
|
||||||
|
var rocksdbTestOptions = map[string]interface{}{
|
||||||
|
"create_if_missing": true,
|
||||||
|
}
|
||||||
|
|
||||||
|
func CreateGoRocksDB() (store.KVStore, error) {
|
||||||
|
return rocksdb.New("test", rocksdbTestOptions)
|
||||||
|
}
|
||||||
|
|
||||||
|
func DestroyGoRocksDB() error {
|
||||||
|
return os.RemoveAll("test")
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkRocksDBIndexing1Workers(b *testing.B) {
|
||||||
|
CommonBenchmarkIndex(b, CreateGoRocksDB, DestroyGoRocksDB, 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkRocksDBIndexing2Workers(b *testing.B) {
|
||||||
|
CommonBenchmarkIndex(b, CreateGoRocksDB, DestroyGoRocksDB, 2)
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkRocksDBIndexing4Workers(b *testing.B) {
|
||||||
|
CommonBenchmarkIndex(b, CreateGoRocksDB, DestroyGoRocksDB, 4)
|
||||||
|
}
|
||||||
|
|
||||||
|
// batches
|
||||||
|
|
||||||
|
func BenchmarkRocksDBIndexing1Workers10Batch(b *testing.B) {
|
||||||
|
CommonBenchmarkIndexBatch(b, CreateGoRocksDB, DestroyGoRocksDB, 1, 10)
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkRocksDBIndexing2Workers10Batch(b *testing.B) {
|
||||||
|
CommonBenchmarkIndexBatch(b, CreateGoRocksDB, DestroyGoRocksDB, 2, 10)
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkRocksDBIndexing4Workers10Batch(b *testing.B) {
|
||||||
|
CommonBenchmarkIndexBatch(b, CreateGoRocksDB, DestroyGoRocksDB, 4, 10)
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkRocksDBIndexing1Workers100Batch(b *testing.B) {
|
||||||
|
CommonBenchmarkIndexBatch(b, CreateGoRocksDB, DestroyGoRocksDB, 1, 100)
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkRocksDBIndexing2Workers100Batch(b *testing.B) {
|
||||||
|
CommonBenchmarkIndexBatch(b, CreateGoRocksDB, DestroyGoRocksDB, 2, 100)
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkRocksDBIndexing4Workers100Batch(b *testing.B) {
|
||||||
|
CommonBenchmarkIndexBatch(b, CreateGoRocksDB, DestroyGoRocksDB, 4, 100)
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkRocksDBIndexing1Workers1000Batch(b *testing.B) {
|
||||||
|
CommonBenchmarkIndexBatch(b, CreateGoRocksDB, DestroyGoRocksDB, 1, 1000)
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkRocksDBIndexing2Workers1000Batch(b *testing.B) {
|
||||||
|
CommonBenchmarkIndexBatch(b, CreateGoRocksDB, DestroyGoRocksDB, 2, 1000)
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkRocksDBIndexing4Workers1000Batch(b *testing.B) {
|
||||||
|
CommonBenchmarkIndexBatch(b, CreateGoRocksDB, DestroyGoRocksDB, 4, 1000)
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user