0
0

refactored index to separate out kv storage

now have pluggable options for
leveldb
gouchstore
in memory only
This commit is contained in:
Marty Schoch 2014-05-09 16:37:04 -04:00
parent ba442d2669
commit d48eee948e
22 changed files with 944 additions and 127 deletions

View File

@ -13,6 +13,7 @@ import (
"io/ioutil"
"log"
"github.com/couchbaselabs/bleve/index/store/leveldb"
"github.com/couchbaselabs/bleve/index/upside_down"
"github.com/couchbaselabs/bleve/shredder"
)
@ -28,8 +29,12 @@ func main() {
jsonShredder := shredder.NewAutoJsonShredder()
// create a new index
index := upside_down.NewUpsideDownCouch(*indexDir)
err := index.Open()
store, err := leveldb.Open(*indexDir)
if err != nil {
log.Fatal(err)
}
index := upside_down.NewUpsideDownCouch(store)
err = index.Open()
if err != nil {
log.Fatal(err)
}

View File

@ -13,6 +13,7 @@ import (
"fmt"
"log"
"github.com/couchbaselabs/bleve/index/store/leveldb"
"github.com/couchbaselabs/bleve/index/upside_down"
"github.com/couchbaselabs/bleve/search"
)
@ -30,8 +31,12 @@ func main() {
}
// open index
index := upside_down.NewUpsideDownCouch(*indexDir)
err := index.Open()
store, err := leveldb.Open(*indexDir)
if err != nil {
log.Fatal(err)
}
index := upside_down.NewUpsideDownCouch(store)
err = index.Open()
if err != nil {
log.Fatal(err)
}

View File

@ -0,0 +1,36 @@
package gouchstore
import (
"github.com/mschoch/gouchstore"
)
// GouchstoreBatch collects Set and Delete operations in a gouchstore bulk
// writer so they can be committed together by Execute.
type GouchstoreBatch struct {
	// store is the store this batch was created from.
	store *GouchstoreStore
	// bulk is the bulk writer the queued operations are sent to.
	bulk gouchstore.BulkWriter
}
// newGouchstoreBatch returns a batch tied to store, backed by a bulk writer
// obtained from the store's database handle.
func newGouchstoreBatch(store *GouchstoreStore) *GouchstoreBatch {
	return &GouchstoreBatch{
		store: store,
		bulk:  store.db.Bulk(),
	}
}
// Set queues a write of val under key on the bulk writer. Nothing is
// committed until Execute is called.
func (gb *GouchstoreBatch) Set(key, val []byte) {
	document, info := kvToDocDocInfo(key, val)
	gb.bulk.Set(info, document)
}
// Delete queues removal of key on the bulk writer. Only the document info
// built from the key is needed; the document itself is discarded.
func (gb *GouchstoreBatch) Delete(key []byte) {
	_, info := kvToDocDocInfo(key, nil)
	gb.bulk.Delete(info)
}
// Execute commits the queued operations through the bulk writer and returns
// the writer's commit error, if any.
func (gb *GouchstoreBatch) Execute() error {
	return gb.bulk.Commit()
}
// Close closes the underlying bulk writer and returns its error, if any.
func (gb *GouchstoreBatch) Close() error {
	return gb.bulk.Close()
}

View File

@ -0,0 +1,101 @@
package gouchstore
import (
"fmt"
"github.com/mschoch/gouchstore"
)
// GouchstoreIterator adapts gouchstore's callback-driven id-tree walk to a
// pull-style iterator. Each Seek starts a goroutine that walks the tree and
// hands document infos to the consumer over diChan.
type GouchstoreIterator struct {
	// store is the store being iterated.
	store *GouchstoreStore
	// valid reports whether curr refers to an entry.
	valid bool
	// curr is the document info at the current position, or nil.
	curr *gouchstore.DocumentInfo
	// diChan carries document infos from the walker goroutine; the walker
	// closes it when the walk ends.
	diChan chan *gouchstore.DocumentInfo
	// closeChan is closed to tell a running walker to abort.
	closeChan chan bool
}
// newGouchstoreIterator returns an iterator over store that has not been
// positioned yet; callers are expected to Seek before reading from it.
func newGouchstoreIterator(store *GouchstoreStore) *GouchstoreIterator {
	return &GouchstoreIterator{store: store}
}
// cleanupExistingIterator stops the walker goroutine started by a previous
// Seek, if one was started. It is a no-op when closeChan is nil.
func (gi *GouchstoreIterator) cleanupExistingIterator() {
	if gi.closeChan == nil {
		return
	}
	// signal the walker to abort ...
	close(gi.closeChan)
	// ... then drain until it closes diChan, which is the last thing the
	// walker goroutine does
	for {
		if _, open := <-gi.diChan; !open {
			break
		}
	}
	gi.closeChan = nil
}
// SeekFirst positions the iterator by seeking to the empty key.
func (gi *GouchstoreIterator) SeekFirst() {
	gi.Seek([]byte{})
}
// Seek repositions the iterator using key as the start id of a new id-tree
// walk. Any walk started by a previous Seek is aborted first.
//
// The walk runs in its own goroutine and delivers non-deleted document infos
// over diChan; Seek blocks until the first one arrives or the walk ends.
// After Seek returns, Valid reports whether an entry was found.
func (gi *GouchstoreIterator) Seek(key []byte) {
	gi.cleanupExistingIterator()
	gi.curr = nil
	// reset so a seek that finds nothing does not inherit a stale "valid"
	// from an earlier position
	gi.valid = false

	// the walker only touches these locals, never the iterator's fields, so
	// iterator state is owned by the calling goroutine alone
	diChan := make(chan *gouchstore.DocumentInfo)
	closeChan := make(chan bool)
	gi.diChan = diChan
	gi.closeChan = closeChan
	startID := string(key)

	wtCallback := func(db *gouchstore.Gouchstore, depth int, documentInfo *gouchstore.DocumentInfo, key []byte, subTreeSize uint64, reducedValue []byte, userContext interface{}) error {
		if documentInfo != nil && !documentInfo.Deleted {
			select {
			case diChan <- documentInfo:
			case <-closeChan:
				// returning an error stops the walk
				return fmt.Errorf("seek aborted")
			}
		}
		return nil
	}

	go func() {
		// The walk error is deliberately dropped: KVIterator has no way to
		// report it, and an aborted or failed walk simply ends iteration.
		gi.store.db.WalkIdTree(startID, "", wtCallback, nil)
		close(diChan)
	}()

	// a nil receive means the walk ended without producing an entry
	gi.curr = <-diChan
	gi.valid = gi.curr != nil
}
// Current returns the key and value at the current position together with
// true, or nil, nil, false when the iterator is not valid.
func (gi *GouchstoreIterator) Current() ([]byte, []byte, bool) {
	if !gi.Valid() {
		return nil, nil, false
	}
	k := gi.Key()
	v := gi.Value()
	return k, v, true
}
// Next advances to the next document info delivered by the walker. A nil
// receive (the walker closed the channel) marks the iterator invalid.
func (gi *GouchstoreIterator) Next() {
	next := <-gi.diChan
	gi.curr = next
	if next == nil {
		gi.valid = false
	}
}
// Key returns the current document's ID as bytes, or nil when there is no
// current document.
func (gi *GouchstoreIterator) Key() []byte {
	if gi.curr == nil {
		return nil
	}
	return []byte(gi.curr.ID)
}
// Value loads the body of the current document from the store. It returns
// nil when there is no current document or when loading fails; the load
// error itself is not reported.
func (gi *GouchstoreIterator) Value() []byte {
	if gi.curr == nil {
		return nil
	}
	doc, err := gi.store.db.DocumentByDocumentInfo(gi.curr)
	if err != nil {
		return nil
	}
	return doc.Body
}
// Valid reports the iterator's valid flag.
func (gi *GouchstoreIterator) Valid() bool {
	return gi.valid
}
// Close stops any walker goroutine still running for this iterator.
func (gi *GouchstoreIterator) Close() {
	gi.cleanupExistingIterator()
}

View File

@ -0,0 +1,71 @@
package gouchstore
import (
"github.com/couchbaselabs/bleve/index/store"
"github.com/mschoch/gouchstore"
)
// GouchstoreStore is a key/value store backed by a gouchstore database file.
type GouchstoreStore struct {
	// path is the path the database was opened with.
	path string
	// db is the open gouchstore handle.
	db *gouchstore.Gouchstore
}
// Open opens the gouchstore database at path with the OPEN_CREATE flag and
// wraps it in a GouchstoreStore. On failure it returns a nil store and the
// open error.
func Open(path string) (*GouchstoreStore, error) {
	db, err := gouchstore.Open(path, gouchstore.OPEN_CREATE)
	if err != nil {
		return nil, err
	}
	return &GouchstoreStore{path: path, db: db}, nil
}
// Get returns the value stored under key. A key that is missing or marked
// deleted yields (nil, nil); any other lookup failure is returned as the
// error.
func (gs *GouchstoreStore) Get(key []byte) ([]byte, error) {
	docInfo, err := gs.db.DocumentInfoById(string(key))
	// "document not found" is the one lookup error that means "absent"
	// rather than "failed"
	if err != nil && err.Error() != "document not found" {
		return nil, err
	}
	if docInfo == nil || docInfo.Deleted {
		return nil, nil
	}
	// load the body through the info we already hold instead of looking the
	// id up a second time
	doc, err := gs.db.DocumentByDocumentInfo(docInfo)
	if err != nil {
		return nil, err
	}
	return doc.Body, nil
}
// Set saves val as the body of the document whose id is key and returns the
// save error, if any.
func (gs *GouchstoreStore) Set(key, val []byte) error {
	id := string(key)
	return gs.db.SaveDocument(gouchstore.NewDocument(id, val), gouchstore.NewDocumentInfo(id))
}
// Delete removes key by saving a body-less document whose info is flagged
// as deleted, and returns the save error, if any.
func (gs *GouchstoreStore) Delete(key []byte) error {
	id := string(key)
	emptyDoc := gouchstore.NewDocument(id, nil)
	tombstone := gouchstore.NewDocumentInfo(id)
	tombstone.Deleted = true
	return gs.db.SaveDocument(emptyDoc, tombstone)
}
// Commit commits the underlying gouchstore database and returns its error,
// if any.
func (gs *GouchstoreStore) Commit() error {
	return gs.db.Commit()
}
// Close closes the underlying gouchstore database and returns its error, if
// any.
func (gs *GouchstoreStore) Close() error {
	return gs.db.Close()
}
// Iterator returns a new iterator over this store, already positioned by a
// Seek to key.
func (gs *GouchstoreStore) Iterator(key []byte) store.KVIterator {
	it := newGouchstoreIterator(gs)
	it.Seek(key)
	return it
}
// NewBatch returns a new, empty batch that writes to this store.
func (gs *GouchstoreStore) NewBatch() store.KVBatch {
	return newGouchstoreBatch(gs)
}

View File

@ -0,0 +1,82 @@
package gouchstore
import (
"os"
"testing"
)
// TestGouchstoreStore exercises single-key writes, a batch write, and
// iterator Current/Next/Seek against a store on disk.
func TestGouchstoreStore(t *testing.T) {
	// Registered before Open so the file is removed even when a later step
	// fails. Defers run last-in first-out, so the iterator, batch and store
	// below are closed before the file is removed.
	defer os.RemoveAll("test")

	store, err := Open("test")
	if err != nil {
		t.Fatal(err)
	}
	defer store.Close()

	err = store.Set([]byte("a"), []byte("val-a"))
	if err != nil {
		t.Fatal(err)
	}
	err = store.Set([]byte("z"), []byte("val-z"))
	if err != nil {
		t.Fatal(err)
	}
	err = store.Delete([]byte("z"))
	if err != nil {
		t.Fatal(err)
	}

	batch := store.NewBatch()
	defer batch.Close()
	for _, k := range []string{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j"} {
		batch.Set([]byte(k), []byte("val-"+k))
	}
	err = batch.Execute()
	if err != nil {
		t.Fatal(err)
	}

	it := store.Iterator([]byte("b"))
	// deferred so the walker goroutine is stopped on failure paths too
	defer it.Close()

	expectCurrent := func(wantKey, wantVal string) {
		key, val, valid := it.Current()
		if !valid {
			t.Fatalf("valid false, expected true")
		}
		if string(key) != wantKey {
			t.Fatalf("expected key %s, got %s", wantKey, key)
		}
		if string(val) != wantVal {
			t.Fatalf("expected value %s, got %s", wantVal, val)
		}
	}

	expectCurrent("b", "val-b")

	it.Next()
	expectCurrent("c", "val-c")

	it.Seek([]byte("i"))
	expectCurrent("i", "val-i")
}

View File

@ -0,0 +1,10 @@
package gouchstore
import (
"github.com/mschoch/gouchstore"
)
// kvToDocDocInfo builds the gouchstore document and document info for a
// key/value pair, using the key (as a string) as the id of both.
func kvToDocDocInfo(key, val []byte) (*gouchstore.Document, *gouchstore.DocumentInfo) {
	id := string(key)
	doc := gouchstore.NewDocument(id, val)
	docInfo := gouchstore.NewDocumentInfo(id)
	return doc, docInfo
}

View File

@ -0,0 +1,42 @@
package inmem
// inMemOp is one buffered batch operation.
type inMemOp struct {
	key      []byte
	val      []byte
	isDelete bool // true for Delete; val is ignored
}

// InMemBatch buffers Set and Delete operations and applies them to its
// InMemStore, in the order they were queued, when Execute is called.
//
// Key and value slices are retained as passed until Execute runs; callers
// must not modify them in between.
type InMemBatch struct {
	store *InMemStore
	ops   []inMemOp
}

// newInMemBatch returns an empty batch that applies to store.
func newInMemBatch(store *InMemStore) *InMemBatch {
	return &InMemBatch{store: store}
}

// Set queues a write of val under key. A nil val is a legitimate (empty)
// value and is stored as such; it is not treated as a delete.
func (i *InMemBatch) Set(key, val []byte) {
	i.ops = append(i.ops, inMemOp{key: key, val: val})
}

// Delete queues removal of key.
func (i *InMemBatch) Delete(key []byte) {
	i.ops = append(i.ops, inMemOp{key: key, isDelete: true})
}

// Execute applies the queued operations in order through the store's own
// Set and Delete, stopping at and returning the first error.
func (i *InMemBatch) Execute() error {
	for _, op := range i.ops {
		var err error
		if op.isDelete {
			err = i.store.Delete(op.key)
		} else {
			err = i.store.Set(op.key, op.val)
		}
		if err != nil {
			return err
		}
	}
	return nil
}

// Close releases the batch. There is nothing to release for the in-memory
// implementation, so it always returns nil.
func (i *InMemBatch) Close() error {
	return nil
}

View File

@ -0,0 +1,60 @@
package inmem
import (
"github.com/ryszard/goskiplist/skiplist"
)
// InMemIterator iterates over an InMemStore's skip list.
type InMemIterator struct {
	// store is the store being iterated.
	store *InMemStore
	// iterator is the underlying skip list iterator.
	iterator skiplist.Iterator
	// valid holds the result of the most recent Seek or Next.
	valid bool
}
// newInMemIterator returns an iterator over store's skip list. It is not
// valid until a Seek succeeds.
func newInMemIterator(store *InMemStore) *InMemIterator {
	return &InMemIterator{
		store:    store,
		iterator: store.list.Iterator(),
	}
}
// SeekFirst positions the iterator at the first entry by seeking to the
// empty key, which sorts before or equal to every stored key. Seeking to a
// single zero byte instead would skip an entry stored under the empty key.
func (i *InMemIterator) SeekFirst() {
	i.Seek([]byte{})
}
// Seek seeks the skip list iterator to k (as a string) and records whether
// the seek succeeded.
func (i *InMemIterator) Seek(k []byte) {
	i.valid = i.iterator.Seek(string(k))
}
// Next advances the skip list iterator and records whether it succeeded.
func (i *InMemIterator) Next() {
	i.valid = i.iterator.Next()
}
// Current returns the key and value at the current position together with
// true, or nil, nil, false when the iterator is not valid.
func (i *InMemIterator) Current() ([]byte, []byte, bool) {
	if !i.valid {
		return nil, nil, false
	}
	k := i.Key()
	v := i.Value()
	return k, v, true
}
// Key returns the current key as bytes, or nil when the iterator is not
// valid. Keys are held in the skip list as strings.
func (i *InMemIterator) Key() []byte {
	if !i.valid {
		return nil
	}
	k := i.iterator.Key().(string)
	return []byte(k)
}
// Value returns the current value as bytes, or nil when the iterator is not
// valid. Values are held in the skip list as strings.
func (i *InMemIterator) Value() []byte {
	if !i.valid {
		return nil
	}
	v := i.iterator.Value().(string)
	return []byte(v)
}
// Valid reports whether the most recent Seek or Next succeeded.
func (i *InMemIterator) Valid() bool {
	return i.valid
}
// Close closes the underlying skip list iterator.
func (i *InMemIterator) Close() {
	i.iterator.Close()
}

View File

@ -0,0 +1,65 @@
package inmem
import (
//"bytes"
"github.com/couchbaselabs/bleve/index/store"
"github.com/ryszard/goskiplist/skiplist"
)
// InMemStore is a key/value store held entirely in memory in a skip list.
type InMemStore struct {
	// list maps string keys to string values.
	list *skiplist.SkipList
}
// Open returns a new, empty in-memory store backed by a string-keyed skip
// list. The returned error is always nil.
func Open() (*InMemStore, error) {
	store := &InMemStore{
		//list: skiplist.NewCustomMap(byteArrayLessThan),
		list: skiplist.NewStringMap(),
	}
	return store, nil
}
// func byteArrayLessThan(l, r interface{}) bool {
// if bytes.Compare(l.([]byte), r.([]byte)) < 0 {
// return true
// }
// return false
// }
// Get returns the value stored under key, or (nil, nil) when the key is not
// present. The returned error is always nil.
func (i *InMemStore) Get(key []byte) ([]byte, error) {
	stored, found := i.list.Get(string(key))
	if !found {
		return nil, nil
	}
	return []byte(stored.(string)), nil
}
// Set stores val under key; both are converted to strings for the skip
// list. The returned error is always nil.
func (i *InMemStore) Set(key, val []byte) error {
	k, v := string(key), string(val)
	i.list.Set(k, v)
	return nil
}
// Delete removes key from the skip list. The returned error is always nil.
func (i *InMemStore) Delete(key []byte) error {
	k := string(key)
	i.list.Delete(k)
	return nil
}
// Commit does nothing for the in-memory store and always returns nil.
func (i *InMemStore) Commit() error {
	return nil
}
// Close does nothing for the in-memory store and always returns nil.
func (i *InMemStore) Close() error {
	return nil
}
// Iterator returns a new iterator over this store, already positioned by a
// Seek to key.
func (i *InMemStore) Iterator(key []byte) store.KVIterator {
	it := newInMemIterator(i)
	it.Seek(key)
	return it
}
// NewBatch returns a new, empty batch that applies to this store.
func (i *InMemStore) NewBatch() store.KVBatch {
	return newInMemBatch(i)
}

View File

@ -0,0 +1,79 @@
package inmem
import (
"testing"
)
// TestInMemStore exercises single-key writes, a batch write, and iterator
// Current/Next/Seek against the in-memory store.
func TestInMemStore(t *testing.T) {
	store, err := Open()
	if err != nil {
		t.Fatal(err)
	}
	defer store.Close()

	err = store.Set([]byte("a"), []byte("val-a"))
	if err != nil {
		t.Fatal(err)
	}
	err = store.Set([]byte("z"), []byte("val-z"))
	if err != nil {
		t.Fatal(err)
	}
	err = store.Delete([]byte("z"))
	if err != nil {
		t.Fatal(err)
	}

	batch := store.NewBatch()
	defer batch.Close()
	for _, k := range []string{"b", "c", "d", "e", "f", "g", "h", "i", "j"} {
		batch.Set([]byte(k), []byte("val-"+k))
	}
	err = batch.Execute()
	if err != nil {
		t.Fatal(err)
	}

	it := store.Iterator([]byte("b"))
	// deferred so the iterator is released on failure paths too
	defer it.Close()

	expectCurrent := func(wantKey, wantVal string) {
		key, val, valid := it.Current()
		if !valid {
			t.Fatalf("valid false, expected true")
		}
		if string(key) != wantKey {
			t.Fatalf("expected key %s, got %s", wantKey, key)
		}
		if string(val) != wantVal {
			t.Fatalf("expected value %s, got %s", wantVal, val)
		}
	}

	expectCurrent("b", "val-b")

	it.Next()
	expectCurrent("c", "val-c")

	it.Seek([]byte("i"))
	expectCurrent("i", "val-i")
}

32
index/store/kvstore.go Normal file
View File

@ -0,0 +1,32 @@
package store
// KVBatch is a group of writes that is queued first and applied to the
// store later.
type KVBatch interface {
	// Set queues a write of val under key.
	Set(key, val []byte)
	// Delete queues removal of key.
	Delete(key []byte)
	// Execute applies the queued operations to the store.
	Execute() error
	// Close releases any resources held by the batch.
	Close() error
}
// KVIterator walks the entries of a KVStore.
type KVIterator interface {
	// SeekFirst positions the iterator at the start of the store.
	SeekFirst()
	// Seek positions the iterator using the given key.
	// NOTE(review): presumably "first entry at or after the key" for every
	// backend — confirm against each underlying library.
	Seek([]byte)
	// Next advances the iterator by one entry.
	Next()
	// Current returns the key, the value, and whether the iterator is
	// positioned on an entry.
	Current() ([]byte, []byte, bool)
	// Key returns the key at the current position.
	Key() []byte
	// Value returns the value at the current position.
	Value() []byte
	// Valid reports whether the iterator is positioned on an entry.
	Valid() bool
	// Close releases any resources held by the iterator.
	Close()
}
// KVStore is the key/value storage abstraction the index is built on.
type KVStore interface {
	// Get returns the value stored under key.
	// NOTE(review): implementations appear to signal a missing key with a
	// nil value and nil error — confirm for every backend.
	Get(key []byte) ([]byte, error)
	// Set stores val under key.
	Set(key, val []byte) error
	// Delete removes key.
	Delete(key []byte) error
	// Commit makes preceding writes durable where the backend needs an
	// explicit commit; it may be a no-op.
	Commit() error
	// Close releases the store.
	Close() error
	// Iterator returns a new iterator positioned by a Seek to key.
	Iterator(key []byte) KVIterator
	// NewBatch returns a new, empty write batch for this store.
	NewBatch() KVBatch
}

View File

@ -0,0 +1,35 @@
package leveldb
import (
"github.com/jmhodges/levigo"
)
// LevelDBBatch collects Set and Delete operations in a levigo write batch
// so they can be written together by Execute.
type LevelDBBatch struct {
	// store is the store the batch is written to.
	store *LevelDBStore
	// batch is the underlying levigo write batch.
	batch *levigo.WriteBatch
}
// newLevelDBBatch returns an empty batch that will be written to store.
func newLevelDBBatch(store *LevelDBStore) *LevelDBBatch {
	return &LevelDBBatch{
		store: store,
		batch: levigo.NewWriteBatch(),
	}
}
// Set queues a write of val under key in the write batch.
func (ldb *LevelDBBatch) Set(key, val []byte) {
	ldb.batch.Put(key, val)
}
// Delete queues removal of key in the write batch.
func (ldb *LevelDBBatch) Delete(key []byte) {
	ldb.batch.Delete(key)
}
// Execute writes the batch to the store's database using the default write
// options and returns the write error, if any.
func (ldb *LevelDBBatch) Execute() error {
	wo := defaultWriteOptions()
	// levigo option objects wrap C allocations that are only released by
	// Close; without this every Execute leaks one
	defer wo.Close()
	return ldb.store.db.Write(wo, ldb.batch)
}
// Close closes the underlying write batch. It always returns nil.
func (ldb *LevelDBBatch) Close() error {
	ldb.batch.Close()
	return nil
}

View File

@ -0,0 +1,53 @@
package leveldb
import (
"github.com/jmhodges/levigo"
)
// LevelDBIterator wraps a levigo iterator over a LevelDBStore.
type LevelDBIterator struct {
	// store is the store being iterated.
	store *LevelDBStore
	// iterator is the underlying levigo iterator.
	iterator *levigo.Iterator
}
// newLevelDBIterator returns an iterator over store's database created with
// the default read options. The iterator is not positioned yet.
func newLevelDBIterator(store *LevelDBStore) *LevelDBIterator {
	ro := defaultReadOptions()
	// The options object wraps a C allocation that only Close releases.
	// NOTE(review): this relies on LevelDB copying the read options when the
	// iterator is created, so they need not outlive this call — confirm.
	defer ro.Close()
	return &LevelDBIterator{
		store:    store,
		iterator: store.db.NewIterator(ro),
	}
}
// SeekFirst moves the underlying iterator to the first entry.
func (ldi *LevelDBIterator) SeekFirst() {
	ldi.iterator.SeekToFirst()
}
// Seek positions the underlying iterator using key.
func (ldi *LevelDBIterator) Seek(key []byte) {
	ldi.iterator.Seek(key)
}
// Next advances the iterator by one entry. On an iterator that is no longer
// valid it does nothing, matching the other store implementations; the
// underlying levigo iterator must not be advanced in that state.
func (ldi *LevelDBIterator) Next() {
	if !ldi.iterator.Valid() {
		return
	}
	ldi.iterator.Next()
}
// Current returns the key and value at the current position together with
// true, or nil, nil, false when the iterator is not valid.
func (ldi *LevelDBIterator) Current() ([]byte, []byte, bool) {
	if !ldi.Valid() {
		return nil, nil, false
	}
	k := ldi.Key()
	v := ldi.Value()
	return k, v, true
}
// Key returns the key at the current position, or nil when the iterator is
// not valid. The guard matches the other store implementations; the
// underlying levigo iterator must not be read in the invalid state.
func (ldi *LevelDBIterator) Key() []byte {
	if !ldi.iterator.Valid() {
		return nil
	}
	return ldi.iterator.Key()
}
// Value returns the value at the current position, or nil when the iterator
// is not valid. The guard matches the other store implementations; the
// underlying levigo iterator must not be read in the invalid state.
func (ldi *LevelDBIterator) Value() []byte {
	if !ldi.iterator.Valid() {
		return nil
	}
	return ldi.iterator.Value()
}
// Valid reports whether the underlying iterator is positioned on an entry.
func (ldi *LevelDBIterator) Valid() bool {
	return ldi.iterator.Valid()
}
// Close closes the underlying levigo iterator.
func (ldi *LevelDBIterator) Close() {
	ldi.iterator.Close()
}

View File

@ -0,0 +1,61 @@
package leveldb
import (
"github.com/couchbaselabs/bleve/index/store"
"github.com/jmhodges/levigo"
)
// LevelDBStore is a key/value store backed by a LevelDB database accessed
// through levigo.
type LevelDBStore struct {
	// path is the path the database was opened with.
	path string
	// opts are the options the database was opened with.
	opts *levigo.Options
	// db is the open database handle.
	db *levigo.DB
}
// Open opens the LevelDB database at path, creating it if it is missing,
// and wraps it in a LevelDBStore. On failure it returns a nil store and the
// open error.
func Open(path string) (*LevelDBStore, error) {
	opts := levigo.NewOptions()
	opts.SetCreateIfMissing(true)

	db, err := levigo.Open(path, opts)
	if err != nil {
		// the options wrap a C allocation; with no store to own them they
		// must be released here
		opts.Close()
		return nil, err
	}
	return &LevelDBStore{
		path: path,
		opts: opts,
		db:   db,
	}, nil
}
// Get reads the value stored under key using the default read options and
// returns whatever the database returns for it.
func (ldbs *LevelDBStore) Get(key []byte) ([]byte, error) {
	ro := defaultReadOptions()
	// release the C allocation behind the options once the read is done
	defer ro.Close()
	return ldbs.db.Get(ro, key)
}
// Set writes val under key using the default write options and returns the
// write error, if any.
func (ldbs *LevelDBStore) Set(key, val []byte) error {
	wo := defaultWriteOptions()
	// release the C allocation behind the options once the write is done
	defer wo.Close()
	return ldbs.db.Put(wo, key, val)
}
// Delete removes key using the default write options and returns the write
// error, if any.
func (ldbs *LevelDBStore) Delete(key []byte) error {
	wo := defaultWriteOptions()
	// release the C allocation behind the options once the write is done
	defer wo.Close()
	return ldbs.db.Delete(wo, key)
}
// Commit does nothing for the LevelDB store and always returns nil.
func (ldbs *LevelDBStore) Commit() error {
	return nil
}
// Close closes the database and then releases the options it was opened
// with. It always returns nil.
func (ldbs *LevelDBStore) Close() error {
	ldbs.db.Close()
	// the options wrap a C allocation that is only freed by Close; release
	// them after the database that was opened with them
	if ldbs.opts != nil {
		ldbs.opts.Close()
		ldbs.opts = nil
	}
	return nil
}
// Iterator returns a new iterator over this store, already positioned by a
// Seek to key.
func (ldbs *LevelDBStore) Iterator(key []byte) store.KVIterator {
	it := newLevelDBIterator(ldbs)
	it.Seek(key)
	return it
}
// NewBatch returns a new, empty batch that is written to this store.
func (ldbs *LevelDBStore) NewBatch() store.KVBatch {
	return newLevelDBBatch(ldbs)
}

View File

@ -0,0 +1,81 @@
package leveldb
import (
"os"
"testing"
)
// TestLevelDBStore exercises single-key writes, a batch write, and iterator
// Current/Next/Seek against a store on disk.
func TestLevelDBStore(t *testing.T) {
	// Registered before Open so the directory is removed even when a later
	// step fails. Defers run last-in first-out, so the iterator, batch and
	// store below are closed before the directory is removed.
	defer os.RemoveAll("test")

	store, err := Open("test")
	if err != nil {
		t.Fatal(err)
	}
	defer store.Close()

	err = store.Set([]byte("a"), []byte("val-a"))
	if err != nil {
		t.Fatal(err)
	}
	err = store.Set([]byte("z"), []byte("val-z"))
	if err != nil {
		t.Fatal(err)
	}
	err = store.Delete([]byte("z"))
	if err != nil {
		t.Fatal(err)
	}

	batch := store.NewBatch()
	defer batch.Close()
	for _, k := range []string{"b", "c", "d", "e", "f", "g", "h", "i", "j"} {
		batch.Set([]byte(k), []byte("val-"+k))
	}
	err = batch.Execute()
	if err != nil {
		t.Fatal(err)
	}

	it := store.Iterator([]byte("b"))
	// deferred so the iterator is released before the database is closed,
	// on failure paths too
	defer it.Close()

	expectCurrent := func(wantKey, wantVal string) {
		key, val, valid := it.Current()
		if !valid {
			t.Fatalf("valid false, expected true")
		}
		if string(key) != wantKey {
			t.Fatalf("expected key %s, got %s", wantKey, key)
		}
		if string(val) != wantVal {
			t.Fatalf("expected value %s, got %s", wantVal, val)
		}
	}

	expectCurrent("b", "val-b")

	it.Next()
	expectCurrent("c", "val-c")

	it.Seek([]byte("i"))
	expectCurrent("i", "val-i")
}

View File

@ -0,0 +1,17 @@
package leveldb
import (
"github.com/jmhodges/levigo"
)
// defaultWriteOptions returns a newly allocated set of write options with
// synchronous writes enabled.
func defaultWriteOptions() *levigo.WriteOptions {
	opts := levigo.NewWriteOptions()
	opts.SetSync(true) // request fsync on write for safety
	return opts
}
// defaultReadOptions returns a newly allocated set of read options with no
// settings changed from levigo's defaults.
func defaultReadOptions() *levigo.ReadOptions {
	return levigo.NewReadOptions()
}

View File

@ -11,37 +11,32 @@ package upside_down
import (
"bytes"
"github.com/jmhodges/levigo"
"github.com/couchbaselabs/bleve/index"
"github.com/couchbaselabs/bleve/index/store"
)
type UpsideDownCouchTermFieldReader struct {
index *UpsideDownCouch
iterator *levigo.Iterator
iterator store.KVIterator
count uint64
term []byte
field uint16
}
func newUpsideDownCouchTermFieldReader(index *UpsideDownCouch, term []byte, field uint16) (*UpsideDownCouchTermFieldReader, error) {
ro := defaultReadOptions()
it := index.db.NewIterator(ro)
tfr := NewTermFrequencyRow(term, field, "", 0, 0)
it.Seek(tfr.Key())
it := index.store.Iterator(tfr.Key())
var count uint64 = 0
if it.Valid() {
if bytes.Equal(it.Key(), tfr.Key()) {
tfr, err := NewTermFrequencyRowKV(it.Key(), it.Value())
key, val, valid := it.Current()
if valid {
if bytes.Equal(key, tfr.Key()) {
tfr, err := NewTermFrequencyRowKV(key, val)
if err != nil {
return nil, err
}
count = tfr.freq
}
} else {
return nil, it.GetError()
}
return &UpsideDownCouchTermFieldReader{
@ -59,13 +54,14 @@ func (r *UpsideDownCouchTermFieldReader) Count() uint64 {
func (r *UpsideDownCouchTermFieldReader) Next() (*index.TermFieldDoc, error) {
r.iterator.Next()
if r.iterator.Valid() {
key, val, valid := r.iterator.Current()
if valid {
testfr := NewTermFrequencyRow(r.term, r.field, "", 0, 0)
if !bytes.HasPrefix(r.iterator.Key(), testfr.Key()) {
if !bytes.HasPrefix(key, testfr.Key()) {
// end of the line
return nil, nil
}
tfr, err := NewTermFrequencyRowKV(r.iterator.Key(), r.iterator.Value())
tfr, err := NewTermFrequencyRowKV(key, val)
if err != nil {
return nil, err
}
@ -76,20 +72,21 @@ func (r *UpsideDownCouchTermFieldReader) Next() (*index.TermFieldDoc, error) {
Vectors: r.index.termFieldVectorsFromTermVectors(tfr.vectors),
}, nil
} else {
return nil, r.iterator.GetError()
return nil, nil
}
}
func (r *UpsideDownCouchTermFieldReader) Advance(docId string) (*index.TermFieldDoc, error) {
tfr := NewTermFrequencyRow(r.term, r.field, docId, 0, 0)
r.iterator.Seek(tfr.Key())
if r.iterator.Valid() {
key, val, valid := r.iterator.Current()
if valid {
testfr := NewTermFrequencyRow(r.term, r.field, "", 0, 0)
if !bytes.HasPrefix(r.iterator.Key(), testfr.Key()) {
if !bytes.HasPrefix(key, testfr.Key()) {
// end of the line
return nil, nil
}
tfr, err := NewTermFrequencyRowKV(r.iterator.Key(), r.iterator.Value())
tfr, err := NewTermFrequencyRowKV(key, val)
if err != nil {
return nil, err
}
@ -100,7 +97,7 @@ func (r *UpsideDownCouchTermFieldReader) Advance(docId string) (*index.TermField
Vectors: r.index.termFieldVectorsFromTermVectors(tfr.vectors),
}, nil
} else {
return nil, r.iterator.GetError()
return nil, nil
}
}

View File

@ -16,14 +16,15 @@ import (
_ "github.com/couchbaselabs/bleve/analysis/analyzers/standard_analyzer"
"github.com/couchbaselabs/bleve/document"
"github.com/couchbaselabs/bleve/index"
"github.com/couchbaselabs/bleve/index/store/gouchstore"
)
func TestIndexReader(t *testing.T) {
defer os.RemoveAll("test")
idx := NewUpsideDownCouch("test")
err := idx.Open()
store, err := gouchstore.Open("test")
idx := NewUpsideDownCouch(store)
err = idx.Open()
if err != nil {
t.Errorf("error opening index: %v", err)
}
@ -119,6 +120,9 @@ func TestIndexReader(t *testing.T) {
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if match == nil {
t.Fatalf("Expected match, got nil")
}
if match.ID != "2" {
t.Errorf("Expected ID '2', got '%s'", match.ID)
}

View File

@ -14,10 +14,10 @@ import (
"math"
"github.com/couchbaselabs/bleve/analysis"
"github.com/jmhodges/levigo"
"github.com/couchbaselabs/bleve/document"
"github.com/couchbaselabs/bleve/index"
"github.com/couchbaselabs/bleve/index/store"
)
var VERSION_KEY []byte = []byte{'v'}
@ -27,24 +27,19 @@ const VERSION uint8 = 1
type UpsideDownCouch struct {
version uint8
path string
opts *levigo.Options
db *levigo.DB
store store.KVStore
fieldIndexes map[string]uint16
lastFieldIndex int
analyzer map[string]*analysis.Analyzer
docCount uint64
}
func NewUpsideDownCouch(path string) *UpsideDownCouch {
opts := levigo.NewOptions()
opts.SetCreateIfMissing(true)
func NewUpsideDownCouch(s store.KVStore) *UpsideDownCouch {
return &UpsideDownCouch{
version: VERSION,
path: path,
opts: opts,
analyzer: make(map[string]*analysis.Analyzer),
fieldIndexes: make(map[string]uint16),
store: s,
}
}
@ -59,20 +54,20 @@ func (udc *UpsideDownCouch) init() (err error) {
}
func (udc *UpsideDownCouch) loadSchema() (err error) {
// schema := make([]*index.Field, 0)
ro := defaultReadOptions()
it := udc.db.NewIterator(ro)
defer it.Close()
keyPrefix := []byte{'f'}
it := udc.store.Iterator(keyPrefix)
defer it.Close()
it.Seek(keyPrefix)
for it = it; it.Valid(); it.Next() {
key, val, valid := it.Current()
for valid {
// stop when
if !bytes.HasPrefix(it.Key(), keyPrefix) {
if !bytes.HasPrefix(key, keyPrefix) {
break
}
fieldRow, err := NewFieldRowKV(it.Key(), it.Value())
fieldRow, err := NewFieldRowKV(key, val)
if err != nil {
return err
}
@ -80,20 +75,17 @@ func (udc *UpsideDownCouch) loadSchema() (err error) {
if int(fieldRow.index) > udc.lastFieldIndex {
udc.lastFieldIndex = int(fieldRow.index)
}
}
err = it.GetError()
if err != nil {
return
it.Next()
key, val, valid = it.Current()
}
return
}
func (udc *UpsideDownCouch) batchRows(addRows []UpsideDownCouchRow, updateRows []UpsideDownCouchRow, deleteRows []UpsideDownCouchRow) (err error) {
ro := defaultReadOptions()
// prepare batch
wb := levigo.NewWriteBatch()
wb := udc.store.NewBatch()
// add
for _, row := range addRows {
@ -101,7 +93,7 @@ func (udc *UpsideDownCouch) batchRows(addRows []UpsideDownCouchRow, updateRows [
if ok {
// need to increment counter
tr := NewTermFrequencyRow(tfr.term, tfr.field, "", 0, 0)
val, err := udc.db.Get(ro, tr.Key())
val, err := udc.store.Get(tr.Key())
if err != nil {
return err
}
@ -116,14 +108,14 @@ func (udc *UpsideDownCouch) batchRows(addRows []UpsideDownCouchRow, updateRows [
}
// now add this to the batch
wb.Put(tr.Key(), tr.Value())
wb.Set(tr.Key(), tr.Value())
}
wb.Put(row.Key(), row.Value())
wb.Set(row.Key(), row.Value())
}
// update
for _, row := range updateRows {
wb.Put(row.Key(), row.Value())
wb.Set(row.Key(), row.Value())
}
// delete
@ -132,7 +124,7 @@ func (udc *UpsideDownCouch) batchRows(addRows []UpsideDownCouchRow, updateRows [
if ok {
// need to decrement counter
tr := NewTermFrequencyRow(tfr.term, tfr.field, "", 0, 0)
val, err := udc.db.Get(ro, tr.Key())
val, err := udc.store.Get(tr.Key())
if err != nil {
return err
}
@ -150,7 +142,7 @@ func (udc *UpsideDownCouch) batchRows(addRows []UpsideDownCouchRow, updateRows [
wb.Delete(tr.Key())
} else {
// now add this to the batch
wb.Put(tr.Key(), tr.Value())
wb.Set(tr.Key(), tr.Value())
}
}
@ -158,8 +150,11 @@ func (udc *UpsideDownCouch) batchRows(addRows []UpsideDownCouchRow, updateRows [
}
// write out the batch
wo := defaultWriteOptions()
err = udc.db.Write(wo, wb)
err = wb.Execute()
if err != nil {
return
}
err = udc.store.Commit()
return
}
@ -168,14 +163,8 @@ func (udc *UpsideDownCouch) DocCount() uint64 {
}
func (udc *UpsideDownCouch) Open() (err error) {
udc.db, err = levigo.Open(udc.path, udc.opts)
if err != nil {
return
}
ro := defaultReadOptions()
var value []byte
value, err = udc.db.Get(ro, VERSION_KEY)
value, err = udc.store.Get(VERSION_KEY)
if err != nil {
return
}
@ -198,41 +187,40 @@ func (udc *UpsideDownCouch) Open() (err error) {
}
func (udc *UpsideDownCouch) countDocs() uint64 {
ro := defaultReadOptions()
ro.SetFillCache(false) // dont fill the cache with this
it := udc.db.NewIterator(ro)
it := udc.store.Iterator([]byte{'b'})
defer it.Close()
// begining of back index
it.Seek([]byte{'b'})
var rv uint64 = 0
for it = it; it.Valid(); it.Next() {
if !bytes.HasPrefix(it.Key(), []byte{'b'}) {
key, _, valid := it.Current()
for valid {
if !bytes.HasPrefix(key, []byte{'b'}) {
break
}
rv += 1
it.Next()
key, _, valid = it.Current()
}
return rv
}
func (udc *UpsideDownCouch) rowCount() uint64 {
ro := defaultReadOptions()
ro.SetFillCache(false) // dont fill the cache with this
it := udc.db.NewIterator(ro)
it := udc.store.Iterator([]byte{0})
defer it.Close()
it.Seek([]byte{0})
var rv uint64 = 0
for it = it; it.Valid(); it.Next() {
_, _, valid := it.Current()
for valid {
rv += 1
it.Next()
_, _, valid = it.Current()
}
return rv
}
func (udc *UpsideDownCouch) Close() {
udc.db.Close()
udc.store.Close()
}
func (udc *UpsideDownCouch) Update(doc *document.Document) error {
@ -369,14 +357,12 @@ func (udc *UpsideDownCouch) Delete(id string) error {
}
func (udc *UpsideDownCouch) backIndexRowForDoc(docId string) (*BackIndexRow, error) {
ro := defaultReadOptions()
// use a temporary row structure to build key
tempRow := &BackIndexRow{
doc: []byte(docId),
}
key := tempRow.Key()
value, err := udc.db.Get(ro, key)
value, err := udc.store.Get(key)
if err != nil {
return nil, err
}
@ -391,26 +377,23 @@ func (udc *UpsideDownCouch) backIndexRowForDoc(docId string) (*BackIndexRow, err
}
func (udc *UpsideDownCouch) Dump() {
ro := defaultReadOptions()
ro.SetFillCache(false)
it := udc.db.NewIterator(ro)
it := udc.store.Iterator([]byte{0})
defer it.Close()
it.SeekToFirst()
for it = it; it.Valid(); it.Next() {
//fmt.Printf("Key: `%v` Value: `%v`\n", string(it.Key()), string(it.Value()))
row, err := ParseFromKeyValue(it.Key(), it.Value())
key, val, valid := it.Current()
for valid {
row, err := ParseFromKeyValue(key, val)
if err != nil {
fmt.Printf("error parsing key/value: %v", err)
return
}
if row != nil {
fmt.Printf("%v\n", row)
fmt.Printf("Key: % -100x\nValue: % -100x\n\n", it.Key(), it.Value())
fmt.Printf("Key: % -100x\nValue: % -100x\n\n", key, val)
}
}
err := it.GetError()
if err != nil {
fmt.Printf("Error reading iterator: %v", err)
it.Next()
key, val, valid = it.Current()
}
}
@ -422,18 +405,6 @@ func (udc *UpsideDownCouch) TermFieldReader(term []byte, fieldName string) (inde
return newUpsideDownCouchTermFieldReader(udc, []byte{BYTE_SEPARATOR}, 0)
}
func defaultWriteOptions() *levigo.WriteOptions {
wo := levigo.NewWriteOptions()
// request fsync on write for safety
wo.SetSync(true)
return wo
}
func defaultReadOptions() *levigo.ReadOptions {
ro := levigo.NewReadOptions()
return ro
}
func frequencyFromTokenFreq(tf *analysis.TokenFreq) int {
return len(tf.Locations)
}

View File

@ -14,13 +14,15 @@ import (
_ "github.com/couchbaselabs/bleve/analysis/analyzers/standard_analyzer"
"github.com/couchbaselabs/bleve/document"
"github.com/couchbaselabs/bleve/index/store/gouchstore"
)
func TestIndexOpenReopen(t *testing.T) {
defer os.RemoveAll("test")
idx := NewUpsideDownCouch("test")
err := idx.Open()
store, err := gouchstore.Open("test")
idx := NewUpsideDownCouch(store)
err = idx.Open()
if err != nil {
t.Errorf("error opening index: %v", err)
}
@ -41,7 +43,8 @@ func TestIndexOpenReopen(t *testing.T) {
// now close it
idx.Close()
idx = NewUpsideDownCouch("test")
store, err = gouchstore.Open("test")
idx = NewUpsideDownCouch(store)
err = idx.Open()
if err != nil {
t.Errorf("error opening index: %v", err)
@ -54,9 +57,9 @@ func TestIndexOpenReopen(t *testing.T) {
func TestIndexInsert(t *testing.T) {
defer os.RemoveAll("test")
idx := NewUpsideDownCouch("test")
err := idx.Open()
store, err := gouchstore.Open("test")
idx := NewUpsideDownCouch(store)
err = idx.Open()
if err != nil {
t.Errorf("error opening index: %v", err)
}
@ -92,9 +95,9 @@ func TestIndexInsert(t *testing.T) {
func TestIndexInsertThenDelete(t *testing.T) {
defer os.RemoveAll("test")
idx := NewUpsideDownCouch("test")
err := idx.Open()
store, err := gouchstore.Open("test")
idx := NewUpsideDownCouch(store)
err = idx.Open()
if err != nil {
t.Errorf("error opening index: %v", err)
}
@ -160,9 +163,9 @@ func TestIndexInsertThenDelete(t *testing.T) {
func TestIndexInsertThenUpdate(t *testing.T) {
defer os.RemoveAll("test")
idx := NewUpsideDownCouch("test")
err := idx.Open()
store, err := gouchstore.Open("test")
idx := NewUpsideDownCouch(store)
err = idx.Open()
if err != nil {
t.Errorf("error opening index: %v", err)
}
@ -209,9 +212,9 @@ func TestIndexInsertThenUpdate(t *testing.T) {
func TestIndexInsertMultiple(t *testing.T) {
defer os.RemoveAll("test")
idx := NewUpsideDownCouch("test")
err := idx.Open()
store, err := gouchstore.Open("test")
idx := NewUpsideDownCouch(store)
err = idx.Open()
if err != nil {
t.Errorf("error opening index: %v", err)
}
@ -243,8 +246,8 @@ func TestIndexInsertMultiple(t *testing.T) {
// close and reopen and and one more to testing counting works correctly
idx.Close()
idx = NewUpsideDownCouch("test")
store, err = gouchstore.Open("test")
idx = NewUpsideDownCouch(store)
err = idx.Open()
if err != nil {
t.Errorf("error opening index: %v", err)

View File

@ -12,16 +12,23 @@ import (
"flag"
"log"
"github.com/couchbaselabs/bleve/index/store/leveldb"
"github.com/couchbaselabs/bleve/index/upside_down"
)
var indexDir = flag.String("indexDir", "index", "index directory")
var fieldsOnly = flag.Bool("fields", false, "fields only")
func main() {
flag.Parse()
index := upside_down.NewUpsideDownCouch(*indexDir)
err := index.Open()
store, err := leveldb.Open(*indexDir)
if err != nil {
log.Fatal(err)
}
index := upside_down.NewUpsideDownCouch(store)
err = index.Open()
if err != nil {
log.Fatal(err)
}