0
0
Fork 0

more refactoring

This commit is contained in:
Marty Schoch 2015-09-28 16:50:27 -04:00
parent 900f1b4a67
commit d06b526cbf
13 changed files with 209 additions and 351 deletions

View File

@ -17,111 +17,63 @@ import (
"github.com/blevesearch/bleve/index/store/test"
)
func open(mo store.MergeOperator) (store.KVStore, error) {
return New(mo, map[string]interface{}{"path": "test"})
}
func TestBoltDBKVCrud(t *testing.T) {
s, err := open(nil)
func open(t *testing.T, mo store.MergeOperator) store.KVStore {
rv, err := New(mo, map[string]interface{}{"path": "test"})
if err != nil {
t.Fatal(err)
}
defer func() {
err := os.RemoveAll("test")
if err != nil {
t.Fatal(err)
}
}()
return rv
}
func cleanup(t *testing.T, s store.KVStore) {
err := s.Close()
if err != nil {
t.Fatal(err)
}
err = os.RemoveAll("test")
if err != nil {
t.Fatal(err)
}
}
func TestBoltDBKVCrud(t *testing.T) {
s := open(t, nil)
defer cleanup(t, s)
test.CommonTestKVCrud(t, s)
}
func TestBoltDBReaderIsolation(t *testing.T) {
s, err := open(nil)
if err != nil {
t.Fatal(err)
}
defer func() {
err := os.RemoveAll("test")
if err != nil {
t.Fatal(err)
}
}()
s := open(t, nil)
defer cleanup(t, s)
test.CommonTestReaderIsolation(t, s)
}
func TestBoltDBReaderOwnsGetBytes(t *testing.T) {
s, err := open(nil)
if err != nil {
t.Fatal(err)
}
defer func() {
err := os.RemoveAll("test")
if err != nil {
t.Fatal(err)
}
}()
s := open(t, nil)
defer cleanup(t, s)
test.CommonTestReaderOwnsGetBytes(t, s)
}
func TestBoltDBWriterOwnsBytes(t *testing.T) {
s, err := open(nil)
if err != nil {
t.Fatal(err)
}
defer func() {
err := os.RemoveAll("test")
if err != nil {
t.Fatal(err)
}
}()
s := open(t, nil)
defer cleanup(t, s)
test.CommonTestWriterOwnsBytes(t, s)
}
func TestBoltDBPrefixIterator(t *testing.T) {
s, err := open(nil)
if err != nil {
t.Fatal(err)
}
defer func() {
err := os.RemoveAll("test")
if err != nil {
t.Fatal(err)
}
}()
s := open(t, nil)
defer cleanup(t, s)
test.CommonTestPrefixIterator(t, s)
}
func TestBoltDBRangeIterator(t *testing.T) {
s, err := open(nil)
if err != nil {
t.Fatal(err)
}
defer func() {
err := os.RemoveAll("test")
if err != nil {
t.Fatal(err)
}
}()
s := open(t, nil)
defer cleanup(t, s)
test.CommonTestRangeIterator(t, s)
}
func TestBoltDBMerge(t *testing.T) {
s, err := open(&test.TestMergeCounter{})
if err != nil {
t.Fatal(err)
}
defer func() {
err := os.RemoveAll("test")
if err != nil {
t.Fatal(err)
}
}()
s := open(t, &test.TestMergeCounter{})
defer cleanup(t, s)
test.CommonTestMerge(t, s)
}

View File

@ -56,7 +56,6 @@ func New(mo store.MergeOperator, config map[string]interface{}) (store.KVStore,
defaultWriteOptions: &opt.WriteOptions{},
}
rv.defaultWriteOptions.Sync = true
return &rv, nil
}

View File

@ -17,114 +17,66 @@ import (
"github.com/blevesearch/bleve/index/store/test"
)
func open(mo store.MergeOperator) (store.KVStore, error) {
return New(mo, map[string]interface{}{
func open(t *testing.T, mo store.MergeOperator) store.KVStore {
rv, err := New(mo, map[string]interface{}{
"path": "test",
"create_if_missing": true,
})
}
func TestGoLevelDBKVCrud(t *testing.T) {
s, err := open(nil)
if err != nil {
t.Fatal(err)
}
defer func() {
err := os.RemoveAll("test")
if err != nil {
t.Fatal(err)
}
}()
return rv
}
func cleanup(t *testing.T, s store.KVStore) {
err := s.Close()
if err != nil {
t.Fatal(err)
}
err = os.RemoveAll("test")
if err != nil {
t.Fatal(err)
}
}
func TestGoLevelDBKVCrud(t *testing.T) {
s := open(t, nil)
defer cleanup(t, s)
test.CommonTestKVCrud(t, s)
}
func TestGoLevelDBReaderIsolation(t *testing.T) {
s, err := open(nil)
if err != nil {
t.Fatal(err)
}
defer func() {
err := os.RemoveAll("test")
if err != nil {
t.Fatal(err)
}
}()
s := open(t, nil)
defer cleanup(t, s)
test.CommonTestReaderIsolation(t, s)
}
func TestGoLevelDBReaderOwnsGetBytes(t *testing.T) {
s, err := open(nil)
if err != nil {
t.Fatal(err)
}
defer func() {
err := os.RemoveAll("test")
if err != nil {
t.Fatal(err)
}
}()
s := open(t, nil)
defer cleanup(t, s)
test.CommonTestReaderOwnsGetBytes(t, s)
}
func TestGoLevelDBWriterOwnsBytes(t *testing.T) {
s, err := open(nil)
if err != nil {
t.Fatal(err)
}
defer func() {
err := os.RemoveAll("test")
if err != nil {
t.Fatal(err)
}
}()
s := open(t, nil)
defer cleanup(t, s)
test.CommonTestWriterOwnsBytes(t, s)
}
func TestGoLevelDBPrefixIterator(t *testing.T) {
s, err := open(nil)
if err != nil {
t.Fatal(err)
}
defer func() {
err := os.RemoveAll("test")
if err != nil {
t.Fatal(err)
}
}()
s := open(t, nil)
defer cleanup(t, s)
test.CommonTestPrefixIterator(t, s)
}
func TestGoLevelDBRangeIterator(t *testing.T) {
s, err := open(nil)
if err != nil {
t.Fatal(err)
}
defer func() {
err := os.RemoveAll("test")
if err != nil {
t.Fatal(err)
}
}()
s := open(t, nil)
defer cleanup(t, s)
test.CommonTestRangeIterator(t, s)
}
func TestGoLevelDBMerge(t *testing.T) {
s, err := open(&test.TestMergeCounter{})
if err != nil {
t.Fatal(err)
}
defer func() {
err := os.RemoveAll("test")
if err != nil {
t.Fatal(err)
}
}()
s := open(t, &test.TestMergeCounter{})
defer cleanup(t, s)
test.CommonTestMerge(t, s)
}

View File

@ -13,118 +13,65 @@
package gtreap
import (
"os"
"testing"
"github.com/blevesearch/bleve/index/store"
"github.com/blevesearch/bleve/index/store/test"
)
func open(mo store.MergeOperator) (store.KVStore, error) {
return New(mo, nil)
}
func TestGTreapKVCrud(t *testing.T) {
s, err := open(nil)
func open(t *testing.T, mo store.MergeOperator) store.KVStore {
rv, err := New(mo, nil)
if err != nil {
t.Fatal(err)
}
defer func() {
err := os.RemoveAll("test")
if err != nil {
t.Fatal(err)
}
}()
return rv
}
func cleanup(t *testing.T, s store.KVStore) {
err := s.Close()
if err != nil {
t.Fatal(err)
}
}
func TestGTreapKVCrud(t *testing.T) {
s := open(t, nil)
defer cleanup(t, s)
test.CommonTestKVCrud(t, s)
}
func TestGTreapReaderIsolation(t *testing.T) {
s, err := open(nil)
if err != nil {
t.Fatal(err)
}
defer func() {
err := os.RemoveAll("test")
if err != nil {
t.Fatal(err)
}
}()
s := open(t, nil)
defer cleanup(t, s)
test.CommonTestReaderIsolation(t, s)
}
func TestGTreapReaderOwnsGetBytes(t *testing.T) {
s, err := open(nil)
if err != nil {
t.Fatal(err)
}
defer func() {
err := os.RemoveAll("test")
if err != nil {
t.Fatal(err)
}
}()
s := open(t, nil)
defer cleanup(t, s)
test.CommonTestReaderOwnsGetBytes(t, s)
}
func TestGTreapWriterOwnsBytes(t *testing.T) {
s, err := open(nil)
if err != nil {
t.Fatal(err)
}
defer func() {
err := os.RemoveAll("test")
if err != nil {
t.Fatal(err)
}
}()
s := open(t, nil)
defer cleanup(t, s)
test.CommonTestWriterOwnsBytes(t, s)
}
func TestGTreapPrefixIterator(t *testing.T) {
s, err := open(nil)
if err != nil {
t.Fatal(err)
}
defer func() {
err := os.RemoveAll("test")
if err != nil {
t.Fatal(err)
}
}()
s := open(t, nil)
defer cleanup(t, s)
test.CommonTestPrefixIterator(t, s)
}
func TestGTreapRangeIterator(t *testing.T) {
s, err := open(nil)
if err != nil {
t.Fatal(err)
}
defer func() {
err := os.RemoveAll("test")
if err != nil {
t.Fatal(err)
}
}()
s := open(t, nil)
defer cleanup(t, s)
test.CommonTestRangeIterator(t, s)
}
func TestGTreapMerge(t *testing.T) {
s, err := open(&test.TestMergeCounter{})
if err != nil {
t.Fatal(err)
}
defer func() {
err := os.RemoveAll("test")
if err != nil {
t.Fatal(err)
}
}()
s := open(t, &test.TestMergeCounter{})
defer cleanup(t, s)
test.CommonTestMerge(t, s)
}

View File

@ -1,7 +1,6 @@
package metrics
import (
"os"
"testing"
"github.com/blevesearch/bleve/index/store"
@ -9,111 +8,59 @@ import (
"github.com/blevesearch/bleve/index/store/test"
)
func open(mo store.MergeOperator) (store.KVStore, error) {
return New(mo, map[string]interface{}{"kvStoreName_actual": gtreap.Name})
}
func TestMetricsKVCrud(t *testing.T) {
s, err := open(nil)
func open(t *testing.T, mo store.MergeOperator) store.KVStore {
rv, err := New(mo, map[string]interface{}{"kvStoreName_actual": gtreap.Name})
if err != nil {
t.Fatal(err)
}
defer func() {
err := os.RemoveAll("test")
if err != nil {
t.Fatal(err)
}
}()
return rv
}
func cleanup(t *testing.T, s store.KVStore) {
err := s.Close()
if err != nil {
t.Fatal(err)
}
}
func TestMetricsKVCrud(t *testing.T) {
s := open(t, nil)
defer cleanup(t, s)
test.CommonTestKVCrud(t, s)
}
func TestMetricsReaderIsolation(t *testing.T) {
s, err := open(nil)
if err != nil {
t.Fatal(err)
}
defer func() {
err := os.RemoveAll("test")
if err != nil {
t.Fatal(err)
}
}()
s := open(t, nil)
defer cleanup(t, s)
test.CommonTestReaderIsolation(t, s)
}
func TestMetricsReaderOwnsGetBytes(t *testing.T) {
s, err := open(nil)
if err != nil {
t.Fatal(err)
}
defer func() {
err := os.RemoveAll("test")
if err != nil {
t.Fatal(err)
}
}()
s := open(t, nil)
defer cleanup(t, s)
test.CommonTestReaderOwnsGetBytes(t, s)
}
func TestMetricsWriterOwnsBytes(t *testing.T) {
s, err := open(nil)
if err != nil {
t.Fatal(err)
}
defer func() {
err := os.RemoveAll("test")
if err != nil {
t.Fatal(err)
}
}()
s := open(t, nil)
defer cleanup(t, s)
test.CommonTestWriterOwnsBytes(t, s)
}
func TestMetricsPrefixIterator(t *testing.T) {
s, err := open(nil)
if err != nil {
t.Fatal(err)
}
defer func() {
err := os.RemoveAll("test")
if err != nil {
t.Fatal(err)
}
}()
s := open(t, nil)
defer cleanup(t, s)
test.CommonTestPrefixIterator(t, s)
}
func TestMetricsRangeIterator(t *testing.T) {
s, err := open(nil)
if err != nil {
t.Fatal(err)
}
defer func() {
err := os.RemoveAll("test")
if err != nil {
t.Fatal(err)
}
}()
s := open(t, nil)
defer cleanup(t, s)
test.CommonTestRangeIterator(t, s)
}
func TestMetricsMerge(t *testing.T) {
s, err := open(&test.TestMergeCounter{})
if err != nil {
t.Fatal(err)
}
defer func() {
err := os.RemoveAll("test")
if err != nil {
t.Fatal(err)
}
}()
s := open(t, &test.TestMergeCounter{})
defer cleanup(t, s)
test.CommonTestMerge(t, s)
}

View File

@ -12,10 +12,12 @@ func TestStore(t *testing.T) {
t.Fatal(err)
}
CommonTestKVStore(t, s)
NullTestKVStore(t, s)
}
func CommonTestKVStore(t *testing.T, s store.KVStore) {
// NullTestKVStore has very different expectations
// compared to CommonTestKVStore
func NullTestKVStore(t *testing.T, s store.KVStore) {
writer, err := s.Writer()
if err != nil {

View File

@ -0,0 +1,11 @@
# Generic KVStore implementation tests
These are a set of common tests that should pass on any correct KVStore implementation.
Each test function in this package has the form:
func CommonTest<name>(t *testing.T, s store.KVStore) {...}
A KVStore implementation test should use the same name, including its own KVStore name in the test function. It should instantiate an instance of the store, and pass the testing.T and store to the common function.
The common test functions should *NOT* close the KVStore. The KVStore test implementation should close the store and cleanup any state.

View File

@ -106,12 +106,6 @@ func CommonTestReaderOwnsGetBytes(t *testing.T, s store.KVStore) {
t.Fatal(err)
}
// close the store
err = s.Close()
if err != nil {
t.Fatal(err)
}
// finally check that the value we mutated still has what we set it to
for i := range returnedVal {
if returnedVal[i] != '2' {
@ -265,10 +259,4 @@ func CommonTestWriterOwnsBytes(t *testing.T, s store.KVStore) {
if err != nil {
t.Fatal(err)
}
// close the store
err = s.Close()
if err != nil {
t.Fatal(err)
}
}

View File

@ -133,15 +133,15 @@ func CommonTestReaderIsolation(t *testing.T, s store.KVStore) {
// ensure that the direct iterator sees it
count = 0
it = newReader.RangeIterator([]byte{0}, []byte{'x'})
it2 := newReader.RangeIterator([]byte{0}, []byte{'x'})
defer func() {
err := it.Close()
err := it2.Close()
if err != nil {
t.Fatal(err)
}
}()
for it.Valid() {
it.Next()
for it2.Valid() {
it2.Next()
count++
}
if count != 2 {
@ -159,15 +159,15 @@ func CommonTestReaderIsolation(t *testing.T, s store.KVStore) {
// and ensure that the iterator on the isolated reader also does not
count = 0
it = reader.RangeIterator([]byte{0}, []byte{'x'})
it3 := reader.RangeIterator([]byte{0}, []byte{'x'})
defer func() {
err := it.Close()
err := it3.Close()
if err != nil {
t.Fatal(err)
}
}()
for it.Valid() {
it.Next()
for it3.Valid() {
it3.Next()
count++
}
if count != 1 {

View File

@ -118,12 +118,6 @@ func CommonTestPrefixIterator(t *testing.T, s store.KVStore) {
if err != nil {
t.Fatal(err)
}
// close the store
err = s.Close()
if err != nil {
t.Fatal(err)
}
}
func CommonTestRangeIterator(t *testing.T, s store.KVStore) {
@ -280,10 +274,4 @@ func CommonTestRangeIterator(t *testing.T, s store.KVStore) {
if err != nil {
t.Fatal(err)
}
// close the store
err = s.Close()
if err != nil {
t.Fatal(err)
}
}

View File

@ -552,7 +552,7 @@ func NewBackIndexRowKV(key, value []byte) (*BackIndexRow, error) {
rv.doc, err = buf.ReadBytes(ByteSeparator)
if err == io.EOF && len(rv.doc) < 1 {
err = fmt.Errorf("invalid doc length 0")
err = fmt.Errorf("invalid doc length 0 - % x", key)
}
if err != nil && err != io.EOF {
return nil, err

View File

@ -49,6 +49,8 @@ type UpsideDownCouch struct {
m sync.RWMutex
// fields protected by m
docCount uint64
writeMutex sync.Mutex
}
func NewUpsideDownCouch(storeName string, storeConfig map[string]interface{}, analysisQueue *index.AnalysisQueue) (index.Index, error) {
@ -159,6 +161,10 @@ func (udc *UpsideDownCouch) DocCount() (uint64, error) {
}
func (udc *UpsideDownCouch) Open() (err error) {
//acquire the write mutex for the duratin of Open()
udc.writeMutex.Lock()
defer udc.writeMutex.Unlock()
// open the kv store
storeConstructor := registry.KVStoreConstructorByName(udc.storeName)
if storeConstructor == nil {
@ -217,7 +223,7 @@ func (udc *UpsideDownCouch) Open() (err error) {
}
}()
// init th eindex
// init the index
err = udc.init(kvwriter)
}
@ -289,6 +295,9 @@ func (udc *UpsideDownCouch) Update(doc *document.Document) (err error) {
close(resultChan)
atomic.AddUint64(&udc.stats.analysisTime, uint64(time.Since(analysisStart)))
udc.writeMutex.Lock()
defer udc.writeMutex.Unlock()
// open a reader for backindex lookup
var kvreader store.KVReader
kvreader, err = udc.store.Reader()
@ -458,6 +467,9 @@ func (udc *UpsideDownCouch) indexField(docID string, field document.Field, field
func (udc *UpsideDownCouch) Delete(id string) (err error) {
indexStart := time.Now()
udc.writeMutex.Lock()
defer udc.writeMutex.Unlock()
// open a reader for backindex lookup
var kvreader store.KVReader
kvreader, err = udc.store.Reader()
@ -677,6 +689,9 @@ func (udc *UpsideDownCouch) Batch(batch *index.Batch) (err error) {
indexStart := time.Now()
udc.writeMutex.Lock()
defer udc.writeMutex.Unlock()
// open a reader for backindex lookup
var kvreader store.KVReader
kvreader, err = udc.store.Reader()
@ -763,6 +778,8 @@ func (udc *UpsideDownCouch) Batch(batch *index.Batch) (err error) {
func (udc *UpsideDownCouch) SetInternal(key, val []byte) (err error) {
internalRow := NewInternalRow(key, val)
udc.writeMutex.Lock()
defer udc.writeMutex.Unlock()
var writer store.KVWriter
writer, err = udc.store.Writer()
if err != nil {
@ -782,6 +799,8 @@ func (udc *UpsideDownCouch) SetInternal(key, val []byte) (err error) {
func (udc *UpsideDownCouch) DeleteInternal(key []byte) (err error) {
internalRow := NewInternalRow(key, nil)
udc.writeMutex.Lock()
defer udc.writeMutex.Unlock()
var writer store.KVWriter
writer, err = udc.store.Writer()
if err != nil {

View File

@ -10,9 +10,11 @@
package upside_down
import (
"log"
"reflect"
"regexp"
"strconv"
"sync"
"testing"
"time"
@ -1155,3 +1157,54 @@ func BenchmarkBatch(b *testing.B) {
}
}
}
func TestConcurrentUpdate(t *testing.T) {
defer DestroyTest()
analysisQueue := index.NewAnalysisQueue(1)
idx, err := NewUpsideDownCouch(boltdb.Name, boltTestConfig, analysisQueue)
if err != nil {
t.Fatal(err)
}
err = idx.Open()
if err != nil {
t.Errorf("error opening index: %v", err)
}
defer func() {
err := idx.Close()
if err != nil {
t.Fatal(err)
}
}()
// do some concurrent updates
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func(i int) {
doc := document.NewDocument("1")
doc.AddField(document.NewTextFieldWithIndexingOptions(strconv.Itoa(i), []uint64{}, []byte(strconv.Itoa(i)), document.StoreField))
err := idx.Update(doc)
if err != nil {
t.Errorf("Error updating index: %v", err)
}
wg.Done()
}(i)
}
wg.Wait()
// now load the name field and see what we get
r, err := idx.Reader()
if err != nil {
log.Fatal(err)
}
doc, err := r.Document("1")
if err != nil {
log.Fatal(err)
}
if len(doc.Fields) > 1 {
t.Errorf("expected single field, found %d", len(doc.Fields))
}
}