0
0
Fork 0

refactor of kvstore api to support native merge in rocksdb

refactor to share code in emulated batch
refactor to share code in emulated merge
refactor index kvstore benchmarks to share more code
refactor index kvstore benchmarks to be more repeatable
This commit is contained in:
Marty Schoch 2015-04-24 17:11:47 -04:00
parent 4ef34e1b28
commit a9c07acbfa
56 changed files with 1493 additions and 2451 deletions

View File

@ -118,7 +118,7 @@ type configuration struct {
DefaultHighlighter string
DefaultKVStore string
SlowSearchLogThreshold time.Duration
analysisQueue upside_down.AnalysisQueue
analysisQueue *upside_down.AnalysisQueue
}
func newConfiguration() *configuration {

69
index/store/batch.go Normal file
View File

@ -0,0 +1,69 @@
// Copyright (c) 2014 Couchbase, Inc.
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
// except in compliance with the License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed under the
// License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
// either express or implied. See the License for the specific language governing permissions
// and limitations under the License.
package store
// op is a single queued mutation: a Set when V is non-nil,
// a Delete when V is nil (see EmulatedBatch.Delete / Execute).
type op struct {
	K []byte
	V []byte
}

// EmulatedBatch collects Set/Delete/Merge operations and applies them
// through a KVWriter when Execute is called. It emulates batch support
// for KV stores that have no native batch primitive.
type EmulatedBatch struct {
	w     KVWriter
	ops   []*op
	merge *EmulatedMerge
}
// NewEmulatedBatch returns an empty batch that will apply its queued
// operations through w, resolving merges with mo.
func NewEmulatedBatch(w KVWriter, mo MergeOperator) *EmulatedBatch {
	rv := &EmulatedBatch{
		w:     w,
		merge: NewEmulatedMerge(mo),
	}
	// pre-size for a typical batch to limit append growth
	rv.ops = make([]*op, 0, 1000)
	return rv
}
// Set queues a put of val under key.
func (b *EmulatedBatch) Set(key, val []byte) {
	b.ops = append(b.ops, &op{key, val})
}

// Delete queues removal of key (recorded as an op with a nil value).
func (b *EmulatedBatch) Delete(key []byte) {
	b.ops = append(b.ops, &op{key, nil})
}

// Merge queues a merge of val into key; resolution is deferred until
// Execute runs the emulated merge.
func (b *EmulatedBatch) Merge(key, val []byte) {
	b.merge.Merge(key, val)
}
// Execute applies the batch: queued merges are resolved and written
// first, then the plain sets/deletes are applied in insertion order
// through the underlying KVWriter. It stops at the first error.
func (b *EmulatedBatch) Execute() error {
	// first process merges
	if err := b.merge.Execute(b.w); err != nil {
		return err
	}
	// now apply all the ops; a nil value marks a delete
	// (loop variable renamed from `op` to avoid shadowing the op type)
	for _, o := range b.ops {
		var err error
		if o.V != nil {
			err = b.w.Set(o.K, o.V)
		} else {
			err = b.w.Delete(o.K)
		}
		if err != nil {
			return err
		}
	}
	return nil
}
// Close is a no-op; the batch holds no resources of its own.
func (b *EmulatedBatch) Close() error {
	return nil
}

View File

@ -1,86 +0,0 @@
// Copyright (c) 2014 Couchbase, Inc.
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
// except in compliance with the License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed under the
// License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
// either express or implied. See the License for the specific language governing permissions
// and limitations under the License.
package boltdb
import (
"github.com/blevesearch/bleve/index/store"
)
// op records one queued mutation; a nil v denotes a delete.
type op struct {
	k []byte
	v []byte
}

// Batch accumulates sets, deletes and merge chains, all applied
// against the writer's bolt transaction when Execute is called.
type Batch struct {
	writer *Writer
	ops    []op
	merges map[string]store.AssociativeMergeChain
}

// Set queues a put of val under key.
func (i *Batch) Set(key, val []byte) {
	i.ops = append(i.ops, op{key, val})
}

// Delete queues removal of key.
func (i *Batch) Delete(key []byte) {
	i.ops = append(i.ops, op{key, nil})
}

// Merge appends oper to the merge chain for key; chains are resolved
// in Execute before the regular ops are applied.
func (i *Batch) Merge(key []byte, oper store.AssociativeMerge) {
	opers, ok := i.merges[string(key)]
	if !ok {
		opers = make(store.AssociativeMergeChain, 0, 1)
	}
	opers = append(opers, oper)
	i.merges[string(key)] = opers
}
// Execute resolves each merge chain against the current bucket value
// (a nil merge result deletes the key), then applies the queued
// sets/deletes. Merge order across keys is map-iteration order and
// therefore nondeterministic; ops are applied in insertion order.
// Runs inside the writer's existing bolt transaction.
func (i *Batch) Execute() error {
	b := i.writer.tx.Bucket([]byte(i.writer.store.bucket))
	// first process the merges
	for k, mc := range i.merges {
		val := b.Get([]byte(k))
		var err error
		val, err = mc.Merge([]byte(k), val)
		if err != nil {
			return err
		}
		if val == nil {
			// nil merge result means the key should be removed
			err := b.Delete([]byte(k))
			if err != nil {
				return err
			}
		} else {
			err := b.Put([]byte(k), val)
			if err != nil {
				return err
			}
		}
	}
	// now process the regular get/set ops
	for _, o := range i.ops {
		if o.v == nil {
			if err := b.Delete(o.k); err != nil {
				return err
			}
		} else {
			if err := b.Put(o.k, o.v); err != nil {
				return err
			}
		}
	}
	return nil
}

// Close is a no-op; the enclosing transaction is owned by the writer.
func (i *Batch) Close() error {
	return nil
}

View File

@ -25,30 +25,39 @@ type Store struct {
bucket string
db *bolt.DB
writer sync.Mutex
mo store.MergeOperator
}
func Open(path string, bucket string) (*Store, error) {
func New(path string, bucket string) *Store {
rv := Store{
path: path,
bucket: bucket,
}
return &rv
}
func (bs *Store) Open() error {
var err error
rv.db, err = bolt.Open(path, 0600, nil)
bs.db, err = bolt.Open(bs.path, 0600, nil)
if err != nil {
return nil, err
return err
}
err = rv.db.Update(func(tx *bolt.Tx) error {
_, err := tx.CreateBucketIfNotExists([]byte(rv.bucket))
err = bs.db.Update(func(tx *bolt.Tx) error {
_, err := tx.CreateBucketIfNotExists([]byte(bs.bucket))
return err
})
if err != nil {
return nil, err
return err
}
return &rv, nil
return nil
}
func (bs *Store) SetMergeOperator(mo store.MergeOperator) {
bs.mo = mo
}
func (bs *Store) Close() error {
@ -95,7 +104,7 @@ func StoreConstructor(config map[string]interface{}) (store.KVStore, error) {
bucket = "bleve"
}
return Open(path, bucket)
return New(path, bucket), nil
}
func init() {

View File

@ -18,7 +18,8 @@ import (
)
func TestStore(t *testing.T) {
s, err := Open("test", "bleve")
s := New("test", "bleve")
err := s.Open()
if err != nil {
t.Fatal(err)
}
@ -33,7 +34,8 @@ func TestStore(t *testing.T) {
}
func TestReaderIsolation(t *testing.T) {
s, err := Open("test", "bleve")
s := New("test", "bleve")
err := s.Open()
if err != nil {
t.Fatal(err)
}

View File

@ -29,12 +29,7 @@ func (w *Writer) Delete(key []byte) error {
}
func (w *Writer) NewBatch() store.KVBatch {
rv := Batch{
writer: w,
ops: make([]op, 0),
merges: make(map[string]store.AssociativeMergeChain),
}
return &rv
return store.NewEmulatedBatch(w, w.store.mo)
}
func (w *Writer) Close() error {

View File

@ -0,0 +1,88 @@
// Copyright (c) 2015 Couchbase, Inc.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the
// License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an "AS
// IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
// express or implied. See the License for the specific language
// governing permissions and limitations under the License.
// +build go1.4
package cznicb
import ()
// op holds one queued mutation; a nil v means delete.
type op struct {
	k []byte
	v []byte
}

// Batch queues sets, deletes and merge operands against a cznicb
// Store; everything is applied under the store lock in Execute.
type Batch struct {
	s      *Store
	ops    []op
	merges map[string][][]byte
}

// Set queues a put of v under k.
func (b *Batch) Set(k, v []byte) {
	b.ops = append(b.ops, op{k, v})
}

// Delete queues removal of k.
func (b *Batch) Delete(k []byte) {
	b.ops = append(b.ops, op{k, nil})
}
// Merge queues val to be merged into key. Consecutive operands for
// the same key are collapsed eagerly via the merge operator's
// PartialMerge when it succeeds; otherwise the operand is kept for
// FullMerge at Execute time.
func (b *Batch) Merge(key, val []byte) {
	existing := b.merges[string(key)]
	if len(existing) == 0 {
		// first operand for this key
		b.merges[string(key)] = [][]byte{val}
		return
	}
	prev := existing[len(existing)-1]
	if merged, partialOk := b.s.mo.PartialMerge(key, prev, val); partialOk {
		// collapse: the previous operand is replaced by the merged result
		existing[len(existing)-1] = merged
	} else {
		// cannot partially merge; keep the operand for later
		existing = append(existing, val)
	}
	b.merges[string(key)] = existing
}
// Execute applies the batch under the store's write lock. Merges are
// resolved first: for each key, t.Put invokes the callback with the
// existing value (nil if absent) and the result of FullMerge replaces
// it. Then the queued sets/deletes are applied. Always returns nil.
// NOTE(review): a FullMerge failure returns (nil, false), which
// silently skips the write for that key without reporting an error.
func (b *Batch) Execute() (err error) {
	b.s.m.Lock()
	defer b.s.m.Unlock()
	t := b.s.t
	for key, mergeOps := range b.merges {
		k := []byte(key)
		t.Put(k, func(oldV interface{}, exists bool) (newV interface{}, write bool) {
			// existing value, or nil when the key is absent
			ob := []byte(nil)
			if exists && oldV != nil {
				ob = oldV.([]byte)
			}
			mergedVal, fullMergeOk := b.s.mo.FullMerge(k, ob, mergeOps)
			if !fullMergeOk {
				return nil, false
			}
			return mergedVal, true
		})
	}
	// apply the plain ops; nil value means delete
	for _, op := range b.ops {
		if op.v != nil {
			t.Set(op.k, op.v)
		} else {
			t.Delete(op.k)
		}
	}
	return nil
}

// Close is a no-op.
func (b *Batch) Close() error {
	return nil
}

View File

@ -18,7 +18,7 @@ package cznicb
import (
"bytes"
"errors"
"fmt"
"sync"
"github.com/blevesearch/bleve/index/store"
@ -29,15 +29,21 @@ import (
const Name = "cznicb"
var iteratorDoneErr = errors.New("iteratorDoneErr") // A sentinel value.
const MAX_CONCURRENT_WRITERS = 1
func init() {
registry.RegisterKVStore(Name, StoreConstructor)
}
func StoreConstructor(config map[string]interface{}) (
store.KVStore, error) {
return &Store{t: b.TreeNew(itemCompare)}, nil
// StoreConstructor builds a cznicb Store with an empty tree and a
// writer-token channel pre-filled with MAX_CONCURRENT_WRITERS tokens.
// The config map is currently unused.
func StoreConstructor(config map[string]interface{}) (store.KVStore, error) {
	writers := make(chan bool, MAX_CONCURRENT_WRITERS)
	for len(writers) < cap(writers) {
		writers <- true
	}
	return &Store{
		t:                b.TreeNew(itemCompare),
		availableWriters: writers,
	}, nil
}
func itemCompare(a, b interface{}) int {
@ -45,209 +51,62 @@ func itemCompare(a, b interface{}) int {
}
type Store struct {
m sync.Mutex
t *b.Tree
availableWriters chan bool
m sync.RWMutex
t *b.Tree
mo store.MergeOperator
}
type Iterator struct { // Assuming that iterators are used single-threaded.
s *Store
e *b.Enumerator
currK interface{}
currV interface{}
currErr error
func (s *Store) Open() error {
return nil
}
type op struct {
k []byte
v []byte
func (s *Store) SetMergeOperator(mo store.MergeOperator) {
s.mo = mo
}
type Batch struct {
s *Store
ops []op
ms map[string]store.AssociativeMergeChain
// Reader returns a read handle over the live store (no snapshot).
func (s *Store) Reader() (store.KVReader, error) {
	return &Reader{s: s}, nil
}

// Writer obtains a writer token, blocking until one is available,
// and returns a KVWriter backed by this store.
func (s *Store) Writer() (store.KVWriter, error) {
	available, ok := <-s.availableWriters
	if !ok || !available {
		// channel closed or drained without a token
		return nil, fmt.Errorf("no available writers")
	}
	return &Writer{s: s, r: &Reader{s: s}}, nil
}

// Close is a no-op; the in-memory tree needs no teardown.
func (s *Store) Close() error {
	return nil
}
func (s *Store) Reader() (store.KVReader, error) {
return s, nil
}
func (s *Store) BytesSafeAfterClose() bool {
return false
}
func (s *Store) Writer() (store.KVWriter, error) {
return s, nil
}
func (s *Store) Get(k []byte) ([]byte, error) {
s.m.Lock()
func (s *Store) get(k []byte) ([]byte, error) {
s.m.RLock()
defer s.m.RUnlock()
v, ok := s.t.Get(k)
s.m.Unlock()
if !ok || v == nil {
return nil, nil
}
return v.([]byte), nil
}
func (s *Store) Iterator(k []byte) store.KVIterator {
func (s *Store) iterator(k []byte) store.KVIterator {
iter := &Iterator{s: s}
iter.Seek(k)
return iter
}
func (s *Store) Set(k, v []byte) (err error) {
func (s *Store) set(k, v []byte) (err error) {
s.m.Lock()
defer s.m.Unlock()
s.t.Set(k, v)
s.m.Unlock()
return nil
}
func (s *Store) Delete(k []byte) (err error) {
func (s *Store) delete(k []byte) (err error) {
s.m.Lock()
defer s.m.Unlock()
s.t.Delete(k)
s.m.Unlock()
return nil
}
func (s *Store) NewBatch() store.KVBatch {
return &Batch{
s: s,
ops: make([]op, 0, 1000),
ms: map[string]store.AssociativeMergeChain{},
}
}
func (w *Iterator) SeekFirst() {
w.currK = nil
w.currV = nil
w.currErr = nil
var err error
w.s.m.Lock()
w.e, err = w.s.t.SeekFirst()
w.s.m.Unlock()
if err != nil {
w.currK = nil
w.currV = nil
w.currErr = iteratorDoneErr
}
w.Next()
}
func (w *Iterator) Seek(k []byte) {
w.currK = nil
w.currV = nil
w.currErr = nil
w.s.m.Lock()
w.e, _ = w.s.t.Seek(k)
w.s.m.Unlock()
w.Next()
}
func (w *Iterator) Next() {
if w.currErr != nil {
w.currK = nil
w.currV = nil
w.currErr = iteratorDoneErr
return
}
w.s.m.Lock()
w.currK, w.currV, w.currErr = w.e.Next()
w.s.m.Unlock()
}
func (w *Iterator) Current() ([]byte, []byte, bool) {
if w.currErr == iteratorDoneErr ||
w.currK == nil ||
w.currV == nil {
return nil, nil, false
}
return w.currK.([]byte), w.currV.([]byte), true
}
func (w *Iterator) Key() []byte {
k, _, ok := w.Current()
if !ok {
return nil
}
return k
}
func (w *Iterator) Value() []byte {
_, v, ok := w.Current()
if !ok {
return nil
}
return v
}
func (w *Iterator) Valid() bool {
_, _, ok := w.Current()
return ok
}
func (w *Iterator) Close() error {
if w.e != nil {
w.e.Close()
}
w.e = nil
return nil
}
func (w *Batch) Set(k, v []byte) {
w.ops = append(w.ops, op{k, v})
}
func (w *Batch) Delete(k []byte) {
w.ops = append(w.ops, op{k, nil})
}
func (w *Batch) Merge(k []byte, oper store.AssociativeMerge) {
w.ms[string(k)] = append(w.ms[string(k)], oper)
}
func (w *Batch) Execute() (err error) {
w.s.m.Lock()
defer w.s.m.Unlock()
t := w.s.t
for key, mc := range w.ms {
k := []byte(key)
t.Put(k, func(oldV interface{}, exists bool) (newV interface{}, write bool) {
b := []byte(nil)
if exists && oldV != nil {
b = oldV.([]byte)
}
b, err := mc.Merge(k, b)
if err != nil {
return nil, false
}
return b, b != nil
})
}
for _, op := range w.ops {
if op.v != nil {
t.Set(op.k, op.v)
} else {
t.Delete(op.k)
}
}
return nil
}
func (w *Batch) Close() error {
return nil
}

View File

@ -0,0 +1,113 @@
// Copyright (c) 2015 Couchbase, Inc.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the
// License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an "AS
// IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
// express or implied. See the License for the specific language
// governing permissions and limitations under the License.
// +build go1.4
package cznicb
import (
"errors"
"github.com/cznic/b"
)
var iteratorDoneErr = errors.New("iteratorDoneErr") // A sentinel value.
// Iterator walks the cznicb tree in ascending key order via a
// b.Enumerator, caching the current entry in currK/currV/currErr.
type Iterator struct { // Assuming that iterators are used single-threaded.
	s       *Store
	e       *b.Enumerator
	currK   interface{}
	currV   interface{}
	currErr error
}

// SeekFirst positions the iterator at the smallest key and loads it
// via Next. On error (presumably an empty tree — confirm against
// cznic/b docs) the iterator is marked done.
func (i *Iterator) SeekFirst() {
	i.currK = nil
	i.currV = nil
	i.currErr = nil
	var err error
	i.s.m.RLock()
	i.e, err = i.s.t.SeekFirst()
	i.s.m.RUnlock() // cannot defer, must unlock before Next
	if err != nil {
		i.currK = nil
		i.currV = nil
		i.currErr = iteratorDoneErr
	}
	i.Next()
}

// Seek positions the iterator at the first key >= k and loads it via
// Next; a failed seek is ignored here and surfaces through Next.
func (i *Iterator) Seek(k []byte) {
	i.currK = nil
	i.currV = nil
	i.currErr = nil
	i.s.m.RLock()
	i.e, _ = i.s.t.Seek(k)
	i.s.m.RUnlock() // cannot defer, must unlock before Next
	i.Next()
}
// Next advances to the following entry under a read lock. Once an
// error is recorded the iterator stays done (currErr is pinned to
// the iteratorDoneErr sentinel) and the enumerator is not touched.
func (i *Iterator) Next() {
	if i.currErr != nil {
		i.currK = nil
		i.currV = nil
		i.currErr = iteratorDoneErr
		return
	}
	i.s.m.RLock()
	defer i.s.m.RUnlock()
	i.currK, i.currV, i.currErr = i.e.Next()
}
// Current returns the cached key/value pair; ok is false once the
// iterator is exhausted or before it has been positioned.
func (i *Iterator) Current() ([]byte, []byte, bool) {
	done := i.currErr == iteratorDoneErr || i.currK == nil || i.currV == nil
	if done {
		return nil, nil, false
	}
	return i.currK.([]byte), i.currV.([]byte), true
}

// Key returns the current key, or nil when the iterator is not valid.
func (i *Iterator) Key() []byte {
	if k, _, ok := i.Current(); ok {
		return k
	}
	return nil
}

// Value returns the current value, or nil when the iterator is not valid.
func (i *Iterator) Value() []byte {
	if _, v, ok := i.Current(); ok {
		return v
	}
	return nil
}

// Valid reports whether the iterator is positioned on an entry.
func (i *Iterator) Valid() bool {
	_, _, valid := i.Current()
	return valid
}

// Close releases the underlying enumerator, if any.
func (i *Iterator) Close() error {
	if e := i.e; e != nil {
		e.Close()
	}
	i.e = nil
	return nil
}

View File

@ -0,0 +1,44 @@
// Copyright (c) 2015 Couchbase, Inc.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the
// License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an "AS
// IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
// express or implied. See the License for the specific language
// governing permissions and limitations under the License.
// +build go1.4
package cznicb
import (
"github.com/blevesearch/bleve/index/store"
)
// Reader provides read access to a cznicb Store. Reads go directly
// against the live tree (guarded by the store's lock inside s.get /
// s.iterator), so there is no snapshot isolation.
type Reader struct {
	s *Store
}

// newReader wraps s in a Reader; the error return is always nil.
func newReader(s *Store) (*Reader, error) {
	return &Reader{
		s: s,
	}, nil
}

// BytesSafeAfterClose reports that returned byte slices are not
// guaranteed to remain valid after the reader is closed.
func (r *Reader) BytesSafeAfterClose() bool {
	return false
}

// Get returns the value stored under key, delegating to the store.
func (r *Reader) Get(key []byte) ([]byte, error) {
	return r.s.get(key)
}

// Iterator returns an iterator positioned at the first key >= key.
func (r *Reader) Iterator(key []byte) store.KVIterator {
	return r.s.iterator(key)
}

// Close is a no-op; the reader holds no resources.
func (r *Reader) Close() error {
	return nil
}

View File

@ -0,0 +1,55 @@
// Copyright (c) 2014 Couchbase, Inc.
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
// except in compliance with the License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed under the
// License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
// either express or implied. See the License for the specific language governing permissions
// and limitations under the License.
// +build go1.4
package cznicb
import (
"github.com/blevesearch/bleve/index/store"
)
// Writer is a write handle for a Store; it holds the writer token
// taken from s.availableWriters until Close returns it.
type Writer struct {
	s *Store
	r *Reader
}

// BytesSafeAfterClose reports that returned byte slices are not
// guaranteed to remain valid after close.
func (w *Writer) BytesSafeAfterClose() bool {
	return false
}

// Set writes key/val through the store.
func (w *Writer) Set(key, val []byte) error {
	return w.s.set(key, val)
}

// Delete removes key through the store.
func (w *Writer) Delete(key []byte) error {
	return w.s.delete(key)
}

// NewBatch returns an empty batch targeting this writer's store.
func (w *Writer) NewBatch() store.KVBatch {
	return &Batch{
		s:      w.s,
		ops:    make([]op, 0, 1000),
		merges: make(map[string][][]byte),
	}
}

// Close returns the writer token so another writer can proceed and
// drops the store reference.
// NOTE(review): Set/Delete use w.s (nil after Close) while
// Get/Iterator go through w.r — use-after-Close behaves differently
// per method; confirm intended.
func (w *Writer) Close() error {
	w.s.availableWriters <- true
	w.s = nil
	return nil
}

// Get reads key via the paired reader's store.
func (w *Writer) Get(key []byte) ([]byte, error) {
	return w.r.s.get(key)
}

// Iterator returns an iterator positioned at key via the paired
// reader's store.
func (w *Writer) Iterator(key []byte) store.KVIterator {
	return w.r.s.iterator(key)
}

View File

@ -12,7 +12,7 @@
package forestdb
import (
indexStore "github.com/blevesearch/bleve/index/store"
"fmt"
)
type op struct {
@ -21,81 +21,64 @@ type op struct {
}
type Batch struct {
store *Store
ops []op
alreadyLocked bool
merges map[string]indexStore.AssociativeMergeChain
s *Store
ops []op
merges map[string][][]byte
}
func newBatch(store *Store) *Batch {
rv := Batch{
store: store,
ops: make([]op, 0),
merges: make(map[string]indexStore.AssociativeMergeChain),
}
return &rv
func (b *Batch) Set(k, v []byte) {
b.ops = append(b.ops, op{k, v})
}
func newBatchAlreadyLocked(store *Store) *Batch {
rv := Batch{
store: store,
ops: make([]op, 0),
alreadyLocked: true,
merges: make(map[string]indexStore.AssociativeMergeChain),
}
return &rv
func (b *Batch) Delete(k []byte) {
b.ops = append(b.ops, op{k, nil})
}
func (b *Batch) Set(key, val []byte) {
b.ops = append(b.ops, op{key, val})
}
func (b *Batch) Delete(key []byte) {
b.ops = append(b.ops, op{key, nil})
}
func (b *Batch) Merge(key []byte, oper indexStore.AssociativeMerge) {
opers, ok := b.merges[string(key)]
if !ok {
opers = make(indexStore.AssociativeMergeChain, 0, 1)
}
opers = append(opers, oper)
b.merges[string(key)] = opers
}
func (b *Batch) Execute() error {
if !b.alreadyLocked {
b.store.writer.Lock()
defer b.store.writer.Unlock()
}
// first process the merges
for k, mc := range b.merges {
val, err := b.store.get([]byte(k))
if err != nil {
return err
}
val, err = mc.Merge([]byte(k), val)
if err != nil {
return err
}
if val == nil {
b.store.deletelocked([]byte(k))
func (b *Batch) Merge(key, val []byte) {
ops, ok := b.merges[string(key)]
if ok && len(ops) > 0 {
last := ops[len(ops)-1]
mergedVal, partialMergeOk := b.s.mo.PartialMerge(key, last, val)
if partialMergeOk {
// replace last entry with the result of the merge
ops[len(ops)-1] = mergedVal
} else {
b.store.setlocked([]byte(k), val)
// could not partial merge, append this to the end
ops = append(ops, val)
}
} else {
ops = [][]byte{val}
}
b.merges[string(key)] = ops
}
func (b *Batch) Execute() (err error) {
for k, mergeOps := range b.merges {
kb := []byte(k)
existingVal, err := b.s.get(kb)
if err != nil {
return err
}
mergedVal, fullMergeOk := b.s.mo.FullMerge(kb, existingVal, mergeOps)
if !fullMergeOk {
return fmt.Errorf("merge operator returned failure")
}
err = b.s.setlocked(kb, mergedVal)
if err != nil {
return err
}
}
// now add all the other ops to the batch
for _, op := range b.ops {
if op.v == nil {
b.store.deletelocked(op.k)
if op.v != nil {
b.s.setlocked(op.k, op.v)
} else {
b.store.setlocked(op.k, op.v)
b.s.deletelocked(op.k)
}
}
return b.store.commit()
return b.s.commit()
}
func (b *Batch) Close() error {

View File

@ -51,9 +51,10 @@ type Store struct {
dbfile *forestdb.File
dbkv *forestdb.KVStore
writer sync.Mutex
mo store.MergeOperator
}
func Open(path string, createIfMissing bool,
func New(path string, createIfMissing bool,
config map[string]interface{}) (*Store, error) {
if config == nil {
config = map[string]interface{}{}
@ -76,19 +77,28 @@ func Open(path string, createIfMissing bool,
rv.kvconfig.SetCreateIfMissing(true)
}
rv.dbfile, err = forestdb.Open(rv.path, rv.config)
if err != nil {
return nil, err
}
rv.dbkv, err = rv.dbfile.OpenKVStoreDefault(rv.kvconfig)
if err != nil {
return nil, err
}
return &rv, nil
}
func (s *Store) Open() error {
var err error
s.dbfile, err = forestdb.Open(s.path, s.config)
if err != nil {
return err
}
s.dbkv, err = s.dbfile.OpenKVStoreDefault(s.kvconfig)
if err != nil {
return err
}
return nil
}
func (s *Store) SetMergeOperator(mo store.MergeOperator) {
s.mo = mo
}
func (s *Store) get(key []byte) ([]byte, error) {
res, err := s.dbkv.GetKV(key)
if err != nil && err != forestdb.RESULT_KEY_NOT_FOUND {
@ -144,10 +154,6 @@ func (ldbs *Store) Writer() (store.KVWriter, error) {
return newWriter(ldbs)
}
func (ldbs *Store) newBatch() store.KVBatch {
return newBatch(ldbs)
}
func (s *Store) getSeqNum() (forestdb.SeqNum, error) {
dbinfo, err := s.dbkv.Info()
if err != nil {
@ -211,7 +217,7 @@ func StoreConstructor(config map[string]interface{}) (store.KVStore, error) {
if ok {
createIfMissing = cim
}
return Open(path, createIfMissing, config)
return New(path, createIfMissing, config)
}
func init() {

View File

@ -31,7 +31,11 @@ func TestForestDBStore(t *testing.T) {
if err != nil {
t.Fatal(err)
}
s, err := Open("testdir/test", true, nil)
s, err := New("testdir/test", true, nil)
if err != nil {
t.Fatal(err)
}
err = s.Open()
if err != nil {
t.Fatal(err)
}
@ -57,7 +61,11 @@ func TestReaderIsolation(t *testing.T) {
if err != nil {
t.Fatal(err)
}
s, err := Open("testdir/test", true, nil)
s, err := New("testdir/test", true, nil)
if err != nil {
t.Fatal(err)
}
err = s.Open()
if err != nil {
t.Fatal(err)
}
@ -86,7 +94,11 @@ func TestRollbackSameHandle(t *testing.T) {
if err != nil {
t.Fatal(err)
}
s, err := Open("testdir/test", true, nil)
s, err := New("testdir/test", true, nil)
if err != nil {
t.Fatal(err)
}
err = s.Open()
if err != nil {
t.Fatal(err)
}
@ -186,7 +198,11 @@ func TestRollbackNewHandle(t *testing.T) {
if err != nil {
t.Fatal(err)
}
s, err := Open("testdir/test", true, nil)
s, err := New("testdir/test", true, nil)
if err != nil {
t.Fatal(err)
}
err = s.Open()
if err != nil {
t.Fatal(err)
}
@ -254,7 +270,11 @@ func TestRollbackNewHandle(t *testing.T) {
}
// now lets open another handle
s2, err := Open("testdir/test", true, nil)
s2, err := New("testdir/test", true, nil)
if err != nil {
t.Fatal(err)
}
err = s2.Open()
if err != nil {
t.Fatal(err)
}
@ -293,7 +313,11 @@ func TestRollbackOtherHandle(t *testing.T) {
if err != nil {
t.Fatal(err)
}
s, err := Open("testdir/test", true, nil)
s, err := New("testdir/test", true, nil)
if err != nil {
t.Fatal(err)
}
err = s.Open()
if err != nil {
t.Fatal(err)
}
@ -305,7 +329,11 @@ func TestRollbackOtherHandle(t *testing.T) {
}()
// open another handle at the same time
s2, err := Open("testdir/test", true, nil)
s2, err := New("testdir/test", true, nil)
if err != nil {
t.Fatal(err)
}
err = s2.Open()
if err != nil {
t.Fatal(err)
}

View File

@ -47,7 +47,11 @@ func (w *Writer) Delete(key []byte) error {
}
func (w *Writer) NewBatch() store.KVBatch {
return newBatchAlreadyLocked(w.store)
return &Batch{
s: w.store,
ops: make([]op, 0, 1000),
merges: make(map[string][][]byte),
}
}
func (w *Writer) Close() error {

View File

@ -10,97 +10,44 @@
package goleveldb
import (
indexStore "github.com/blevesearch/bleve/index/store"
"github.com/blevesearch/bleve/index/store"
"github.com/syndtr/goleveldb/leveldb"
)
type op struct {
k []byte
v []byte
}
type Batch struct {
store *Store
ops []op
alreadyLocked bool
merges map[string]indexStore.AssociativeMergeChain
w *Writer
merge *store.EmulatedMerge
batch *leveldb.Batch
}
func newBatch(store *Store) *Batch {
rv := Batch{
store: store,
ops: make([]op, 0, 1000),
merges: make(map[string]indexStore.AssociativeMergeChain),
// Set adds a put of val under key directly to the native leveldb batch.
func (b *Batch) Set(key, val []byte) {
	b.batch.Put(key, val)
}

// Delete adds removal of key directly to the native leveldb batch.
func (b *Batch) Delete(key []byte) {
	b.batch.Delete(key)
}

// Merge queues val for key in the emulated merge; the resolved
// results are folded into the leveldb batch at Execute time.
func (b *Batch) Merge(key, val []byte) {
	b.merge.Merge(key, val)
}
func (b *Batch) Execute() error {
// first process merges
ops, err := b.merge.ExecuteDeferred(b.w)
if err != nil {
return err
}
return &rv
}
func newBatchAlreadyLocked(store *Store) *Batch {
rv := Batch{
store: store,
ops: make([]op, 0, 1000),
alreadyLocked: true,
merges: make(map[string]indexStore.AssociativeMergeChain),
}
return &rv
}
func (ldb *Batch) Set(key, val []byte) {
ldb.ops = append(ldb.ops, op{key, val})
}
func (ldb *Batch) Delete(key []byte) {
ldb.ops = append(ldb.ops, op{key, nil})
}
func (ldb *Batch) Merge(key []byte, oper indexStore.AssociativeMerge) {
opers, ok := ldb.merges[string(key)]
if !ok {
opers = make(indexStore.AssociativeMergeChain, 0, 1)
}
opers = append(opers, oper)
ldb.merges[string(key)] = opers
}
func (ldb *Batch) Execute() error {
if !ldb.alreadyLocked {
ldb.store.writer.Lock()
defer ldb.store.writer.Unlock()
}
batch := new(leveldb.Batch)
// first process the merges
for k, mc := range ldb.merges {
val, err := ldb.store.get([]byte(k))
if err != nil {
return err
}
val, err = mc.Merge([]byte(k), val)
if err != nil {
return err
}
if val == nil {
batch.Delete([]byte(k))
} else {
batch.Put([]byte(k), val)
}
}
// now add all the other ops to the batch
for _, op := range ldb.ops {
if op.v == nil {
batch.Delete(op.k)
} else {
batch.Put(op.k, op.v)
}
for _, op := range ops {
b.batch.Put(op.K, op.V)
}
wopts := defaultWriteOptions()
err := ldb.store.db.Write(batch, wopts)
err = b.w.store.db.Write(b.batch, wopts)
return err
}
func (ldb *Batch) Close() error {
func (b *Batch) Close() error {
return nil
}

View File

@ -27,9 +27,10 @@ type Store struct {
opts *opt.Options
db *leveldb.DB
writer sync.Mutex
mo store.MergeOperator
}
func Open(path string, config map[string]interface{}) (*Store, error) {
func New(path string, config map[string]interface{}) (*Store, error) {
rv := Store{
path: path,
opts: &opt.Options{},
@ -40,14 +41,22 @@ func Open(path string, config map[string]interface{}) (*Store, error) {
return nil, err
}
rv.db, err = leveldb.OpenFile(rv.path, rv.opts)
if err != nil {
return nil, err
}
return &rv, nil
}
func (ldbs *Store) Open() error {
var err error
ldbs.db, err = leveldb.OpenFile(ldbs.path, ldbs.opts)
if err != nil {
return err
}
return nil
}
func (ldbs *Store) SetMergeOperator(mo store.MergeOperator) {
ldbs.mo = mo
}
func (ldbs *Store) get(key []byte) ([]byte, error) {
options := defaultReadOptions()
b, err := ldbs.db.Get(key, options)
@ -108,16 +117,12 @@ func (ldbs *Store) Writer() (store.KVWriter, error) {
return newWriter(ldbs)
}
func (ldbs *Store) newBatch() store.KVBatch {
return newBatch(ldbs)
}
func StoreConstructor(config map[string]interface{}) (store.KVStore, error) {
path, ok := config["path"].(string)
if !ok {
return nil, fmt.Errorf("must specify path")
}
return Open(path, config)
return New(path, config)
}
func init() {

View File

@ -29,7 +29,11 @@ func TestLevelDBStore(t *testing.T) {
}
}()
s, err := Open("test", leveldbTestOptions)
s, err := New("test", leveldbTestOptions)
if err != nil {
t.Fatal(err)
}
err = s.Open()
if err != nil {
t.Fatal(err)
}
@ -51,7 +55,11 @@ func TestReaderIsolation(t *testing.T) {
}
}()
s, err := Open("test", leveldbTestOptions)
s, err := New("test", leveldbTestOptions)
if err != nil {
t.Fatal(err)
}
err = s.Open()
if err != nil {
t.Fatal(err)
}

View File

@ -11,6 +11,7 @@ package goleveldb
import (
"github.com/blevesearch/bleve/index/store"
"github.com/syndtr/goleveldb/leveldb"
)
type Writer struct {
@ -37,7 +38,12 @@ func (w *Writer) Delete(key []byte) error {
}
func (w *Writer) NewBatch() store.KVBatch {
return newBatchAlreadyLocked(w.store)
rv := Batch{
w: w,
merge: store.NewEmulatedMerge(w.store.mo),
batch: new(leveldb.Batch),
}
return &rv
}
func (w *Writer) Close() error {

View File

@ -17,7 +17,6 @@ package gtreap
import (
"bytes"
"fmt"
"math/rand"
"sync"
"github.com/blevesearch/bleve/index/store"
@ -59,34 +58,20 @@ type Store struct {
m sync.Mutex
t *gtreap.Treap
}
type Reader struct {
t *gtreap.Treap
mo store.MergeOperator
}
type Writer struct {
s *Store
}
type Iterator struct {
t *gtreap.Treap
m sync.Mutex
cancelCh chan struct{}
nextCh chan *Item
curr *Item
currOk bool
func (s *Store) Open() error {
return nil
}
func newIterator(t *gtreap.Treap) *Iterator {
return &Iterator{t: t}
}
type Batch struct {
s *Store
items []*Item
ms map[string]store.AssociativeMergeChain
func (s *Store) SetMergeOperator(mo store.MergeOperator) {
s.mo = mo
}
func (s *Store) Close() error {
@ -109,240 +94,3 @@ func (s *Store) Writer() (store.KVWriter, error) {
return &Writer{s: s}, nil
}
func (w *Reader) BytesSafeAfterClose() bool {
return false
}
func (w *Reader) Get(k []byte) (v []byte, err error) {
itm := w.t.Get(&Item{k: k})
if itm != nil {
return itm.(*Item).v, nil
}
return nil, nil
}
func (w *Reader) Iterator(k []byte) store.KVIterator {
return newIterator(w.t).restart(&Item{k: k})
}
func (w *Reader) Close() error {
return nil
}
func (w *Writer) BytesSafeAfterClose() bool {
return false
}
func (w *Writer) Get(k []byte) (v []byte, err error) {
w.s.m.Lock()
t := w.s.t
w.s.m.Unlock()
itm := t.Get(&Item{k: k})
if itm != nil {
return itm.(*Item).v, nil
}
return nil, nil
}
func (w *Writer) Iterator(k []byte) store.KVIterator {
w.s.m.Lock()
t := w.s.t
w.s.m.Unlock()
return newIterator(t).restart(&Item{k: k})
}
func (w *Writer) Close() error {
w.s.availableWriters <- true
w.s = nil
return nil
}
func (w *Writer) Set(k, v []byte) (err error) {
w.s.m.Lock()
w.s.t = w.s.t.Upsert(&Item{k: k, v: v}, rand.Int())
w.s.m.Unlock()
return nil
}
func (w *Writer) Delete(k []byte) (err error) {
w.s.m.Lock()
w.s.t = w.s.t.Delete(&Item{k: k})
w.s.m.Unlock()
return nil
}
func (w *Writer) NewBatch() store.KVBatch {
return &Batch{
s: w.s,
items: make([]*Item, 0, 100),
ms: map[string]store.AssociativeMergeChain{},
}
}
func (w *Iterator) SeekFirst() {
min := w.t.Min()
if min != nil {
w.restart(min.(*Item))
} else {
w.restart(nil)
}
}
func (w *Iterator) Seek(k []byte) {
w.restart(&Item{k: k})
}
func (w *Iterator) restart(start *Item) *Iterator {
cancelCh := make(chan struct{})
nextCh := make(chan *Item, 1)
w.m.Lock()
if w.cancelCh != nil {
close(w.cancelCh)
}
w.cancelCh = cancelCh
w.nextCh = nextCh
w.curr = nil
w.currOk = false
w.m.Unlock()
go func() {
if start != nil {
w.t.VisitAscend(start, func(itm gtreap.Item) bool {
select {
case <-cancelCh:
return false
case nextCh <- itm.(*Item):
return true
}
})
}
close(nextCh)
}()
w.Next()
return w
}
func (w *Iterator) Next() {
w.m.Lock()
nextCh := w.nextCh
w.m.Unlock()
w.curr, w.currOk = <-nextCh
}
func (w *Iterator) Current() ([]byte, []byte, bool) {
w.m.Lock()
defer w.m.Unlock()
if !w.currOk || w.curr == nil {
return nil, nil, false
}
return w.curr.k, w.curr.v, w.currOk
}
func (w *Iterator) Key() []byte {
k, _, ok := w.Current()
if !ok {
return nil
}
return k
}
func (w *Iterator) Value() []byte {
_, v, ok := w.Current()
if !ok {
return nil
}
return v
}
func (w *Iterator) Valid() bool {
_, _, ok := w.Current()
return ok
}
func (w *Iterator) Close() error {
w.m.Lock()
if w.cancelCh != nil {
close(w.cancelCh)
}
w.cancelCh = nil
w.nextCh = nil
w.curr = nil
w.currOk = false
w.m.Unlock()
return nil
}
// Set queues an upsert of v under k.
func (w *Batch) Set(k, v []byte) {
	w.items = append(w.items, &Item{k, v})
}

// Delete queues removal of k (an Item with nil value).
func (w *Batch) Delete(k []byte) {
	w.items = append(w.items, &Item{k, nil})
}

// Merge appends oper to the merge chain for k.
func (w *Batch) Merge(k []byte, oper store.AssociativeMerge) {
	w.ms[string(k)] = append(w.ms[string(k)], oper)
}

// Execute applies merges then items to a copy-on-write version of the
// treap and installs it with a compare-and-swap retry loop: if the
// store's root changed while the new version was being built, the
// whole batch is rebuilt against the new root.
func (w *Batch) Execute() (err error) {
	done := false
	for !done {
		// snapshot the current root without holding the lock while we build
		w.s.m.Lock()
		torig := w.s.t
		w.s.m.Unlock()
		t := torig
		for key, mc := range w.ms {
			k := []byte(key)
			itm := t.Get(&Item{k: k})
			v := []byte(nil)
			if itm != nil {
				v = itm.(*Item).v
			}
			// NOTE: mc.Merge() doesn't modify its k or v params.
			v, err := mc.Merge(k, v)
			if err != nil {
				return err
			}
			if v != nil {
				// NOTE: We don't re-copy the result from mc.Merge(),
				// as it'll return brand new unshared bytes.
				t = t.Upsert(&Item{k: k, v: v}, rand.Int())
			} else {
				// a nil merge result deletes the key
				t = t.Delete(&Item{k: k})
			}
		}
		for _, item := range w.items {
			v := item.v
			if v != nil {
				t = t.Upsert(item, rand.Int())
			} else {
				t = t.Delete(item)
			}
		}
		// install only if no concurrent writer replaced the root
		w.s.m.Lock()
		if w.s.t == torig {
			w.s.t = t
			done = true
		}
		w.s.m.Unlock()
	}
	return nil
}

// Close is a no-op.
func (w *Batch) Close() error {
	return nil
}

View File

@ -0,0 +1,132 @@
// Copyright (c) 2015 Couchbase, Inc.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the
// License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an "AS
// IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
// express or implied. See the License for the specific language
// governing permissions and limitations under the License.
// Package gtreap provides an in-memory implementation of the
// KVStore interfaces using the gtreap balanced-binary treap,
// copy-on-write data structure.
package gtreap
import (
"sync"
"github.com/steveyen/gtreap"
)
// Iterator provides forward iteration over a treap snapshot. A
// background goroutine walks the treap in ascending key order and
// streams items over nextCh; closing cancelCh stops that goroutine.
type Iterator struct {
	t *gtreap.Treap
	m sync.Mutex // guards cancelCh, nextCh, curr, currOk
	cancelCh chan struct{}
	nextCh chan *Item
	curr *Item
	currOk bool
}
// newIterator constructs an Iterator over the given treap snapshot;
// callers must position it with SeekFirst, Seek, or restart before use.
func newIterator(t *gtreap.Treap) *Iterator {
	rv := Iterator{t: t}
	return &rv
}
// SeekFirst positions the iterator at the smallest item in the treap;
// an empty treap yields an immediately-exhausted iterator.
func (w *Iterator) SeekFirst() {
	var first *Item
	if m := w.t.Min(); m != nil {
		first = m.(*Item)
	}
	w.restart(first)
}
// Seek positions the iterator at the first item with key >= k.
func (w *Iterator) Seek(k []byte) {
	start := &Item{k: k}
	w.restart(start)
}
// restart cancels any in-flight visit goroutine, launches a new one
// streaming items >= start over a fresh channel, and advances to the
// first item. A nil start produces an immediately-exhausted iterator.
// Returns w to allow chaining.
func (w *Iterator) restart(start *Item) *Iterator {
	cancelCh := make(chan struct{})
	nextCh := make(chan *Item, 1)
	w.m.Lock()
	if w.cancelCh != nil {
		// stop the previous visit goroutine, if any
		close(w.cancelCh)
	}
	w.cancelCh = cancelCh
	w.nextCh = nextCh
	w.curr = nil
	w.currOk = false
	w.m.Unlock()
	go func() {
		if start != nil {
			w.t.VisitAscend(start, func(itm gtreap.Item) bool {
				select {
				case <-cancelCh:
					// iterator was closed or restarted; stop visiting
					return false
				case nextCh <- itm.(*Item):
					return true
				}
			})
		}
		// closing nextCh signals exhaustion to Next()
		close(nextCh)
	}()
	w.Next()
	return w
}
// Next advances the iterator, blocking until the producing goroutine
// delivers the next item or closes the channel (end of iteration).
func (w *Iterator) Next() {
	w.m.Lock()
	ch := w.nextCh
	w.m.Unlock()
	item, ok := <-ch
	w.curr, w.currOk = item, ok
}
// Current returns the key and value at the iterator's position, plus a
// validity flag; all nil/false once the iterator is exhausted or closed.
func (w *Iterator) Current() ([]byte, []byte, bool) {
	w.m.Lock()
	defer w.m.Unlock()
	if w.currOk && w.curr != nil {
		return w.curr.k, w.curr.v, true
	}
	return nil, nil, false
}
// Key returns the current key, or nil if the iterator is not valid.
func (w *Iterator) Key() []byte {
	if k, _, ok := w.Current(); ok {
		return k
	}
	return nil
}
// Value returns the current value, or nil if the iterator is not valid.
func (w *Iterator) Value() []byte {
	if _, v, ok := w.Current(); ok {
		return v
	}
	return nil
}
// Valid reports whether the iterator is positioned at an item.
func (w *Iterator) Valid() bool {
	_, _, valid := w.Current()
	return valid
}
// Close stops the background visit goroutine (if one is running) and
// clears all iterator state. Always returns nil.
func (w *Iterator) Close() error {
	w.m.Lock()
	defer w.m.Unlock()
	if w.cancelCh != nil {
		close(w.cancelCh)
		w.cancelCh = nil
	}
	w.nextCh = nil
	w.curr = nil
	w.currOk = false
	return nil
}

View File

@ -0,0 +1,45 @@
// Copyright (c) 2015 Couchbase, Inc.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the
// License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an "AS
// IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
// express or implied. See the License for the specific language
// governing permissions and limitations under the License.
// Package gtreap provides an in-memory implementation of the
// KVStore interfaces using the gtreap balanced-binary treap,
// copy-on-write data structure.
package gtreap
import (
"github.com/blevesearch/bleve/index/store"
"github.com/steveyen/gtreap"
)
// Reader is a read-only view of the store, backed by the immutable
// treap snapshot captured at creation time.
type Reader struct {
	t *gtreap.Treap
}
// BytesSafeAfterClose reports whether byte slices returned by this
// reader remain valid after Close; for this store they do not.
func (w *Reader) BytesSafeAfterClose() bool {
	return false
}
// Get returns the value stored under k in the snapshot, or nil if the
// key is absent. Never returns an error.
func (w *Reader) Get(k []byte) (v []byte, err error) {
	if itm := w.t.Get(&Item{k: k}); itm != nil {
		v = itm.(*Item).v
	}
	return v, nil
}
// Iterator returns an iterator over the snapshot positioned at the
// first key >= k.
func (w *Reader) Iterator(k []byte) store.KVIterator {
	it := newIterator(w.t)
	return it.restart(&Item{k: k})
}
// Close releases the reader; the snapshot needs no cleanup.
func (w *Reader) Close() error {
	return nil
}

View File

@ -0,0 +1,72 @@
// Copyright (c) 2015 Couchbase, Inc.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the
// License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an "AS
// IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
// express or implied. See the License for the specific language
// governing permissions and limitations under the License.
// Package gtreap provides an in-memory implementation of the
// KVStore interfaces using the gtreap balanced-binary treap,
// copy-on-write data structure.
package gtreap
import (
"math/rand"
"github.com/blevesearch/bleve/index/store"
)
// BytesSafeAfterClose reports whether byte slices returned by this
// writer remain valid after Close; for this store they do not.
func (w *Writer) BytesSafeAfterClose() bool {
	return false
}
// Get returns the current value for k from the live treap, or nil if
// the key is absent. Never returns an error.
func (w *Writer) Get(k []byte) (v []byte, err error) {
	w.s.m.Lock()
	t := w.s.t
	w.s.m.Unlock()
	if itm := t.Get(&Item{k: k}); itm != nil {
		v = itm.(*Item).v
	}
	return v, nil
}
// Iterator returns an iterator over a snapshot of the current treap,
// positioned at the first key >= k.
func (w *Writer) Iterator(k []byte) store.KVIterator {
	w.s.m.Lock()
	snapshot := w.s.t
	w.s.m.Unlock()
	return newIterator(snapshot).restart(&Item{k: k})
}
// Close returns this writer's slot to the store's pool of available
// writers and detaches the writer from the store. The writer must not
// be used after Close: w.s is nil'd, so any further method call will
// panic rather than silently corrupt state.
func (w *Writer) Close() error {
	w.s.availableWriters <- true
	w.s = nil
	return nil
}
// Set upserts k/v into the store's treap under the store lock. The
// treap is copy-on-write, so the field is replaced with a new root.
func (w *Writer) Set(k, v []byte) (err error) {
	w.s.m.Lock()
	defer w.s.m.Unlock()
	w.s.t = w.s.t.Upsert(&Item{k: k, v: v}, rand.Int())
	return nil
}
// Delete removes k from the store's treap under the store lock.
func (w *Writer) Delete(k []byte) (err error) {
	w.s.m.Lock()
	defer w.s.m.Unlock()
	w.s.t = w.s.t.Delete(&Item{k: k})
	return nil
}
// NewBatch returns a KVBatch that emulates batched writes and merges
// on top of this writer, using the store's merge operator.
func (w *Writer) NewBatch() store.KVBatch {
	return store.NewEmulatedBatch(w, w.s.mo)
}

View File

@ -1,111 +0,0 @@
// Copyright (c) 2014 Couchbase, Inc.
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
// except in compliance with the License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed under the
// License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
// either express or implied. See the License for the specific language governing permissions
// and limitations under the License.
package inmem
import (
indexStore "github.com/blevesearch/bleve/index/store"
)
type op struct {
k []byte
v []byte
}
type Batch struct {
store *Store
ops []op
alreadyLocked bool
merges map[string]indexStore.AssociativeMergeChain
}
func newBatch(store *Store) *Batch {
rv := Batch{
store: store,
ops: make([]op, 0, 100),
merges: make(map[string]indexStore.AssociativeMergeChain),
}
return &rv
}
func newBatchAlreadyLocked(store *Store) *Batch {
rv := Batch{
store: store,
ops: make([]op, 0, 100),
alreadyLocked: true,
merges: make(map[string]indexStore.AssociativeMergeChain),
}
return &rv
}
func (i *Batch) Set(key, val []byte) {
i.ops = append(i.ops, op{key, val})
}
func (i *Batch) Delete(key []byte) {
i.ops = append(i.ops, op{key, nil})
}
func (i *Batch) Merge(key []byte, oper indexStore.AssociativeMerge) {
opers, ok := i.merges[string(key)]
if !ok {
opers = make(indexStore.AssociativeMergeChain, 0, 1)
}
opers = append(opers, oper)
i.merges[string(key)] = opers
}
func (i *Batch) Execute() error {
if !i.alreadyLocked {
i.store.writer.Lock()
defer i.store.writer.Unlock()
}
// first process the merges
for k, mc := range i.merges {
val, err := i.store.get([]byte(k))
if err != nil {
return err
}
val, err = mc.Merge([]byte(k), val)
if err != nil {
return err
}
if val == nil {
err := i.store.deletelocked([]byte(k))
if err != nil {
return err
}
} else {
err := i.store.setlocked([]byte(k), val)
if err != nil {
return err
}
}
}
for _, op := range i.ops {
if op.v == nil {
err := i.store.deletelocked(op.k)
if err != nil {
return err
}
} else {
err := i.store.setlocked(op.k, op.v)
if err != nil {
return err
}
}
}
return nil
}
func (i *Batch) Close() error {
return nil
}

View File

@ -22,9 +22,10 @@ const Name = "mem"
type Store struct {
list *skiplist.SkipList
writer sync.Mutex
mo store.MergeOperator
}
func Open() (*Store, error) {
func New() (*Store, error) {
rv := Store{
list: skiplist.NewStringMap(),
}
@ -40,6 +41,14 @@ func MustOpen() *Store {
return &rv
}
func (i *Store) Open() error {
return nil
}
func (i *Store) SetMergeOperator(mo store.MergeOperator) {
i.mo = mo
}
func (i *Store) get(key []byte) ([]byte, error) {
val, ok := i.list.Get(string(key))
if ok {
@ -88,12 +97,8 @@ func (i *Store) Writer() (store.KVWriter, error) {
return newWriter(i)
}
func (i *Store) newBatch() store.KVBatch {
return newBatch(i)
}
func StoreConstructor(config map[string]interface{}) (store.KVStore, error) {
return Open()
return New()
}
func init() {

View File

@ -17,7 +17,7 @@ import (
)
func TestStore(t *testing.T) {
s, err := Open()
s, err := New()
if err != nil {
t.Fatal(err)
}

View File

@ -37,7 +37,7 @@ func (w *Writer) Delete(key []byte) error {
}
func (w *Writer) NewBatch() store.KVBatch {
return newBatchAlreadyLocked(w.store)
return store.NewEmulatedBatch(w, w.store.mo)
}
func (w *Writer) Close() error {

View File

@ -12,7 +12,7 @@ package store
type KVBatch interface {
Set(key, val []byte)
Delete(key []byte)
Merge(key []byte, oper AssociativeMerge)
Merge(key, val []byte)
Execute() error
Close() error
}
@ -31,6 +31,8 @@ type KVIterator interface {
}
type KVStore interface {
Open() error
SetMergeOperator(MergeOperator)
Writer() (KVWriter, error)
Reader() (KVReader, error)
Close() error
@ -49,21 +51,3 @@ type KVReader interface {
Iterator(key []byte) KVIterator
Close() error
}
type AssociativeMerge interface {
Merge(key, existing []byte) ([]byte, error)
}
type AssociativeMergeChain []AssociativeMerge
func (a AssociativeMergeChain) Merge(key, orig []byte) ([]byte, error) {
curr := orig
for _, m := range a {
var err error
curr, err = m.Merge(key, curr)
if err != nil {
return nil, err
}
}
return curr, nil
}

View File

@ -1,56 +0,0 @@
// Copyright (c) 2014 Couchbase, Inc.
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
// except in compliance with the License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed under the
// License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
// either express or implied. See the License for the specific language governing permissions
// and limitations under the License.
package store
import (
"encoding/binary"
"testing"
)
type addUint64Operator struct {
offset uint64
}
func newAddUint64Operator(offset uint64) *addUint64Operator {
return &addUint64Operator{offset: offset}
}
func (a *addUint64Operator) Merge(key, existing []byte) ([]byte, error) {
var existingUint64 uint64
if len(existing) > 0 {
existingUint64, _ = binary.Uvarint(existing)
}
existingUint64 += a.offset
result := make([]byte, 8)
binary.PutUvarint(result, existingUint64)
return result, nil
}
func TestAssociativeMerge(t *testing.T) {
// simulate original lookup of value
existingValue := make([]byte, 8)
binary.PutUvarint(existingValue, 27)
mergeChain := make(AssociativeMergeChain, 0)
mergeChain = append(mergeChain, newAddUint64Operator(6))
mergeChain = append(mergeChain, newAddUint64Operator(3))
mergeChain = append(mergeChain, newAddUint64Operator(25))
mergeChain = append(mergeChain, newAddUint64Operator(1))
newValueBytes, err := mergeChain.Merge([]byte("key"), existingValue)
if err != nil {
t.Fatal(err)
}
newValue, _ := binary.Uvarint(newValueBytes)
if newValue != 62 {
t.Errorf("expected 62, got %d", newValue)
}
}

View File

@ -12,99 +12,44 @@
package leveldb
import (
indexStore "github.com/blevesearch/bleve/index/store"
"github.com/blevesearch/bleve/index/store"
"github.com/jmhodges/levigo"
)
type op struct {
k []byte
v []byte
}
type Batch struct {
store *Store
ops []op
alreadyLocked bool
merges map[string]indexStore.AssociativeMergeChain
w *Writer
merge *store.EmulatedMerge
batch *levigo.WriteBatch
}
func newBatch(store *Store) *Batch {
rv := Batch{
store: store,
ops: make([]op, 0, 1000),
merges: make(map[string]indexStore.AssociativeMergeChain),
func (b *Batch) Set(key, val []byte) {
b.batch.Put(key, val)
}
func (b *Batch) Delete(key []byte) {
b.batch.Delete(key)
}
func (b *Batch) Merge(key, val []byte) {
b.merge.Merge(key, val)
}
func (b *Batch) Execute() error {
// first process merges
ops, err := b.merge.ExecuteDeferred(b.w)
if err != nil {
return err
}
return &rv
}
func newBatchAlreadyLocked(store *Store) *Batch {
rv := Batch{
store: store,
ops: make([]op, 0, 1000),
alreadyLocked: true,
merges: make(map[string]indexStore.AssociativeMergeChain),
}
return &rv
}
func (ldb *Batch) Set(key, val []byte) {
ldb.ops = append(ldb.ops, op{key, val})
}
func (ldb *Batch) Delete(key []byte) {
ldb.ops = append(ldb.ops, op{key, nil})
}
func (ldb *Batch) Merge(key []byte, oper indexStore.AssociativeMerge) {
opers, ok := ldb.merges[string(key)]
if !ok {
opers = make(indexStore.AssociativeMergeChain, 0, 1)
}
opers = append(opers, oper)
ldb.merges[string(key)] = opers
}
func (ldb *Batch) Execute() error {
if !ldb.alreadyLocked {
ldb.store.writer.Lock()
defer ldb.store.writer.Unlock()
}
batch := levigo.NewWriteBatch()
defer batch.Close()
// first process the merges
for k, mc := range ldb.merges {
val, err := ldb.store.get([]byte(k))
if err != nil {
return err
}
val, err = mc.Merge([]byte(k), val)
if err != nil {
return err
}
if val == nil {
batch.Delete([]byte(k))
} else {
batch.Put([]byte(k), val)
}
}
// now add all the other ops to the batch
for _, op := range ldb.ops {
if op.v == nil {
batch.Delete(op.k)
} else {
batch.Put(op.k, op.v)
}
for _, op := range ops {
b.batch.Put(op.K, op.V)
}
wopts := defaultWriteOptions()
err := ldb.store.db.Write(wopts, batch)
wopts.Close()
defer wopts.Close()
err = b.w.store.db.Write(wopts, b.batch)
return err
}
func (ldb *Batch) Close() error {
func (b *Batch) Close() error {
return nil
}

View File

@ -27,18 +27,16 @@ type Store struct {
opts *levigo.Options
db *levigo.DB
writer sync.Mutex
mo store.MergeOperator
}
func Open(path string, config map[string]interface{}) (*Store, error) {
func New(path string, config map[string]interface{}) (*Store, error) {
rv := Store{
path: path,
opts: levigo.NewOptions(),
}
applyConfig(rv.opts, config)
var err error
rv.db, err = levigo.Open(rv.path, rv.opts)
_, err := applyConfig(rv.opts, config)
if err != nil {
return nil, err
}
@ -46,6 +44,19 @@ func Open(path string, config map[string]interface{}) (*Store, error) {
return &rv, nil
}
func (ldbs *Store) Open() error {
var err error
ldbs.db, err = levigo.Open(ldbs.path, ldbs.opts)
if err != nil {
return err
}
return nil
}
func (ldbs *Store) SetMergeOperator(mo store.MergeOperator) {
ldbs.mo = mo
}
func (ldbs *Store) get(key []byte) ([]byte, error) {
options := defaultReadOptions()
b, err := ldbs.db.Get(options, key)
@ -107,16 +118,12 @@ func (ldbs *Store) Writer() (store.KVWriter, error) {
return newWriter(ldbs)
}
func (ldbs *Store) newBatch() store.KVBatch {
return newBatch(ldbs)
}
func StoreConstructor(config map[string]interface{}) (store.KVStore, error) {
path, ok := config["path"].(string)
if !ok {
return nil, fmt.Errorf("must specify path")
}
return Open(path, config)
return New(path, config)
}
func init() {

View File

@ -31,7 +31,11 @@ func TestLevelDBStore(t *testing.T) {
}
}()
s, err := Open("test", leveldbTestOptions)
s, err := New("test", leveldbTestOptions)
if err != nil {
t.Fatal(err)
}
err = s.Open()
if err != nil {
t.Fatal(err)
}
@ -53,7 +57,11 @@ func TestReaderIsolation(t *testing.T) {
}
}()
s, err := Open("test", leveldbTestOptions)
s, err := New("test", leveldbTestOptions)
if err != nil {
t.Fatal(err)
}
err = s.Open()
if err != nil {
t.Fatal(err)
}

View File

@ -13,6 +13,7 @@ package leveldb
import (
"github.com/blevesearch/bleve/index/store"
"github.com/jmhodges/levigo"
)
type Writer struct {
@ -39,7 +40,12 @@ func (w *Writer) Delete(key []byte) error {
}
func (w *Writer) NewBatch() store.KVBatch {
return newBatchAlreadyLocked(w.store)
rv := Batch{
w: w,
merge: store.NewEmulatedMerge(w.store.mo),
batch: levigo.NewWriteBatch(),
}
return &rv
}
func (w *Writer) Close() error {

120
index/store/merge.go Normal file
View File

@ -0,0 +1,120 @@
// Copyright (c) 2014 Couchbase, Inc.
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
// except in compliance with the License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed under the
// License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
// either express or implied. See the License for the specific language governing permissions
// and limitations under the License.
package store
import (
"fmt"
)
// MergeOperator resolves merge operands into values.
// At the moment this happens to be the same interface as described by
// RocksDB, but this may not always be the case.
type MergeOperator interface {
	// FullMerge applies the full sequence of operands on top of
	// existingValue; if no value currently exists, existingValue is nil.
	// It returns the merged value and a success/failure flag.
	FullMerge(key, existingValue []byte, operands [][]byte) ([]byte, bool)
	// PartialMerge partially merges these two operands.
	// If partial merge cannot be done, return nil, false, which will
	// defer all processing until the FullMerge is done.
	PartialMerge(key, leftOperand, rightOperand []byte) ([]byte, bool)
	// Name returns an identifier for the operator.
	Name() string
}
// EmulatedMergeSingle removes some duplicated code across KV stores
// which do not support merge operations natively: it reads the current
// value for key, applies mo.FullMerge with the single operand, and
// writes the result back. It is up to the caller to ensure that an
// appropriate lock has been acquired so this read-modify-write
// sequence is safe.
func EmulatedMergeSingle(writer KVWriter, mo MergeOperator, key []byte, operand []byte) error {
	existingValue, err := writer.Get(key)
	if err != nil {
		return err
	}
	merged, ok := mo.FullMerge(key, existingValue, [][]byte{operand})
	if !ok {
		return fmt.Errorf("merge operator returned failure")
	}
	return writer.Set(key, merged)
}
// EmulatedMerge accumulates merge operands per key so they can later be
// resolved in bulk by a KV store that lacks native merge support.
type EmulatedMerge struct {
	merges map[string][][]byte // key -> ordered list of pending operands
	mo MergeOperator
}
// NewEmulatedMerge returns an empty EmulatedMerge that resolves
// operands with mo.
func NewEmulatedMerge(mo MergeOperator) *EmulatedMerge {
	rv := EmulatedMerge{
		merges: make(map[string][][]byte),
		mo:     mo,
	}
	return &rv
}
// Merge records val as a pending merge operand for key. When the
// operator supports it, val is partially merged into the most recent
// operand already queued for that key, keeping the operand list short.
func (m *EmulatedMerge) Merge(key, val []byte) {
	k := string(key)
	ops := m.merges[k]
	if len(ops) == 0 {
		m.merges[k] = [][]byte{val}
		return
	}
	last := len(ops) - 1
	if merged, ok := m.mo.PartialMerge(key, ops[last], val); ok {
		// fold into the most recent operand
		ops[last] = merged
	} else {
		// could not partial merge; queue it separately
		ops = append(ops, val)
	}
	m.merges[k] = ops
}
// Execute resolves every accumulated merge against w: for each key it
// reads the existing value, runs FullMerge over the queued operands,
// and writes the result back. Stops at the first error.
func (m *EmulatedMerge) Execute(w KVWriter) error {
	for key, operands := range m.merges {
		kb := []byte(key)
		existing, err := w.Get(kb)
		if err != nil {
			return err
		}
		merged, ok := m.mo.FullMerge(kb, existing, operands)
		if !ok {
			return fmt.Errorf("merge operator returned failure")
		}
		if err = w.Set(kb, merged); err != nil {
			return err
		}
	}
	return nil
}
// ExecuteDeferred resolves every accumulated merge against w, but
// instead of writing results back it returns them as a list of ops so
// the caller can fold them into its own (e.g. native) write batch.
// Returns nil and the first error encountered, if any.
func (m *EmulatedMerge) ExecuteDeferred(w KVWriter) ([]*op, error) {
	// size the result to the number of distinct merged keys rather than
	// an arbitrary constant capacity
	rv := make([]*op, 0, len(m.merges))
	for k, mergeOps := range m.merges {
		kb := []byte(k)
		existingVal, err := w.Get(kb)
		if err != nil {
			return nil, err
		}
		mergedVal, fullMergeOk := m.mo.FullMerge(kb, existingVal, mergeOps)
		if !fullMergeOk {
			return nil, fmt.Errorf("merge operator returned failure")
		}
		rv = append(rv, &op{kb, mergedVal})
	}
	return rv, nil
}

View File

@ -15,11 +15,19 @@ import (
type Store struct{}
func Open() (*Store, error) {
func New() (*Store, error) {
rv := Store{}
return &rv, nil
}
func (i *Store) Open() error {
return nil
}
func (i *Store) SetMergeOperator(mo store.MergeOperator) {
}
func (i *Store) Close() error {
return nil
}
@ -112,7 +120,7 @@ func (i *Batch) Set(key, val []byte) {
func (i *Batch) Delete(key []byte) {
}
func (i *Batch) Merge(key []byte, oper store.AssociativeMerge) {
func (i *Batch) Merge(key, val []byte) {
}
func (i *Batch) Execute() error {

View File

@ -7,7 +7,7 @@ import (
)
func TestStore(t *testing.T) {
s, err := Open()
s, err := New()
if err != nil {
t.Fatal(err)
}

View File

@ -24,78 +24,98 @@ type AnalysisWork struct {
rc chan *AnalysisResult
}
type AnalysisQueue chan AnalysisWork
type AnalysisQueue struct {
queue chan *AnalysisWork
done chan struct{}
}
func NewAnalysisQueue(numWorkers int) AnalysisQueue {
rv := make(AnalysisQueue)
func (q *AnalysisQueue) Queue(work *AnalysisWork) {
q.queue <- work
}
func (q *AnalysisQueue) Close() {
close(q.done)
}
func NewAnalysisQueue(numWorkers int) *AnalysisQueue {
rv := AnalysisQueue{
queue: make(chan *AnalysisWork),
done: make(chan struct{}),
}
for i := 0; i < numWorkers; i++ {
go AnalysisWorker(rv)
}
return rv
return &rv
}
func AnalysisWorker(q AnalysisQueue) {
// read work off the queue
for {
w := <-q
select {
case <-q.done:
return
case w := <-q.queue:
rv := &AnalysisResult{
docID: w.d.ID,
rows: make([]UpsideDownCouchRow, 0, 100),
}
// track our back index entries
backIndexTermEntries := make([]*BackIndexTermEntry, 0)
backIndexStoredEntries := make([]*BackIndexStoreEntry, 0)
for _, field := range w.d.Fields {
fieldIndex, newFieldRow := w.udc.fieldIndexCache.FieldIndex(field.Name())
if newFieldRow != nil {
rv.rows = append(rv.rows, newFieldRow)
rv := &AnalysisResult{
docID: w.d.ID,
rows: make([]UpsideDownCouchRow, 0, 100),
}
if field.Options().IsIndexed() {
// track our back index entries
backIndexTermEntries := make([]*BackIndexTermEntry, 0)
backIndexStoredEntries := make([]*BackIndexStoreEntry, 0)
fieldLength, tokenFreqs := field.Analyze()
// see if any of the composite fields need this
for _, compositeField := range w.d.CompositeFields {
compositeField.Compose(field.Name(), fieldLength, tokenFreqs)
for _, field := range w.d.Fields {
fieldIndex, newFieldRow := w.udc.fieldIndexCache.FieldIndex(field.Name())
if newFieldRow != nil {
rv.rows = append(rv.rows, newFieldRow)
}
if field.Options().IsIndexed() {
fieldLength, tokenFreqs := field.Analyze()
// see if any of the composite fields need this
for _, compositeField := range w.d.CompositeFields {
compositeField.Compose(field.Name(), fieldLength, tokenFreqs)
}
// encode this field
indexRows, indexBackIndexTermEntries := w.udc.indexField(w.d.ID, field, fieldIndex, fieldLength, tokenFreqs)
rv.rows = append(rv.rows, indexRows...)
backIndexTermEntries = append(backIndexTermEntries, indexBackIndexTermEntries...)
}
if field.Options().IsStored() {
storeRows, indexBackIndexStoreEntries := w.udc.storeField(w.d.ID, field, fieldIndex)
rv.rows = append(rv.rows, storeRows...)
backIndexStoredEntries = append(backIndexStoredEntries, indexBackIndexStoreEntries...)
}
// encode this field
indexRows, indexBackIndexTermEntries := w.udc.indexField(w.d.ID, field, fieldIndex, fieldLength, tokenFreqs)
rv.rows = append(rv.rows, indexRows...)
backIndexTermEntries = append(backIndexTermEntries, indexBackIndexTermEntries...)
}
if field.Options().IsStored() {
storeRows, indexBackIndexStoreEntries := w.udc.storeField(w.d.ID, field, fieldIndex)
rv.rows = append(rv.rows, storeRows...)
backIndexStoredEntries = append(backIndexStoredEntries, indexBackIndexStoreEntries...)
// now index the composite fields
for _, compositeField := range w.d.CompositeFields {
fieldIndex, newFieldRow := w.udc.fieldIndexCache.FieldIndex(compositeField.Name())
if newFieldRow != nil {
rv.rows = append(rv.rows, newFieldRow)
}
if compositeField.Options().IsIndexed() {
fieldLength, tokenFreqs := compositeField.Analyze()
// encode this field
indexRows, indexBackIndexTermEntries := w.udc.indexField(w.d.ID, compositeField, fieldIndex, fieldLength, tokenFreqs)
rv.rows = append(rv.rows, indexRows...)
backIndexTermEntries = append(backIndexTermEntries, indexBackIndexTermEntries...)
}
}
// build the back index row
backIndexRow := NewBackIndexRow(w.d.ID, backIndexTermEntries, backIndexStoredEntries)
rv.rows = append(rv.rows, backIndexRow)
w.rc <- rv
}
// now index the composite fields
for _, compositeField := range w.d.CompositeFields {
fieldIndex, newFieldRow := w.udc.fieldIndexCache.FieldIndex(compositeField.Name())
if newFieldRow != nil {
rv.rows = append(rv.rows, newFieldRow)
}
if compositeField.Options().IsIndexed() {
fieldLength, tokenFreqs := compositeField.Analyze()
// encode this field
indexRows, indexBackIndexTermEntries := w.udc.indexField(w.d.ID, compositeField, fieldIndex, fieldLength, tokenFreqs)
rv.rows = append(rv.rows, indexRows...)
backIndexTermEntries = append(backIndexTermEntries, indexBackIndexTermEntries...)
}
}
// build the back index row
backIndexRow := NewBackIndexRow(w.d.ID, backIndexTermEntries, backIndexStoredEntries)
rv.rows = append(rv.rows, backIndexRow)
w.rc <- rv
}
}

View File

@ -0,0 +1,8 @@
#!/bin/sh
BENCHMARKS=`grep "func Benchmark" *_test.go | sed 's/.*func //' | sed s/\(.*{//`
for BENCHMARK in $BENCHMARKS
do
go test -v -run=xxx -bench=^$BENCHMARK$ -benchtime=10s -tags 'forestdb leveldb' | grep -v ok | grep -v PASS
done

View File

@ -13,196 +13,53 @@ import (
"os"
"testing"
"github.com/blevesearch/bleve/index/store"
"github.com/blevesearch/bleve/index/store/boltdb"
)
func BenchmarkBoltDBIndexing1Workers(b *testing.B) {
s, err := boltdb.Open("test", "bleve")
if err != nil {
b.Fatal(err)
}
defer func() {
err := os.RemoveAll("test")
if err != nil {
b.Fatal(err)
}
}()
defer func() {
err := s.Close()
if err != nil {
b.Fatal(err)
}
}()
func CreateBoltDB() (store.KVStore, error) {
s := boltdb.New("test", "bleve")
return s, nil
}
CommonBenchmarkIndex(b, s, 1)
func DestroyBoltDB() error {
return os.RemoveAll("test")
}
func BenchmarkBoltDBIndexing1Workers(b *testing.B) {
CommonBenchmarkIndex(b, CreateBoltDB, DestroyBoltDB, 1)
}
func BenchmarkBoltDBIndexing2Workers(b *testing.B) {
s, err := boltdb.Open("test", "bleve")
if err != nil {
b.Fatal(err)
}
defer func() {
err := os.RemoveAll("test")
if err != nil {
b.Fatal(err)
}
}()
defer func() {
err := s.Close()
if err != nil {
b.Fatal(err)
}
}()
CommonBenchmarkIndex(b, s, 2)
CommonBenchmarkIndex(b, CreateBoltDB, DestroyBoltDB, 2)
}
func BenchmarkBoltDBIndexing4Workers(b *testing.B) {
s, err := boltdb.Open("test", "bleve")
if err != nil {
b.Fatal(err)
}
defer func() {
err := os.RemoveAll("test")
if err != nil {
b.Fatal(err)
}
}()
defer func() {
err := s.Close()
if err != nil {
b.Fatal(err)
}
}()
CommonBenchmarkIndex(b, s, 4)
CommonBenchmarkIndex(b, CreateBoltDB, DestroyBoltDB, 4)
}
// batches
func BenchmarkBoltDBIndexing1Workers10Batch(b *testing.B) {
s, err := boltdb.Open("test", "bleve")
if err != nil {
b.Fatal(err)
}
defer func() {
err := os.RemoveAll("test")
if err != nil {
b.Fatal(err)
}
}()
defer func() {
err := s.Close()
if err != nil {
b.Fatal(err)
}
}()
CommonBenchmarkIndexBatch(b, s, 1, 10)
CommonBenchmarkIndexBatch(b, CreateBoltDB, DestroyBoltDB, 1, 10)
}
func BenchmarkBoltDBIndexing2Workers10Batch(b *testing.B) {
s, err := boltdb.Open("test", "bleve")
if err != nil {
b.Fatal(err)
}
defer func() {
err := os.RemoveAll("test")
if err != nil {
b.Fatal(err)
}
}()
defer func() {
err := s.Close()
if err != nil {
b.Fatal(err)
}
}()
CommonBenchmarkIndexBatch(b, s, 2, 10)
CommonBenchmarkIndexBatch(b, CreateBoltDB, DestroyBoltDB, 2, 10)
}
func BenchmarkBoltDBIndexing4Workers10Batch(b *testing.B) {
s, err := boltdb.Open("test", "bleve")
if err != nil {
b.Fatal(err)
}
defer func() {
err := os.RemoveAll("test")
if err != nil {
b.Fatal(err)
}
}()
defer func() {
err := s.Close()
if err != nil {
b.Fatal(err)
}
}()
CommonBenchmarkIndexBatch(b, s, 4, 10)
CommonBenchmarkIndexBatch(b, CreateBoltDB, DestroyBoltDB, 4, 10)
}
func BenchmarkBoltDBIndexing1Workers100Batch(b *testing.B) {
s, err := boltdb.Open("test", "bleve")
if err != nil {
b.Fatal(err)
}
defer func() {
err := os.RemoveAll("test")
if err != nil {
b.Fatal(err)
}
}()
defer func() {
err := s.Close()
if err != nil {
b.Fatal(err)
}
}()
CommonBenchmarkIndexBatch(b, s, 1, 100)
CommonBenchmarkIndexBatch(b, CreateBoltDB, DestroyBoltDB, 1, 100)
}
func BenchmarkBoltDBIndexing2Workers100Batch(b *testing.B) {
s, err := boltdb.Open("test", "bleve")
if err != nil {
b.Fatal(err)
}
defer func() {
err := os.RemoveAll("test")
if err != nil {
b.Fatal(err)
}
}()
defer func() {
err := s.Close()
if err != nil {
b.Fatal(err)
}
}()
CommonBenchmarkIndexBatch(b, s, 2, 100)
CommonBenchmarkIndexBatch(b, CreateBoltDB, DestroyBoltDB, 2, 100)
}
func BenchmarkBoltDBIndexing4Workers100Batch(b *testing.B) {
s, err := boltdb.Open("test", "bleve")
if err != nil {
b.Fatal(err)
}
defer func() {
err := os.RemoveAll("test")
if err != nil {
b.Fatal(err)
}
}()
defer func() {
err := s.Close()
if err != nil {
b.Fatal(err)
}
}()
CommonBenchmarkIndexBatch(b, s, 4, 100)
CommonBenchmarkIndexBatch(b, CreateBoltDB, DestroyBoltDB, 4, 100)
}

View File

@ -33,9 +33,10 @@ var benchmarkDocBodies = []string{
"The expansion ratio of a liquefied and cryogenic substance is the volume of a given amount of that substance in liquid form compared to the volume of the same amount of substance in gaseous form, at room temperature and normal atmospheric pressure.",
}
func CommonBenchmarkIndex(b *testing.B, s store.KVStore, analysisWorkers int) {
analysisQueue := NewAnalysisQueue(analysisWorkers)
idx := NewUpsideDownCouch(s, analysisQueue)
type KVStoreCreate func() (store.KVStore, error)
type KVStoreDestroy func() error
func CommonBenchmarkIndex(b *testing.B, create KVStoreCreate, destroy KVStoreDestroy, analysisWorkers int) {
cache := registry.NewCache()
analyzer, err := cache.AnalyzerNamed("standard")
@ -47,18 +48,40 @@ func CommonBenchmarkIndex(b *testing.B, s store.KVStore, analysisWorkers int) {
AddField(document.NewTextFieldWithAnalyzer("body", []uint64{}, []byte(benchmarkDocBodies[0]), analyzer))
b.ResetTimer()
b.StopTimer()
for i := 0; i < b.N; i++ {
indexDocument.ID = strconv.Itoa(i)
err := idx.Update(indexDocument)
s, err := create()
if err != nil {
b.Fatal(err)
}
analysisQueue := NewAnalysisQueue(analysisWorkers)
idx := NewUpsideDownCouch(s, analysisQueue)
err = idx.Open()
if err != nil {
b.Fatal(err)
}
indexDocument.ID = strconv.Itoa(i)
// just time the indexing portion
b.StartTimer()
err = idx.Update(indexDocument)
if err != nil {
b.Fatal(err)
}
b.StopTimer()
err = idx.Close()
if err != nil {
b.Fatal(err)
}
err = destroy()
if err != nil {
b.Fatal(err)
}
analysisQueue.Close()
}
}
func CommonBenchmarkIndexBatch(b *testing.B, s store.KVStore, analysisWorkers, batchSize int) {
analysisQueue := NewAnalysisQueue(analysisWorkers)
idx := NewUpsideDownCouch(s, analysisQueue)
func CommonBenchmarkIndexBatch(b *testing.B, create KVStoreCreate, destroy KVStoreDestroy, analysisWorkers, batchSize int) {
cache := registry.NewCache()
analyzer, err := cache.AnalyzerNamed("standard")
@ -67,8 +90,22 @@ func CommonBenchmarkIndexBatch(b *testing.B, s store.KVStore, analysisWorkers, b
}
b.ResetTimer()
b.StopTimer()
for i := 0; i < b.N; i++ {
s, err := create()
if err != nil {
b.Fatal(err)
}
analysisQueue := NewAnalysisQueue(analysisWorkers)
idx := NewUpsideDownCouch(s, analysisQueue)
err = idx.Open()
if err != nil {
b.Fatal(err)
}
b.StartTimer()
batch := index.NewBatch()
for j := 0; j < 1000; j++ {
if j%batchSize == 0 {
@ -92,6 +129,15 @@ func CommonBenchmarkIndexBatch(b *testing.B, s store.KVStore, analysisWorkers, b
b.Fatal(err)
}
}
b.StopTimer()
err = idx.Close()
if err != nil {
b.Fatal(err)
}
err = destroy()
if err != nil {
b.Fatal(err)
}
analysisQueue.Close()
}
}

View File

@ -12,187 +12,64 @@ package upside_down
import (
"testing"
"github.com/blevesearch/bleve/index/store"
"github.com/blevesearch/bleve/index/store/cznicb"
)
func BenchmarkCznicBIndexing1Workers(b *testing.B) {
s, err := cznicb.StoreConstructor(nil)
if err != nil {
b.Fatal(err)
}
defer func() {
err := s.Close()
if err != nil {
b.Fatal(err)
}
}()
func CreateCznicB() (store.KVStore, error) {
return cznicb.StoreConstructor(nil)
}
CommonBenchmarkIndex(b, s, 1)
func DestroyCznicB() error {
return nil
}
func BenchmarkCznicBIndexing1Workers(b *testing.B) {
CommonBenchmarkIndex(b, CreateCznicB, DestroyCznicB, 1)
}
func BenchmarkCznicBIndexing2Workers(b *testing.B) {
s, err := cznicb.StoreConstructor(nil)
if err != nil {
b.Fatal(err)
}
defer func() {
err := s.Close()
if err != nil {
b.Fatal(err)
}
}()
CommonBenchmarkIndex(b, s, 2)
CommonBenchmarkIndex(b, CreateCznicB, DestroyCznicB, 2)
}
func BenchmarkCznicBIndexing4Workers(b *testing.B) {
s, err := cznicb.StoreConstructor(nil)
if err != nil {
b.Fatal(err)
}
defer func() {
err := s.Close()
if err != nil {
b.Fatal(err)
}
}()
CommonBenchmarkIndex(b, s, 4)
CommonBenchmarkIndex(b, CreateCznicB, DestroyCznicB, 4)
}
// batches
func BenchmarkCznicBIndexing1Workers10Batch(b *testing.B) {
s, err := cznicb.StoreConstructor(nil)
if err != nil {
b.Fatal(err)
}
defer func() {
err := s.Close()
if err != nil {
b.Fatal(err)
}
}()
CommonBenchmarkIndexBatch(b, s, 1, 10)
CommonBenchmarkIndexBatch(b, CreateCznicB, DestroyCznicB, 1, 10)
}
func BenchmarkCznicBIndexing2Workers10Batch(b *testing.B) {
s, err := cznicb.StoreConstructor(nil)
if err != nil {
b.Fatal(err)
}
defer func() {
err := s.Close()
if err != nil {
b.Fatal(err)
}
}()
CommonBenchmarkIndexBatch(b, s, 2, 10)
CommonBenchmarkIndexBatch(b, CreateCznicB, DestroyCznicB, 2, 10)
}
func BenchmarkCznicBIndexing4Workers10Batch(b *testing.B) {
s, err := cznicb.StoreConstructor(nil)
if err != nil {
b.Fatal(err)
}
defer func() {
err := s.Close()
if err != nil {
b.Fatal(err)
}
}()
CommonBenchmarkIndexBatch(b, s, 4, 10)
CommonBenchmarkIndexBatch(b, CreateCznicB, DestroyCznicB, 4, 10)
}
func BenchmarkCznicBIndexing1Workers100Batch(b *testing.B) {
s, err := cznicb.StoreConstructor(nil)
if err != nil {
b.Fatal(err)
}
defer func() {
err := s.Close()
if err != nil {
b.Fatal(err)
}
}()
CommonBenchmarkIndexBatch(b, s, 1, 100)
CommonBenchmarkIndexBatch(b, CreateCznicB, DestroyCznicB, 1, 100)
}
func BenchmarkCznicBIndexing2Workers100Batch(b *testing.B) {
s, err := cznicb.StoreConstructor(nil)
if err != nil {
b.Fatal(err)
}
defer func() {
err := s.Close()
if err != nil {
b.Fatal(err)
}
}()
CommonBenchmarkIndexBatch(b, s, 2, 100)
CommonBenchmarkIndexBatch(b, CreateCznicB, DestroyCznicB, 2, 100)
}
func BenchmarkCznicBIndexing4Workers100Batch(b *testing.B) {
s, err := cznicb.StoreConstructor(nil)
if err != nil {
b.Fatal(err)
}
defer func() {
err := s.Close()
if err != nil {
b.Fatal(err)
}
}()
CommonBenchmarkIndexBatch(b, s, 4, 100)
CommonBenchmarkIndexBatch(b, CreateCznicB, DestroyCznicB, 4, 100)
}
func BenchmarkCznicBIndexing1Workers1000Batch(b *testing.B) {
s, err := cznicb.StoreConstructor(nil)
if err != nil {
b.Fatal(err)
}
defer func() {
err := s.Close()
if err != nil {
b.Fatal(err)
}
}()
CommonBenchmarkIndexBatch(b, s, 1, 1000)
CommonBenchmarkIndexBatch(b, CreateCznicB, DestroyCznicB, 1, 1000)
}
func BenchmarkCznicBIndexing2Workers1000Batch(b *testing.B) {
s, err := cznicb.StoreConstructor(nil)
if err != nil {
b.Fatal(err)
}
defer func() {
err := s.Close()
if err != nil {
b.Fatal(err)
}
}()
CommonBenchmarkIndexBatch(b, s, 2, 1000)
CommonBenchmarkIndexBatch(b, CreateCznicB, DestroyCznicB, 2, 1000)
}
func BenchmarkCznicBIndexing4Workers1000Batch(b *testing.B) {
s, err := cznicb.StoreConstructor(nil)
if err != nil {
b.Fatal(err)
}
defer func() {
err := s.Close()
if err != nil {
b.Fatal(err)
}
}()
CommonBenchmarkIndexBatch(b, s, 4, 1000)
CommonBenchmarkIndexBatch(b, CreateCznicB, DestroyCznicB, 4, 1000)
}

View File

@ -15,241 +15,60 @@ import (
"os"
"testing"
"github.com/blevesearch/bleve/index/store"
"github.com/blevesearch/bleve/index/store/forestdb"
)
func BenchmarkForestDBIndexing1Workers(b *testing.B) {
defer func() {
err := os.RemoveAll("testdir")
if err != nil {
b.Fatal(err)
}
}()
func CreateForestDB() (store.KVStore, error) {
err := os.MkdirAll("testdir", 0700)
if err != nil {
b.Fatal(err)
return nil, err
}
s, err := forestdb.Open("testdir/test", true, nil)
s, err := forestdb.New("testdir/test", true, nil)
if err != nil {
b.Fatal(err)
return nil, err
}
defer func() {
err := s.Close()
if err != nil {
b.Fatal(err)
}
}()
return s, nil
}
CommonBenchmarkIndex(b, s, 1)
func DestroyForestDB() error {
return os.RemoveAll("testdir")
}
func BenchmarkForestDBIndexing1Workers(b *testing.B) {
CommonBenchmarkIndex(b, CreateForestDB, DestroyForestDB, 1)
}
func BenchmarkForestDBIndexing2Workers(b *testing.B) {
defer func() {
err := os.RemoveAll("testdir")
if err != nil {
b.Fatal(err)
}
}()
err := os.MkdirAll("testdir", 0700)
if err != nil {
b.Fatal(err)
}
s, err := forestdb.Open("testdir/test", true, nil)
if err != nil {
b.Fatal(err)
}
defer func() {
err := s.Close()
if err != nil {
b.Fatal(err)
}
}()
CommonBenchmarkIndex(b, s, 2)
CommonBenchmarkIndex(b, CreateForestDB, DestroyForestDB, 2)
}
func BenchmarkForestDBIndexing4Workers(b *testing.B) {
defer func() {
err := os.RemoveAll("testdir")
if err != nil {
b.Fatal(err)
}
}()
err := os.MkdirAll("testdir", 0700)
if err != nil {
b.Fatal(err)
}
s, err := forestdb.Open("testdir/test", true, nil)
if err != nil {
b.Fatal(err)
}
defer func() {
err := s.Close()
if err != nil {
b.Fatal(err)
}
}()
CommonBenchmarkIndex(b, s, 4)
CommonBenchmarkIndex(b, CreateForestDB, DestroyForestDB, 4)
}
// batches
func BenchmarkForestDBIndexing1Workers10Batch(b *testing.B) {
defer func() {
err := os.RemoveAll("testdir")
if err != nil {
b.Fatal(err)
}
}()
err := os.MkdirAll("testdir", 0700)
if err != nil {
b.Fatal(err)
}
s, err := forestdb.Open("testdir/test", true, nil)
if err != nil {
b.Fatal(err)
}
defer func() {
err := s.Close()
if err != nil {
b.Fatal(err)
}
}()
CommonBenchmarkIndexBatch(b, s, 1, 10)
CommonBenchmarkIndexBatch(b, CreateForestDB, DestroyForestDB, 1, 10)
}
func BenchmarkForestDBIndexing2Workers10Batch(b *testing.B) {
defer func() {
err := os.RemoveAll("testdir")
if err != nil {
b.Fatal(err)
}
}()
err := os.MkdirAll("testdir", 0700)
if err != nil {
b.Fatal(err)
}
s, err := forestdb.Open("testdir/test", true, nil)
if err != nil {
b.Fatal(err)
}
defer func() {
err := s.Close()
if err != nil {
b.Fatal(err)
}
}()
CommonBenchmarkIndexBatch(b, s, 2, 10)
CommonBenchmarkIndexBatch(b, CreateForestDB, DestroyForestDB, 2, 10)
}
func BenchmarkForestDBIndexing4Workers10Batch(b *testing.B) {
defer func() {
err := os.RemoveAll("testdir")
if err != nil {
b.Fatal(err)
}
}()
err := os.MkdirAll("testdir", 0700)
if err != nil {
b.Fatal(err)
}
s, err := forestdb.Open("testdir/test", true, nil)
if err != nil {
b.Fatal(err)
}
defer func() {
err := s.Close()
if err != nil {
b.Fatal(err)
}
}()
CommonBenchmarkIndexBatch(b, s, 4, 10)
CommonBenchmarkIndexBatch(b, CreateForestDB, DestroyForestDB, 4, 10)
}
func BenchmarkForestDBIndexing1Workers100Batch(b *testing.B) {
defer func() {
err := os.RemoveAll("testdir")
if err != nil {
b.Fatal(err)
}
}()
err := os.MkdirAll("testdir", 0700)
if err != nil {
b.Fatal(err)
}
s, err := forestdb.Open("testdir/test", true, nil)
if err != nil {
b.Fatal(err)
}
defer func() {
err := s.Close()
if err != nil {
b.Fatal(err)
}
}()
CommonBenchmarkIndexBatch(b, s, 1, 100)
CommonBenchmarkIndexBatch(b, CreateForestDB, DestroyForestDB, 1, 100)
}
func BenchmarkForestDBIndexing2Workers100Batch(b *testing.B) {
defer func() {
err := os.RemoveAll("testdir")
if err != nil {
b.Fatal(err)
}
}()
err := os.MkdirAll("testdir", 0700)
if err != nil {
b.Fatal(err)
}
s, err := forestdb.Open("testdir/test", true, nil)
if err != nil {
b.Fatal(err)
}
defer func() {
err := s.Close()
if err != nil {
b.Fatal(err)
}
}()
CommonBenchmarkIndexBatch(b, s, 2, 100)
CommonBenchmarkIndexBatch(b, CreateForestDB, DestroyForestDB, 2, 100)
}
func BenchmarkForestDBIndexing4Workers100Batch(b *testing.B) {
defer func() {
err := os.RemoveAll("testdir")
if err != nil {
b.Fatal(err)
}
}()
err := os.MkdirAll("testdir", 0700)
if err != nil {
b.Fatal(err)
}
s, err := forestdb.Open("testdir/test", true, nil)
if err != nil {
b.Fatal(err)
}
defer func() {
err := s.Close()
if err != nil {
b.Fatal(err)
}
}()
CommonBenchmarkIndexBatch(b, s, 4, 100)
CommonBenchmarkIndexBatch(b, CreateForestDB, DestroyForestDB, 4, 100)
}

View File

@ -13,6 +13,7 @@ import (
"os"
"testing"
"github.com/blevesearch/bleve/index/store"
"github.com/blevesearch/bleve/index/store/goleveldb"
)
@ -20,256 +21,60 @@ var goLevelDBTestOptions = map[string]interface{}{
"create_if_missing": true,
}
func BenchmarkGoLevelDBIndexing1Workers(b *testing.B) {
s, err := goleveldb.Open("test", goLevelDBTestOptions)
if err != nil {
b.Fatal(err)
}
defer func() {
err := os.RemoveAll("test")
if err != nil {
b.Fatal(err)
}
}()
defer func() {
err := s.Close()
if err != nil {
b.Fatal(err)
}
}()
func CreateGoLevelDB() (store.KVStore, error) {
return goleveldb.New("test", goLevelDBTestOptions)
}
CommonBenchmarkIndex(b, s, 1)
func DestroyGoLevelDB() error {
return os.RemoveAll("test")
}
func BenchmarkGoLevelDBIndexing1Workers(b *testing.B) {
CommonBenchmarkIndex(b, CreateGoLevelDB, DestroyGoLevelDB, 1)
}
func BenchmarkGoLevelDBIndexing2Workers(b *testing.B) {
s, err := goleveldb.Open("test", goLevelDBTestOptions)
if err != nil {
b.Fatal(err)
}
defer func() {
err := os.RemoveAll("test")
if err != nil {
b.Fatal(err)
}
}()
defer func() {
err := s.Close()
if err != nil {
b.Fatal(err)
}
}()
CommonBenchmarkIndex(b, s, 2)
CommonBenchmarkIndex(b, CreateGoLevelDB, DestroyGoLevelDB, 2)
}
func BenchmarkGoLevelDBIndexing4Workers(b *testing.B) {
s, err := goleveldb.Open("test", goLevelDBTestOptions)
if err != nil {
b.Fatal(err)
}
defer func() {
err := os.RemoveAll("test")
if err != nil {
b.Fatal(err)
}
}()
defer func() {
err := s.Close()
if err != nil {
b.Fatal(err)
}
}()
CommonBenchmarkIndex(b, s, 4)
CommonBenchmarkIndex(b, CreateGoLevelDB, DestroyGoLevelDB, 4)
}
// batches
func BenchmarkGoLevelDBIndexing1Workers10Batch(b *testing.B) {
s, err := goleveldb.Open("test", goLevelDBTestOptions)
if err != nil {
b.Fatal(err)
}
defer func() {
err := os.RemoveAll("test")
if err != nil {
b.Fatal(err)
}
}()
defer func() {
err := s.Close()
if err != nil {
b.Fatal(err)
}
}()
CommonBenchmarkIndexBatch(b, s, 1, 10)
CommonBenchmarkIndexBatch(b, CreateGoLevelDB, DestroyGoLevelDB, 1, 10)
}
func BenchmarkGoLevelDBIndexing2Workers10Batch(b *testing.B) {
s, err := goleveldb.Open("test", goLevelDBTestOptions)
if err != nil {
b.Fatal(err)
}
defer func() {
err := os.RemoveAll("test")
if err != nil {
b.Fatal(err)
}
}()
defer func() {
err := s.Close()
if err != nil {
b.Fatal(err)
}
}()
CommonBenchmarkIndexBatch(b, s, 2, 10)
CommonBenchmarkIndexBatch(b, CreateGoLevelDB, DestroyGoLevelDB, 2, 10)
}
func BenchmarkGoLevelDBIndexing4Workers10Batch(b *testing.B) {
s, err := goleveldb.Open("test", goLevelDBTestOptions)
if err != nil {
b.Fatal(err)
}
defer func() {
err := os.RemoveAll("test")
if err != nil {
b.Fatal(err)
}
}()
defer func() {
err := s.Close()
if err != nil {
b.Fatal(err)
}
}()
CommonBenchmarkIndexBatch(b, s, 4, 10)
CommonBenchmarkIndexBatch(b, CreateGoLevelDB, DestroyGoLevelDB, 4, 10)
}
func BenchmarkGoLevelDBIndexing1Workers100Batch(b *testing.B) {
s, err := goleveldb.Open("test", goLevelDBTestOptions)
if err != nil {
b.Fatal(err)
}
defer func() {
err := os.RemoveAll("test")
if err != nil {
b.Fatal(err)
}
}()
defer func() {
err := s.Close()
if err != nil {
b.Fatal(err)
}
}()
CommonBenchmarkIndexBatch(b, s, 1, 100)
CommonBenchmarkIndexBatch(b, CreateGoLevelDB, DestroyGoLevelDB, 1, 100)
}
func BenchmarkGoLevelDBIndexing2Workers100Batch(b *testing.B) {
s, err := goleveldb.Open("test", goLevelDBTestOptions)
if err != nil {
b.Fatal(err)
}
defer func() {
err := os.RemoveAll("test")
if err != nil {
b.Fatal(err)
}
}()
defer func() {
err := s.Close()
if err != nil {
b.Fatal(err)
}
}()
CommonBenchmarkIndexBatch(b, s, 2, 100)
CommonBenchmarkIndexBatch(b, CreateGoLevelDB, DestroyGoLevelDB, 2, 100)
}
func BenchmarkGoLevelDBIndexing4Workers100Batch(b *testing.B) {
s, err := goleveldb.Open("test", goLevelDBTestOptions)
if err != nil {
b.Fatal(err)
}
defer func() {
err := os.RemoveAll("test")
if err != nil {
b.Fatal(err)
}
}()
defer func() {
err := s.Close()
if err != nil {
b.Fatal(err)
}
}()
CommonBenchmarkIndexBatch(b, s, 4, 100)
CommonBenchmarkIndexBatch(b, CreateGoLevelDB, DestroyGoLevelDB, 4, 100)
}
func BenchmarkGoLevelDBIndexing1Workers1000Batch(b *testing.B) {
s, err := goleveldb.Open("test", goLevelDBTestOptions)
if err != nil {
b.Fatal(err)
}
defer func() {
err := os.RemoveAll("test")
if err != nil {
b.Fatal(err)
}
}()
defer func() {
err := s.Close()
if err != nil {
b.Fatal(err)
}
}()
CommonBenchmarkIndexBatch(b, s, 1, 1000)
CommonBenchmarkIndexBatch(b, CreateGoLevelDB, DestroyGoLevelDB, 1, 1000)
}
func BenchmarkGoLevelDBIndexing2Workers1000Batch(b *testing.B) {
s, err := goleveldb.Open("test", goLevelDBTestOptions)
if err != nil {
b.Fatal(err)
}
defer func() {
err := os.RemoveAll("test")
if err != nil {
b.Fatal(err)
}
}()
defer func() {
err := s.Close()
if err != nil {
b.Fatal(err)
}
}()
CommonBenchmarkIndexBatch(b, s, 2, 1000)
CommonBenchmarkIndexBatch(b, CreateGoLevelDB, DestroyGoLevelDB, 2, 1000)
}
func BenchmarkGoLevelDBIndexing4Workers1000Batch(b *testing.B) {
s, err := goleveldb.Open("test", goLevelDBTestOptions)
if err != nil {
b.Fatal(err)
}
defer func() {
err := os.RemoveAll("test")
if err != nil {
b.Fatal(err)
}
}()
defer func() {
err := s.Close()
if err != nil {
b.Fatal(err)
}
}()
CommonBenchmarkIndexBatch(b, s, 4, 1000)
CommonBenchmarkIndexBatch(b, CreateGoLevelDB, DestroyGoLevelDB, 4, 1000)
}

View File

@ -12,142 +12,52 @@ package upside_down
import (
"testing"
"github.com/blevesearch/bleve/index/store"
"github.com/blevesearch/bleve/index/store/gtreap"
)
func BenchmarkGTreapIndexing1Workers(b *testing.B) {
s, err := gtreap.StoreConstructor(nil)
if err != nil {
b.Fatal(err)
}
defer func() {
err := s.Close()
if err != nil {
b.Fatal(err)
}
}()
func CreateGTreap() (store.KVStore, error) {
return gtreap.StoreConstructor(nil)
}
CommonBenchmarkIndex(b, s, 1)
func DestroyGTreap() error {
return nil
}
func BenchmarkGTreapIndexing1Workers(b *testing.B) {
CommonBenchmarkIndex(b, CreateGTreap, DestroyGTreap, 1)
}
func BenchmarkGTreapIndexing2Workers(b *testing.B) {
s, err := gtreap.StoreConstructor(nil)
if err != nil {
b.Fatal(err)
}
defer func() {
err := s.Close()
if err != nil {
b.Fatal(err)
}
}()
CommonBenchmarkIndex(b, s, 2)
CommonBenchmarkIndex(b, CreateGTreap, DestroyGTreap, 2)
}
func BenchmarkGTreapIndexing4Workers(b *testing.B) {
s, err := gtreap.StoreConstructor(nil)
if err != nil {
b.Fatal(err)
}
defer func() {
err := s.Close()
if err != nil {
b.Fatal(err)
}
}()
CommonBenchmarkIndex(b, s, 4)
CommonBenchmarkIndex(b, CreateGTreap, DestroyGTreap, 4)
}
// batches
func BenchmarkGTreapIndexing1Workers10Batch(b *testing.B) {
s, err := gtreap.StoreConstructor(nil)
if err != nil {
b.Fatal(err)
}
defer func() {
err := s.Close()
if err != nil {
b.Fatal(err)
}
}()
CommonBenchmarkIndexBatch(b, s, 1, 10)
CommonBenchmarkIndexBatch(b, CreateGTreap, DestroyGTreap, 1, 10)
}
func BenchmarkGTreapIndexing2Workers10Batch(b *testing.B) {
s, err := gtreap.StoreConstructor(nil)
if err != nil {
b.Fatal(err)
}
defer func() {
err := s.Close()
if err != nil {
b.Fatal(err)
}
}()
CommonBenchmarkIndexBatch(b, s, 2, 10)
CommonBenchmarkIndexBatch(b, CreateGTreap, DestroyGTreap, 2, 10)
}
func BenchmarkGTreapIndexing4Workers10Batch(b *testing.B) {
s, err := gtreap.StoreConstructor(nil)
if err != nil {
b.Fatal(err)
}
defer func() {
err := s.Close()
if err != nil {
b.Fatal(err)
}
}()
CommonBenchmarkIndexBatch(b, s, 4, 10)
CommonBenchmarkIndexBatch(b, CreateGTreap, DestroyGTreap, 4, 10)
}
func BenchmarkGTreapIndexing1Workers100Batch(b *testing.B) {
s, err := gtreap.StoreConstructor(nil)
if err != nil {
b.Fatal(err)
}
defer func() {
err := s.Close()
if err != nil {
b.Fatal(err)
}
}()
CommonBenchmarkIndexBatch(b, s, 1, 100)
CommonBenchmarkIndexBatch(b, CreateGTreap, DestroyGTreap, 1, 100)
}
func BenchmarkGTreapIndexing2Workers100Batch(b *testing.B) {
s, err := gtreap.StoreConstructor(nil)
if err != nil {
b.Fatal(err)
}
defer func() {
err := s.Close()
if err != nil {
b.Fatal(err)
}
}()
CommonBenchmarkIndexBatch(b, s, 2, 100)
CommonBenchmarkIndexBatch(b, CreateGTreap, DestroyGTreap, 2, 100)
}
func BenchmarkGTreapIndexing4Workers100Batch(b *testing.B) {
s, err := gtreap.StoreConstructor(nil)
if err != nil {
b.Fatal(err)
}
defer func() {
err := s.Close()
if err != nil {
b.Fatal(err)
}
}()
CommonBenchmarkIndexBatch(b, s, 4, 100)
CommonBenchmarkIndexBatch(b, CreateGTreap, DestroyGTreap, 4, 100)
}

View File

@ -12,142 +12,52 @@ package upside_down
import (
"testing"
"github.com/blevesearch/bleve/index/store"
"github.com/blevesearch/bleve/index/store/inmem"
)
func BenchmarkInMemIndexing1Workers(b *testing.B) {
s, err := inmem.Open()
if err != nil {
b.Fatal(err)
}
defer func() {
err := s.Close()
if err != nil {
b.Fatal(err)
}
}()
func CreateInMem() (store.KVStore, error) {
return inmem.New()
}
CommonBenchmarkIndex(b, s, 1)
func DestroyInMem() error {
return nil
}
func BenchmarkInMemIndexing1Workers(b *testing.B) {
CommonBenchmarkIndex(b, CreateInMem, DestroyInMem, 1)
}
func BenchmarkInMemIndexing2Workers(b *testing.B) {
s, err := inmem.Open()
if err != nil {
b.Fatal(err)
}
defer func() {
err := s.Close()
if err != nil {
b.Fatal(err)
}
}()
CommonBenchmarkIndex(b, s, 2)
CommonBenchmarkIndex(b, CreateInMem, DestroyInMem, 2)
}
func BenchmarkInMemIndexing4Workers(b *testing.B) {
s, err := inmem.Open()
if err != nil {
b.Fatal(err)
}
defer func() {
err := s.Close()
if err != nil {
b.Fatal(err)
}
}()
CommonBenchmarkIndex(b, s, 4)
CommonBenchmarkIndex(b, CreateInMem, DestroyInMem, 4)
}
// batches
func BenchmarkInMemIndexing1Workers10Batch(b *testing.B) {
s, err := inmem.Open()
if err != nil {
b.Fatal(err)
}
defer func() {
err := s.Close()
if err != nil {
b.Fatal(err)
}
}()
CommonBenchmarkIndexBatch(b, s, 1, 10)
CommonBenchmarkIndexBatch(b, CreateInMem, DestroyInMem, 1, 10)
}
func BenchmarkInMemIndexing2Workers10Batch(b *testing.B) {
s, err := inmem.Open()
if err != nil {
b.Fatal(err)
}
defer func() {
err := s.Close()
if err != nil {
b.Fatal(err)
}
}()
CommonBenchmarkIndexBatch(b, s, 2, 10)
CommonBenchmarkIndexBatch(b, CreateInMem, DestroyInMem, 2, 10)
}
func BenchmarkInMemIndexing4Workers10Batch(b *testing.B) {
s, err := inmem.Open()
if err != nil {
b.Fatal(err)
}
defer func() {
err := s.Close()
if err != nil {
b.Fatal(err)
}
}()
CommonBenchmarkIndexBatch(b, s, 4, 10)
CommonBenchmarkIndexBatch(b, CreateInMem, DestroyInMem, 4, 10)
}
func BenchmarkInMemIndexing1Workers100Batch(b *testing.B) {
s, err := inmem.Open()
if err != nil {
b.Fatal(err)
}
defer func() {
err := s.Close()
if err != nil {
b.Fatal(err)
}
}()
CommonBenchmarkIndexBatch(b, s, 1, 100)
CommonBenchmarkIndexBatch(b, CreateInMem, DestroyInMem, 1, 100)
}
func BenchmarkInMemIndexing2Workers100Batch(b *testing.B) {
s, err := inmem.Open()
if err != nil {
b.Fatal(err)
}
defer func() {
err := s.Close()
if err != nil {
b.Fatal(err)
}
}()
CommonBenchmarkIndexBatch(b, s, 2, 100)
CommonBenchmarkIndexBatch(b, CreateInMem, DestroyInMem, 2, 100)
}
func BenchmarkInMemIndexing4Workers100Batch(b *testing.B) {
s, err := inmem.Open()
if err != nil {
b.Fatal(err)
}
defer func() {
err := s.Close()
if err != nil {
b.Fatal(err)
}
}()
CommonBenchmarkIndexBatch(b, s, 4, 100)
CommonBenchmarkIndexBatch(b, CreateInMem, DestroyInMem, 4, 100)
}

View File

@ -15,6 +15,7 @@ import (
"os"
"testing"
"github.com/blevesearch/bleve/index/store"
"github.com/blevesearch/bleve/index/store/leveldb"
)
@ -22,256 +23,60 @@ var leveldbTestOptions = map[string]interface{}{
"create_if_missing": true,
}
func BenchmarkLevelDBIndexing1Workers(b *testing.B) {
s, err := leveldb.Open("test", leveldbTestOptions)
if err != nil {
b.Fatal(err)
}
defer func() {
err := os.RemoveAll("test")
if err != nil {
b.Fatal(err)
}
}()
defer func() {
err := s.Close()
if err != nil {
b.Fatal(err)
}
}()
func CreateLevelDB() (store.KVStore, error) {
return leveldb.New("test", leveldbTestOptions)
}
CommonBenchmarkIndex(b, s, 1)
func DestroyLevelDB() error {
return os.RemoveAll("test")
}
func BenchmarkLevelDBIndexing1Workers(b *testing.B) {
CommonBenchmarkIndex(b, CreateLevelDB, DestroyLevelDB, 1)
}
func BenchmarkLevelDBIndexing2Workers(b *testing.B) {
s, err := leveldb.Open("test", leveldbTestOptions)
if err != nil {
b.Fatal(err)
}
defer func() {
err := os.RemoveAll("test")
if err != nil {
b.Fatal(err)
}
}()
defer func() {
err := s.Close()
if err != nil {
b.Fatal(err)
}
}()
CommonBenchmarkIndex(b, s, 2)
CommonBenchmarkIndex(b, CreateLevelDB, DestroyLevelDB, 2)
}
func BenchmarkLevelDBIndexing4Workers(b *testing.B) {
s, err := leveldb.Open("test", leveldbTestOptions)
if err != nil {
b.Fatal(err)
}
defer func() {
err := os.RemoveAll("test")
if err != nil {
b.Fatal(err)
}
}()
defer func() {
err := s.Close()
if err != nil {
b.Fatal(err)
}
}()
CommonBenchmarkIndex(b, s, 4)
CommonBenchmarkIndex(b, CreateLevelDB, DestroyLevelDB, 4)
}
// batches
func BenchmarkLevelDBIndexing1Workers10Batch(b *testing.B) {
s, err := leveldb.Open("test", leveldbTestOptions)
if err != nil {
b.Fatal(err)
}
defer func() {
err := os.RemoveAll("test")
if err != nil {
b.Fatal(err)
}
}()
defer func() {
err := s.Close()
if err != nil {
b.Fatal(err)
}
}()
CommonBenchmarkIndexBatch(b, s, 1, 10)
CommonBenchmarkIndexBatch(b, CreateLevelDB, DestroyLevelDB, 1, 10)
}
func BenchmarkLevelDBIndexing2Workers10Batch(b *testing.B) {
s, err := leveldb.Open("test", leveldbTestOptions)
if err != nil {
b.Fatal(err)
}
defer func() {
err := os.RemoveAll("test")
if err != nil {
b.Fatal(err)
}
}()
defer func() {
err := s.Close()
if err != nil {
b.Fatal(err)
}
}()
CommonBenchmarkIndexBatch(b, s, 2, 10)
CommonBenchmarkIndexBatch(b, CreateLevelDB, DestroyLevelDB, 2, 10)
}
func BenchmarkLevelDBIndexing4Workers10Batch(b *testing.B) {
s, err := leveldb.Open("test", leveldbTestOptions)
if err != nil {
b.Fatal(err)
}
defer func() {
err := os.RemoveAll("test")
if err != nil {
b.Fatal(err)
}
}()
defer func() {
err := s.Close()
if err != nil {
b.Fatal(err)
}
}()
CommonBenchmarkIndexBatch(b, s, 4, 10)
CommonBenchmarkIndexBatch(b, CreateLevelDB, DestroyLevelDB, 4, 10)
}
func BenchmarkLevelDBIndexing1Workers100Batch(b *testing.B) {
s, err := leveldb.Open("test", leveldbTestOptions)
if err != nil {
b.Fatal(err)
}
defer func() {
err := os.RemoveAll("test")
if err != nil {
b.Fatal(err)
}
}()
defer func() {
err := s.Close()
if err != nil {
b.Fatal(err)
}
}()
CommonBenchmarkIndexBatch(b, s, 1, 100)
CommonBenchmarkIndexBatch(b, CreateLevelDB, DestroyLevelDB, 1, 100)
}
func BenchmarkLevelDBIndexing2Workers100Batch(b *testing.B) {
s, err := leveldb.Open("test", leveldbTestOptions)
if err != nil {
b.Fatal(err)
}
defer func() {
err := os.RemoveAll("test")
if err != nil {
b.Fatal(err)
}
}()
defer func() {
err := s.Close()
if err != nil {
b.Fatal(err)
}
}()
CommonBenchmarkIndexBatch(b, s, 2, 100)
CommonBenchmarkIndexBatch(b, CreateLevelDB, DestroyLevelDB, 2, 100)
}
func BenchmarkLevelDBIndexing4Workers100Batch(b *testing.B) {
s, err := leveldb.Open("test", leveldbTestOptions)
if err != nil {
b.Fatal(err)
}
defer func() {
err := os.RemoveAll("test")
if err != nil {
b.Fatal(err)
}
}()
defer func() {
err := s.Close()
if err != nil {
b.Fatal(err)
}
}()
CommonBenchmarkIndexBatch(b, s, 4, 100)
CommonBenchmarkIndexBatch(b, CreateLevelDB, DestroyLevelDB, 4, 100)
}
func BenchmarkLevelDBIndexing1Workers1000Batch(b *testing.B) {
s, err := leveldb.Open("test", leveldbTestOptions)
if err != nil {
b.Fatal(err)
}
defer func() {
err := os.RemoveAll("test")
if err != nil {
b.Fatal(err)
}
}()
defer func() {
err := s.Close()
if err != nil {
b.Fatal(err)
}
}()
CommonBenchmarkIndexBatch(b, s, 1, 1000)
CommonBenchmarkIndexBatch(b, CreateLevelDB, DestroyLevelDB, 1, 1000)
}
func BenchmarkLevelDBIndexing2Workers1000Batch(b *testing.B) {
s, err := leveldb.Open("test", leveldbTestOptions)
if err != nil {
b.Fatal(err)
}
defer func() {
err := os.RemoveAll("test")
if err != nil {
b.Fatal(err)
}
}()
defer func() {
err := s.Close()
if err != nil {
b.Fatal(err)
}
}()
CommonBenchmarkIndexBatch(b, s, 2, 1000)
CommonBenchmarkIndexBatch(b, CreateLevelDB, DestroyLevelDB, 2, 1000)
}
func BenchmarkLevelDBIndexing4Workers1000Batch(b *testing.B) {
s, err := leveldb.Open("test", leveldbTestOptions)
if err != nil {
b.Fatal(err)
}
defer func() {
err := os.RemoveAll("test")
if err != nil {
b.Fatal(err)
}
}()
defer func() {
err := s.Close()
if err != nil {
b.Fatal(err)
}
}()
CommonBenchmarkIndexBatch(b, s, 4, 1000)
CommonBenchmarkIndexBatch(b, CreateLevelDB, DestroyLevelDB, 4, 1000)
}

View File

@ -12,142 +12,52 @@ package upside_down
import (
"testing"
"github.com/blevesearch/bleve/index/store"
"github.com/blevesearch/bleve/index/store/null"
)
func BenchmarkNullIndexing1Workers(b *testing.B) {
s, err := null.Open()
if err != nil {
b.Fatal(err)
}
defer func() {
err := s.Close()
if err != nil {
b.Fatal(err)
}
}()
func CreateNull() (store.KVStore, error) {
return null.New()
}
CommonBenchmarkIndex(b, s, 1)
func DestroyNull() error {
return nil
}
func BenchmarkNullIndexing1Workers(b *testing.B) {
CommonBenchmarkIndex(b, CreateNull, DestroyNull, 1)
}
func BenchmarkNullIndexing2Workers(b *testing.B) {
s, err := null.Open()
if err != nil {
b.Fatal(err)
}
defer func() {
err := s.Close()
if err != nil {
b.Fatal(err)
}
}()
CommonBenchmarkIndex(b, s, 2)
CommonBenchmarkIndex(b, CreateNull, DestroyNull, 2)
}
func BenchmarkNullIndexing4Workers(b *testing.B) {
s, err := null.Open()
if err != nil {
b.Fatal(err)
}
defer func() {
err := s.Close()
if err != nil {
b.Fatal(err)
}
}()
CommonBenchmarkIndex(b, s, 4)
CommonBenchmarkIndex(b, CreateNull, DestroyNull, 4)
}
// batches
func BenchmarkNullIndexing1Workers10Batch(b *testing.B) {
s, err := null.Open()
if err != nil {
b.Fatal(err)
}
defer func() {
err := s.Close()
if err != nil {
b.Fatal(err)
}
}()
CommonBenchmarkIndexBatch(b, s, 1, 10)
CommonBenchmarkIndexBatch(b, CreateNull, DestroyNull, 1, 10)
}
func BenchmarkNullIndexing2Workers10Batch(b *testing.B) {
s, err := null.Open()
if err != nil {
b.Fatal(err)
}
defer func() {
err := s.Close()
if err != nil {
b.Fatal(err)
}
}()
CommonBenchmarkIndexBatch(b, s, 2, 10)
CommonBenchmarkIndexBatch(b, CreateNull, DestroyNull, 2, 10)
}
func BenchmarkNullIndexing4Workers10Batch(b *testing.B) {
s, err := null.Open()
if err != nil {
b.Fatal(err)
}
defer func() {
err := s.Close()
if err != nil {
b.Fatal(err)
}
}()
CommonBenchmarkIndexBatch(b, s, 4, 10)
CommonBenchmarkIndexBatch(b, CreateNull, DestroyNull, 4, 10)
}
func BenchmarkNullIndexing1Workers100Batch(b *testing.B) {
s, err := null.Open()
if err != nil {
b.Fatal(err)
}
defer func() {
err := s.Close()
if err != nil {
b.Fatal(err)
}
}()
CommonBenchmarkIndexBatch(b, s, 1, 100)
CommonBenchmarkIndexBatch(b, CreateNull, DestroyNull, 1, 100)
}
func BenchmarkNullIndexing2Workers100Batch(b *testing.B) {
s, err := null.Open()
if err != nil {
b.Fatal(err)
}
defer func() {
err := s.Close()
if err != nil {
b.Fatal(err)
}
}()
CommonBenchmarkIndexBatch(b, s, 2, 100)
CommonBenchmarkIndexBatch(b, CreateNull, DestroyNull, 2, 100)
}
func BenchmarkNullIndexing4Workers100Batch(b *testing.B) {
s, err := null.Open()
if err != nil {
b.Fatal(err)
}
defer func() {
err := s.Close()
if err != nil {
b.Fatal(err)
}
}()
CommonBenchmarkIndexBatch(b, s, 4, 100)
CommonBenchmarkIndexBatch(b, CreateNull, DestroyNull, 4, 100)
}

View File

@ -26,13 +26,11 @@ func TestDump(t *testing.T) {
}
}()
store, err := boltdb.Open("test", "bleve")
if err != nil {
t.Error(err)
}
s := boltdb.New("test", "bleve")
s.SetMergeOperator(&mergeOperator)
analysisQueue := NewAnalysisQueue(1)
idx := NewUpsideDownCouch(store, analysisQueue)
err = idx.Open()
idx := NewUpsideDownCouch(s, analysisQueue)
err := idx.Open()
if err != nil {
t.Errorf("error opening index: %v", err)
}

View File

@ -26,10 +26,11 @@ func TestIndexFieldDict(t *testing.T) {
}
}()
store, err := boltdb.Open("test", "bleve")
s := boltdb.New("test", "bleve")
s.SetMergeOperator(&mergeOperator)
analysisQueue := NewAnalysisQueue(1)
idx := NewUpsideDownCouch(store, analysisQueue)
err = idx.Open()
idx := NewUpsideDownCouch(s, analysisQueue)
err := idx.Open()
if err != nil {
t.Errorf("error opening index: %v", err)
}

View File

@ -27,10 +27,11 @@ func TestIndexReader(t *testing.T) {
}
}()
store, err := boltdb.Open("test", "bleve")
s := boltdb.New("test", "bleve")
s.SetMergeOperator(&mergeOperator)
analysisQueue := NewAnalysisQueue(1)
idx := NewUpsideDownCouch(store, analysisQueue)
err = idx.Open()
idx := NewUpsideDownCouch(s, analysisQueue)
err := idx.Open()
if err != nil {
t.Errorf("error opening index: %v", err)
}
@ -200,10 +201,11 @@ func TestIndexDocIdReader(t *testing.T) {
}
}()
store, err := boltdb.Open("test", "bleve")
s := boltdb.New("test", "bleve")
s.SetMergeOperator(&mergeOperator)
analysisQueue := NewAnalysisQueue(1)
idx := NewUpsideDownCouch(store, analysisQueue)
err = idx.Open()
idx := NewUpsideDownCouch(s, analysisQueue)
err := idx.Open()
if err != nil {
t.Errorf("error opening index: %v", err)
}

View File

@ -9,46 +9,62 @@
package upside_down
type dictionaryTermIncr struct{}
import (
"encoding/binary"
)
func newDictionaryTermIncr() *dictionaryTermIncr {
return &dictionaryTermIncr{}
var mergeOperator upsideDownMerge
var dictionaryTermIncr []byte
var dictionaryTermDecr []byte
func init() {
dictionaryTermIncr = make([]byte, 8)
binary.LittleEndian.PutUint64(dictionaryTermIncr, uint64(1))
dictionaryTermDecr = make([]byte, 8)
var negOne = int64(-1)
binary.LittleEndian.PutUint64(dictionaryTermDecr, uint64(negOne))
}
func (t *dictionaryTermIncr) Merge(key, existing []byte) ([]byte, error) {
if len(existing) > 0 {
dr, err := NewDictionaryRowKV(key, existing)
if err != nil {
return nil, err
}
dr.count++
return dr.Value(), nil
} else {
dr, err := NewDictionaryRowK(key)
if err != nil {
return nil, err
}
dr.count = 1
return dr.Value(), nil
type upsideDownMerge struct{}
func (m *upsideDownMerge) FullMerge(key, existingValue []byte, operands [][]byte) ([]byte, bool) {
// set up record based on key
dr, err := NewDictionaryRowK(key)
if err != nil {
return nil, false
}
}
type dictionaryTermDecr struct{}
func newDictionaryTermDecr() *dictionaryTermDecr {
return &dictionaryTermDecr{}
}
func (t *dictionaryTermDecr) Merge(key, existing []byte) ([]byte, error) {
if len(existing) > 0 {
dr, err := NewDictionaryRowKV(key, existing)
if len(existingValue) > 0 {
// if existing value, parse it
err = dr.parseDictionaryV(existingValue)
if err != nil {
return nil, err
}
dr.count--
if dr.count > 0 {
return dr.Value(), nil
return nil, false
}
}
return nil, nil
// now process operands
for _, operand := range operands {
next := int64(binary.LittleEndian.Uint64(operand))
if next < 0 && uint64(-next) > dr.count {
// subtracting next from existing would overflow
dr.count = 0
} else if next < 0 {
dr.count -= uint64(-next)
} else {
dr.count += uint64(next)
}
}
return dr.Value(), true
}
func (m *upsideDownMerge) PartialMerge(key, leftOperand, rightOperand []byte) ([]byte, bool) {
left := int64(binary.LittleEndian.Uint64(leftOperand))
right := int64(binary.LittleEndian.Uint64(rightOperand))
binary.LittleEndian.PutUint64(leftOperand, uint64(left+right))
return leftOperand, true
}
func (m *upsideDownMerge) Name() string {
return "upsideDownMerge"
}

View File

@ -37,11 +37,11 @@ type UpsideDownCouch struct {
store store.KVStore
fieldIndexCache *FieldIndexCache
docCount uint64
analysisQueue AnalysisQueue
analysisQueue *AnalysisQueue
stats *indexStat
}
func NewUpsideDownCouch(s store.KVStore, analysisQueue AnalysisQueue) *UpsideDownCouch {
func NewUpsideDownCouch(s store.KVStore, analysisQueue *AnalysisQueue) *UpsideDownCouch {
return &UpsideDownCouch{
version: Version,
fieldIndexCache: NewFieldIndexCache(),
@ -119,7 +119,7 @@ func (udc *UpsideDownCouch) batchRows(writer store.KVWriter, addRows []UpsideDow
if ok {
// need to increment counter
dictionaryKey := tfr.DictionaryRowKey()
wb.Merge(dictionaryKey, newDictionaryTermIncr())
wb.Merge(dictionaryKey, dictionaryTermIncr)
}
wb.Set(row.Key(), row.Value())
}
@ -135,7 +135,7 @@ func (udc *UpsideDownCouch) batchRows(writer store.KVWriter, addRows []UpsideDow
if ok {
// need to decrement counter
dictionaryKey := tfr.DictionaryRowKey()
wb.Merge(dictionaryKey, newDictionaryTermDecr())
wb.Merge(dictionaryKey, dictionaryTermDecr)
}
wb.Delete(row.Key())
}
@ -153,6 +153,15 @@ func (udc *UpsideDownCouch) DocCount() (uint64, error) {
}
func (udc *UpsideDownCouch) Open() (err error) {
// install the merge operator
udc.store.SetMergeOperator(&mergeOperator)
// now open the kv store
err = udc.store.Open()
if err != nil {
return
}
// start a writer for the open process
var kvwriter store.KVWriter
kvwriter, err = udc.store.Writer()
@ -252,7 +261,7 @@ func (udc *UpsideDownCouch) Update(doc *document.Document) (err error) {
}
// put the work on the queue
go func() {
udc.analysisQueue <- aw
udc.analysisQueue.Queue(&aw)
}()
// wait for the result
@ -581,7 +590,7 @@ func (udc *UpsideDownCouch) Batch(batch *index.Batch) (err error) {
rc: resultChan,
}
// put the work on the queue
udc.analysisQueue <- aw
udc.analysisQueue.Queue(&aw)
}
}
}()

View File

@ -35,10 +35,11 @@ func TestIndexOpenReopen(t *testing.T) {
}
}()
store, err := boltdb.Open("test", "bleve")
store := boltdb.New("test", "bleve")
store.SetMergeOperator(&mergeOperator)
analysisQueue := NewAnalysisQueue(1)
idx := NewUpsideDownCouch(store, analysisQueue)
err = idx.Open()
err := idx.Open()
if err != nil {
t.Errorf("error opening index: %v", err)
}
@ -68,10 +69,8 @@ func TestIndexOpenReopen(t *testing.T) {
t.Fatal(err)
}
store, err = boltdb.Open("test", "bleve")
if err != nil {
t.Fatalf("error opening store: %v", err)
}
store = boltdb.New("test", "bleve")
store.SetMergeOperator(&mergeOperator)
idx = NewUpsideDownCouch(store, analysisQueue)
err = idx.Open()
if err != nil {
@ -93,10 +92,11 @@ func TestIndexInsert(t *testing.T) {
}
}()
store, err := boltdb.Open("test", "bleve")
store := boltdb.New("test", "bleve")
store.SetMergeOperator(&mergeOperator)
analysisQueue := NewAnalysisQueue(1)
idx := NewUpsideDownCouch(store, analysisQueue)
err = idx.Open()
err := idx.Open()
if err != nil {
t.Errorf("error opening index: %v", err)
}
@ -151,10 +151,11 @@ func TestIndexInsertThenDelete(t *testing.T) {
}
}()
store, err := boltdb.Open("test", "bleve")
store := boltdb.New("test", "bleve")
store.SetMergeOperator(&mergeOperator)
analysisQueue := NewAnalysisQueue(1)
idx := NewUpsideDownCouch(store, analysisQueue)
err = idx.Open()
err := idx.Open()
if err != nil {
t.Errorf("error opening index: %v", err)
}
@ -226,8 +227,8 @@ func TestIndexInsertThenDelete(t *testing.T) {
t.Errorf("Expected document count to be %d got %d", expectedCount, docCount)
}
// should have 2 rows (1 for version, 1 for schema field)
expectedLength := uint64(1 + 1)
	// should have 3 rows (1 for version, 1 for schema field, 1 for dictionary row garbage)
expectedLength := uint64(1 + 1 + 1)
rowCount, err := idx.rowCount()
if err != nil {
t.Error(err)
@ -245,10 +246,11 @@ func TestIndexInsertThenUpdate(t *testing.T) {
}
}()
store, err := boltdb.Open("test", "bleve")
store := boltdb.New("test", "bleve")
store.SetMergeOperator(&mergeOperator)
analysisQueue := NewAnalysisQueue(1)
idx := NewUpsideDownCouch(store, analysisQueue)
err = idx.Open()
err := idx.Open()
if err != nil {
t.Errorf("error opening index: %v", err)
}
@ -292,8 +294,8 @@ func TestIndexInsertThenUpdate(t *testing.T) {
t.Errorf("Error deleting entry from index: %v", err)
}
// should have 2 rows (1 for version, 1 for schema field, and 1 for the remaining term, and 1 for the term count, and 1 for the back index entry)
expectedLength = uint64(1 + 1 + 1 + 1 + 1)
	// should have 6 rows (1 for version, 1 for schema field, and 1 for the remaining term, and 2 for the term dictionary, and 1 for the back index entry)
expectedLength = uint64(1 + 1 + 1 + 2 + 1)
rowCount, err = idx.rowCount()
if err != nil {
t.Error(err)
@ -311,10 +313,11 @@ func TestIndexInsertMultiple(t *testing.T) {
}
}()
store, err := boltdb.Open("test", "bleve")
store := boltdb.New("test", "bleve")
store.SetMergeOperator(&mergeOperator)
analysisQueue := NewAnalysisQueue(1)
idx := NewUpsideDownCouch(store, analysisQueue)
err = idx.Open()
err := idx.Open()
if err != nil {
t.Errorf("error opening index: %v", err)
}
@ -353,7 +356,8 @@ func TestIndexInsertMultiple(t *testing.T) {
t.Fatal(err)
}
store, err = boltdb.Open("test", "bleve")
store = boltdb.New("test", "bleve")
store.SetMergeOperator(&mergeOperator)
idx = NewUpsideDownCouch(store, analysisQueue)
err = idx.Open()
if err != nil {
@ -391,13 +395,11 @@ func TestIndexInsertWithStore(t *testing.T) {
}
}()
store, err := boltdb.Open("test", "bleve")
if err != nil {
t.Error(err)
}
store := boltdb.New("test", "bleve")
store.SetMergeOperator(&mergeOperator)
analysisQueue := NewAnalysisQueue(1)
idx := NewUpsideDownCouch(store, analysisQueue)
err = idx.Open()
err := idx.Open()
if err != nil {
t.Errorf("error opening index: %v", err)
}
@ -479,10 +481,11 @@ func TestIndexInternalCRUD(t *testing.T) {
}
}()
store, err := boltdb.Open("test", "bleve")
store := boltdb.New("test", "bleve")
store.SetMergeOperator(&mergeOperator)
analysisQueue := NewAnalysisQueue(1)
idx := NewUpsideDownCouch(store, analysisQueue)
err = idx.Open()
err := idx.Open()
if err != nil {
t.Errorf("error opening index: %v", err)
}
@ -574,10 +577,11 @@ func TestIndexBatch(t *testing.T) {
}
}()
store, err := boltdb.Open("test", "bleve")
store := boltdb.New("test", "bleve")
store.SetMergeOperator(&mergeOperator)
analysisQueue := NewAnalysisQueue(1)
idx := NewUpsideDownCouch(store, analysisQueue)
err = idx.Open()
err := idx.Open()
if err != nil {
t.Errorf("error opening index: %v", err)
}
@ -670,13 +674,11 @@ func TestIndexInsertUpdateDeleteWithMultipleTypesStored(t *testing.T) {
}
}()
store, err := boltdb.Open("test", "bleve")
if err != nil {
t.Error(err)
}
store := boltdb.New("test", "bleve")
store.SetMergeOperator(&mergeOperator)
analysisQueue := NewAnalysisQueue(1)
idx := NewUpsideDownCouch(store, analysisQueue)
err = idx.Open()
err := idx.Open()
if err != nil {
t.Errorf("error opening index: %v", err)
}
@ -866,13 +868,11 @@ func TestIndexInsertFields(t *testing.T) {
}
}()
store, err := boltdb.Open("test", "bleve")
if err != nil {
t.Error(err)
}
store := boltdb.New("test", "bleve")
store.SetMergeOperator(&mergeOperator)
analysisQueue := NewAnalysisQueue(1)
idx := NewUpsideDownCouch(store, analysisQueue)
err = idx.Open()
err := idx.Open()
if err != nil {
t.Errorf("error opening index: %v", err)
}
@ -927,13 +927,11 @@ func TestIndexUpdateComposites(t *testing.T) {
}
}()
store, err := boltdb.Open("test", "bleve")
if err != nil {
t.Error(err)
}
store := boltdb.New("test", "bleve")
store.SetMergeOperator(&mergeOperator)
analysisQueue := NewAnalysisQueue(1)
idx := NewUpsideDownCouch(store, analysisQueue)
err = idx.Open()
err := idx.Open()
if err != nil {
t.Errorf("error opening index: %v", err)
}
@ -1006,7 +1004,8 @@ func TestIndexUpdateComposites(t *testing.T) {
t.Errorf("expected field content 'test', got '%s'", string(textField.Value()))
}
// should have the same row count as before
// should have the same row count as before, plus 4 term dictionary garbage rows
expectedLength += 4
rowCount, err = idx.rowCount()
if err != nil {
t.Error(err)
@ -1024,13 +1023,11 @@ func TestIndexFieldsMisc(t *testing.T) {
}
}()
store, err := boltdb.Open("test", "bleve")
if err != nil {
t.Error(err)
}
store := boltdb.New("test", "bleve")
store.SetMergeOperator(&mergeOperator)
analysisQueue := NewAnalysisQueue(1)
idx := NewUpsideDownCouch(store, analysisQueue)
err = idx.Open()
err := idx.Open()
if err != nil {
t.Errorf("error opening index: %v", err)
}
@ -1072,13 +1069,11 @@ func TestIndexTermReaderCompositeFields(t *testing.T) {
}
}()
store, err := boltdb.Open("test", "bleve")
if err != nil {
t.Error(err)
}
store := boltdb.New("test", "bleve")
store.SetMergeOperator(&mergeOperator)
analysisQueue := NewAnalysisQueue(1)
idx := NewUpsideDownCouch(store, analysisQueue)
err = idx.Open()
err := idx.Open()
if err != nil {
t.Errorf("error opening index: %v", err)
}
@ -1134,13 +1129,11 @@ func TestIndexDocumentFieldTerms(t *testing.T) {
}
}()
store, err := boltdb.Open("test", "bleve")
if err != nil {
t.Error(err)
}
store := boltdb.New("test", "bleve")
store.SetMergeOperator(&mergeOperator)
analysisQueue := NewAnalysisQueue(1)
idx := NewUpsideDownCouch(store, analysisQueue)
err = idx.Open()
err := idx.Open()
if err != nil {
t.Errorf("error opening index: %v", err)
}

View File

@ -124,7 +124,7 @@ func newIndexUsing(path string, mapping *IndexMapping, kvstore string, kvconfig
kvconfig["error_if_exists"] = true
kvconfig["path"] = indexStorePath(path)
// now open the store
// now create the store
rv.s, err = storeConstructor(kvconfig)
if err != nil {
return nil, err

View File

@ -24,9 +24,13 @@ import (
var twoDocIndex index.Index //= upside_down.NewUpsideDownCouch(inmem.MustOpen())
func init() {
inMemStore, _ := inmem.Open()
inMemStore, _ := inmem.New()
analysisQueue := upside_down.NewAnalysisQueue(1)
twoDocIndex = upside_down.NewUpsideDownCouch(inMemStore, analysisQueue)
err := twoDocIndex.Open()
if err != nil {
panic(err)
}
for _, doc := range twoDocIndexDocs {
err := twoDocIndex.Update(doc)
if err != nil {

View File

@ -25,10 +25,14 @@ func TestTermSearcher(t *testing.T) {
var queryBoost = 3.0
var queryExplain = true
inMemStore, _ := inmem.Open()
inMemStore, _ := inmem.New()
analysisQueue := upside_down.NewAnalysisQueue(1)
i := upside_down.NewUpsideDownCouch(inMemStore, analysisQueue)
err := i.Update(&document.Document{
err := i.Open()
if err != nil {
t.Fatal(err)
}
err = i.Update(&document.Document{
ID: "a",
Fields: []document.Field{
document.NewTextField("desc", []uint64{}, []byte("beer")),