0
0
Fork 0

major improvements to index row encoding

improvements uncovered some issues with how k/v data was copied
or not.  to address this, kv abstraction layer now lets impl
specify if the bytes returned are safe to use after a reader
(or writer since writers are also readers) are closed
See index/store/KVReader - BytesSafeAfterClose() bool
false is the safe value if you're not sure
it will cause index impls to copy the data
Some kv impls already have created a copy a the C-api barrier
in which case they can safely return true.

Overall this yields ~25% speedup for searches with leveldb.
It yields ~10% speedup for boltdb.
Returning stored fields is now slower with boltdb, as previously
we were returning unsafe bytes.
This commit is contained in:
Marty Schoch 2015-04-03 16:50:48 -04:00
parent 52712b9537
commit 867110e03b
22 changed files with 232 additions and 250 deletions

View File

@ -10,8 +10,7 @@
package boltdb
import (
indexStore "github.com/blevesearch/bleve/index/store"
"github.com/boltdb/bolt"
"github.com/blevesearch/bleve/index/store"
)
type op struct {
@ -20,29 +19,9 @@ type op struct {
}
type Batch struct {
store *Store
alreadyLocked bool
ops []op
merges map[string]indexStore.AssociativeMergeChain
}
func newBatch(store *Store) *Batch {
rv := Batch{
store: store,
ops: make([]op, 0),
merges: make(map[string]indexStore.AssociativeMergeChain),
}
return &rv
}
func newBatchAlreadyLocked(store *Store) *Batch {
rv := Batch{
store: store,
alreadyLocked: true,
ops: make([]op, 0),
merges: make(map[string]indexStore.AssociativeMergeChain),
}
return &rv
writer *Writer
ops []op
merges map[string]store.AssociativeMergeChain
}
func (i *Batch) Set(key, val []byte) {
@ -53,59 +32,53 @@ func (i *Batch) Delete(key []byte) {
i.ops = append(i.ops, op{key, nil})
}
func (i *Batch) Merge(key []byte, oper indexStore.AssociativeMerge) {
func (i *Batch) Merge(key []byte, oper store.AssociativeMerge) {
opers, ok := i.merges[string(key)]
if !ok {
opers = make(indexStore.AssociativeMergeChain, 0, 1)
opers = make(store.AssociativeMergeChain, 0, 1)
}
opers = append(opers, oper)
i.merges[string(key)] = opers
}
func (i *Batch) Execute() error {
if !i.alreadyLocked {
i.store.writer.Lock()
defer i.store.writer.Unlock()
}
return i.store.db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte(i.store.bucket))
b := i.writer.tx.Bucket([]byte(i.writer.store.bucket))
// first process the merges
for k, mc := range i.merges {
val := b.Get([]byte(k))
var err error
val, err = mc.Merge([]byte(k), val)
// first process the merges
for k, mc := range i.merges {
val := b.Get([]byte(k))
var err error
val, err = mc.Merge([]byte(k), val)
if err != nil {
return err
}
if val == nil {
err := b.Delete([]byte(k))
if err != nil {
return err
}
if val == nil {
err := b.Delete([]byte(k))
if err != nil {
return err
}
} else {
err := b.Put([]byte(k), val)
if err != nil {
return err
}
} else {
err := b.Put([]byte(k), val)
if err != nil {
return err
}
}
}
// now process the regular get/set ops
for _, o := range i.ops {
if o.v == nil {
if err := b.Delete(o.k); err != nil {
return err
}
} else {
if err := b.Put(o.k, o.v); err != nil {
return err
}
// now process the regular get/set ops
for _, o := range i.ops {
if o.v == nil {
if err := b.Delete(o.k); err != nil {
return err
}
} else {
if err := b.Put(o.k, o.v); err != nil {
return err
}
}
}
return nil
})
return nil
}
func (i *Batch) Close() error {

View File

@ -15,7 +15,6 @@ import (
type Iterator struct {
store *Store
ownTx bool
tx *bolt.Tx
cursor *bolt.Cursor
valid bool
@ -23,45 +22,18 @@ type Iterator struct {
val []byte
}
func newIterator(store *Store) *Iterator {
tx, _ := store.db.Begin(false)
b := tx.Bucket([]byte(store.bucket))
cursor := b.Cursor()
return &Iterator{
store: store,
tx: tx,
ownTx: true,
cursor: cursor,
}
}
func newIteratorExistingTx(store *Store, tx *bolt.Tx) *Iterator {
b := tx.Bucket([]byte(store.bucket))
cursor := b.Cursor()
return &Iterator{
store: store,
tx: tx,
cursor: cursor,
}
}
func (i *Iterator) SeekFirst() {
i.key, i.val = i.cursor.First()
i.valid = (i.key != nil)
}
func (i *Iterator) Seek(k []byte) {
i.key, i.val = i.cursor.Seek(k)
i.valid = (i.key != nil)
}
func (i *Iterator) Next() {
i.key, i.val = i.cursor.Next()
i.valid = (i.key != nil)
}
@ -82,9 +54,5 @@ func (i *Iterator) Valid() bool {
}
func (i *Iterator) Close() error {
// only close the transaction if we opened it
if i.ownTx {
return i.tx.Rollback()
}
return nil
}

View File

@ -19,15 +19,8 @@ type Reader struct {
tx *bolt.Tx
}
func newReader(store *Store) (*Reader, error) {
tx, err := store.db.Begin(false)
if err != nil {
return nil, err
}
return &Reader{
store: store,
tx: tx,
}, nil
func (r *Reader) BytesSafeAfterClose() bool {
return false
}
func (r *Reader) Get(key []byte) ([]byte, error) {
@ -36,7 +29,15 @@ func (r *Reader) Get(key []byte) ([]byte, error) {
}
func (r *Reader) Iterator(key []byte) store.KVIterator {
rv := newIteratorExistingTx(r.store, r.tx)
b := r.tx.Bucket([]byte(r.store.bucket))
cursor := b.Cursor()
rv := &Iterator{
store: r.store,
tx: r.tx,
cursor: cursor,
}
rv.Seek(key)
return rv
}

View File

@ -51,62 +51,37 @@ func Open(path string, bucket string) (*Store, error) {
return &rv, nil
}
func (bs *Store) get(key []byte) ([]byte, error) {
var rv []byte
err := bs.db.View(func(tx *bolt.Tx) error {
rv = tx.Bucket([]byte(bs.bucket)).Get(key)
return nil
})
return rv, err
}
func (bs *Store) set(key, val []byte) error {
bs.writer.Lock()
defer bs.writer.Unlock()
return bs.setlocked(key, val)
}
func (bs *Store) setlocked(key, val []byte) error {
return bs.db.Update(func(tx *bolt.Tx) error {
return tx.Bucket([]byte(bs.bucket)).Put(key, val)
})
}
func (bs *Store) delete(key []byte) error {
bs.writer.Lock()
defer bs.writer.Unlock()
return bs.deletelocked(key)
}
func (bs *Store) deletelocked(key []byte) error {
return bs.db.Update(func(tx *bolt.Tx) error {
return tx.Bucket([]byte(bs.bucket)).Delete(key)
})
}
func (bs *Store) Close() error {
return bs.db.Close()
}
func (bs *Store) iterator(key []byte) store.KVIterator {
rv := newIterator(bs)
rv.Seek(key)
return rv
}
func (bs *Store) Reader() (store.KVReader, error) {
return newReader(bs)
tx, err := bs.db.Begin(false)
if err != nil {
return nil, err
}
return &Reader{
store: bs,
tx: tx,
}, nil
}
func (bs *Store) Writer() (store.KVWriter, error) {
return newWriter(bs)
}
func (bs *Store) newBatch() store.KVBatch {
return newBatch(bs)
bs.writer.Lock()
tx, err := bs.db.Begin(true)
if err != nil {
bs.writer.Unlock()
return nil, err
}
reader := &Reader{
store: bs,
tx: tx,
}
return &Writer{
store: bs,
tx: tx,
reader: reader,
}, nil
}
func StoreConstructor(config map[string]interface{}) (store.KVStore, error) {

View File

@ -11,43 +11,45 @@ package boltdb
import (
"github.com/blevesearch/bleve/index/store"
"github.com/boltdb/bolt"
)
type Writer struct {
store *Store
}
func newWriter(store *Store) (*Writer, error) {
store.writer.Lock()
return &Writer{
store: store,
}, nil
store *Store
tx *bolt.Tx
reader *Reader
}
func (w *Writer) Set(key, val []byte) error {
return w.store.setlocked(key, val)
return w.tx.Bucket([]byte(w.store.bucket)).Put(key, val)
}
func (w *Writer) Delete(key []byte) error {
return w.store.deletelocked(key)
return w.tx.Bucket([]byte(w.store.bucket)).Delete(key)
}
func (w *Writer) NewBatch() store.KVBatch {
return newBatchAlreadyLocked(w.store)
rv := Batch{
writer: w,
ops: make([]op, 0),
merges: make(map[string]store.AssociativeMergeChain),
}
return &rv
}
func (w *Writer) Close() error {
w.store.writer.Unlock()
return nil
return w.tx.Commit()
}
func (w *Writer) BytesSafeAfterClose() bool {
return w.reader.BytesSafeAfterClose()
}
// these two methods can safely read using the regular
// methods without a read transaction, because we know
// that no one else is writing but us
func (w *Writer) Get(key []byte) ([]byte, error) {
return w.store.get(key)
return w.reader.Get(key)
}
func (w *Writer) Iterator(key []byte) store.KVIterator {
return w.store.iterator(key)
return w.reader.Iterator(key)
}

View File

@ -77,6 +77,10 @@ func (s *Store) Reader() (store.KVReader, error) {
return s, nil
}
func (s *Store) BytesSafeAfterClose() bool {
return false
}
func (s *Store) Writer() (store.KVWriter, error) {
return s, nil
}

View File

@ -23,6 +23,10 @@ type Reader struct {
snapshot *forestdb.KVStore
}
func (r *Reader) BytesSafeAfterClose() bool {
return true
}
func newReader(store *Store) (*Reader, error) {
snapshot, err := store.newSnapshot()
if err != nil {

View File

@ -19,6 +19,10 @@ type Writer struct {
store *Store
}
func (w *Writer) BytesSafeAfterClose() bool {
return true
}
func newWriter(store *Store) (*Writer, error) {
store.writer.Lock()
return &Writer{

View File

@ -110,6 +110,10 @@ func (s *Store) Writer() (store.KVWriter, error) {
return &Writer{s: s}, nil
}
func (w *Reader) BytesSafeAfterClose() bool {
return false
}
func (w *Reader) Get(k []byte) (v []byte, err error) {
itm := w.t.Get(&Item{k: k})
if itm != nil {
@ -126,6 +130,10 @@ func (w *Reader) Close() error {
return nil
}
func (w *Writer) BytesSafeAfterClose() bool {
return false
}
func (w *Writer) Get(k []byte) (v []byte, err error) {
w.s.m.Lock()
t := w.s.t
@ -338,4 +346,3 @@ func (w *Batch) Execute() (err error) {
func (w *Batch) Close() error {
return nil
}

View File

@ -23,6 +23,10 @@ func newReader(store *Store) (*Reader, error) {
}, nil
}
func (r *Reader) BytesSafeAfterClose() bool {
return false
}
func (r *Reader) Get(key []byte) ([]byte, error) {
return r.store.get(key)
}

View File

@ -24,6 +24,10 @@ func newWriter(store *Store) (*Writer, error) {
}, nil
}
func (w *Writer) BytesSafeAfterClose() bool {
return false
}
func (w *Writer) Set(key, val []byte) error {
return w.store.setlocked(key, val)
}

View File

@ -44,6 +44,7 @@ type KVWriter interface {
}
type KVReader interface {
BytesSafeAfterClose() bool
Get(key []byte) ([]byte, error)
Iterator(key []byte) KVIterator
Close() error

View File

@ -28,6 +28,10 @@ func newReader(store *Store) (*Reader, error) {
}, nil
}
func (r *Reader) BytesSafeAfterClose() bool {
return true
}
func (r *Reader) Get(key []byte) ([]byte, error) {
return r.store.getWithSnapshot(key, r.snapshot)
}

View File

@ -26,6 +26,10 @@ func newWriter(store *Store) (*Writer, error) {
}, nil
}
func (w *Writer) BytesSafeAfterClose() bool {
return true
}
func (w *Writer) Set(key, val []byte) error {
return w.store.setlocked(key, val)
}

View File

@ -157,6 +157,10 @@ func (s *Store) Actual() store.KVStore {
return s.o
}
func (w *Reader) BytesSafeAfterClose() bool {
return w.BytesSafeAfterClose()
}
func (w *Reader) Get(key []byte) (v []byte, err error) {
w.s.TimerReaderGet.Time(func() {
v, err = w.o.Get(key)
@ -182,6 +186,10 @@ func (w *Reader) Close() error {
return err
}
func (w *Writer) BytesSafeAfterClose() bool {
return w.BytesSafeAfterClose()
}
func (w *Writer) Get(key []byte) (v []byte, err error) {
w.s.TimerWriterGet.Time(func() {
v, err = w.o.Get(key)

View File

@ -51,6 +51,10 @@ func newReader(store *Store) (*Reader, error) {
}, nil
}
func (r *Reader) BytesSafeAfterClose() bool {
return true
}
func (r *Reader) Get(key []byte) ([]byte, error) {
return nil, nil
}
@ -129,6 +133,10 @@ func newWriter(store *Store) (*Writer, error) {
}, nil
}
func (w *Writer) BytesSafeAfterClose() bool {
return true
}
func (w *Writer) Set(key, val []byte) error {
return nil
}

View File

@ -70,7 +70,12 @@ func (i *IndexReader) Document(id string) (*document.Document, error) {
if !bytes.HasPrefix(key, storedRowScanPrefix) {
break
}
row, err := NewStoredRowKV(key, val)
safeVal := val
if !i.kvreader.BytesSafeAfterClose() {
safeVal = make([]byte, len(val))
copy(safeVal, val)
}
row, err := NewStoredRowKV(key, safeVal)
if err != nil {
return nil, err
}

View File

@ -17,11 +17,12 @@ import (
)
type UpsideDownCouchTermFieldReader struct {
indexReader *IndexReader
iterator store.KVIterator
count uint64
term []byte
field uint16
indexReader *IndexReader
iterator store.KVIterator
count uint64
term []byte
field uint16
readerPrefix []byte
}
func newUpsideDownCouchTermFieldReader(indexReader *IndexReader, term []byte, field uint16) (*UpsideDownCouchTermFieldReader, error) {
@ -44,14 +45,16 @@ func newUpsideDownCouchTermFieldReader(indexReader *IndexReader, term []byte, fi
}
tfr := NewTermFrequencyRow(term, field, "", 0, 0)
it := indexReader.kvreader.Iterator(tfr.Key())
readerPrefix := tfr.Key()
it := indexReader.kvreader.Iterator(readerPrefix)
return &UpsideDownCouchTermFieldReader{
indexReader: indexReader,
iterator: it,
count: dictionaryRow.count,
term: term,
field: field,
indexReader: indexReader,
iterator: it,
count: dictionaryRow.count,
term: term,
field: field,
readerPrefix: readerPrefix,
}, nil
}
@ -63,8 +66,7 @@ func (r *UpsideDownCouchTermFieldReader) Next() (*index.TermFieldDoc, error) {
if r.iterator != nil {
key, val, valid := r.iterator.Current()
if valid {
testfr := NewTermFrequencyRow(r.term, r.field, "", 0, 0)
if !bytes.HasPrefix(key, testfr.Key()) {
if !bytes.HasPrefix(key, r.readerPrefix) {
// end of the line
return nil, nil
}
@ -90,8 +92,7 @@ func (r *UpsideDownCouchTermFieldReader) Advance(docID string) (*index.TermField
r.iterator.Seek(tfr.Key())
key, val, valid := r.iterator.Current()
if valid {
testfr := NewTermFrequencyRow(r.term, r.field, "", 0, 0)
if !bytes.HasPrefix(key, testfr.Key()) {
if !bytes.HasPrefix(key, r.readerPrefix) {
// end of the line
return nil, nil
}

View File

@ -364,58 +364,50 @@ func NewTermFrequencyRowWithTermVectors(term []byte, field uint16, doc string, f
}
func NewTermFrequencyRowK(key []byte) (*TermFrequencyRow, error) {
rv := TermFrequencyRow{
doc: []byte(""),
}
buf := bytes.NewBuffer(key)
_, err := buf.ReadByte() // type
if err != nil {
return nil, err
rv := TermFrequencyRow{}
keyLen := len(key)
if keyLen < 3 {
return nil, fmt.Errorf("invalid term frequency key, no valid field")
}
rv.field = binary.LittleEndian.Uint16(key[1:3])
err = binary.Read(buf, binary.LittleEndian, &rv.field)
if err != nil {
return nil, err
termEndPos := bytes.IndexByte(key[3:], ByteSeparator)
if termEndPos < 0 {
return nil, fmt.Errorf("invalid term frequency key, no byte separator terminating term")
}
rv.term = key[3 : 3+termEndPos]
rv.term, err = buf.ReadBytes(ByteSeparator)
if err != nil {
return nil, err
}
rv.term = rv.term[:len(rv.term)-1] // trim off separator byte
doc, err := buf.ReadBytes(ByteSeparator)
if err != io.EOF {
return nil, err
}
if doc != nil {
rv.doc = doc
docLen := len(key) - (3 + termEndPos + 1)
if docLen < 1 {
return nil, fmt.Errorf("invalid term frequency key, empty docid")
}
rv.doc = key[3+termEndPos+1:]
return &rv, nil
}
func (tfr *TermFrequencyRow) parseV(value []byte) error {
buf := bytes.NewBuffer((value))
freq, err := binary.ReadUvarint(buf)
if err != nil {
return err
currOffset := 0
bytesRead := 0
tfr.freq, bytesRead = binary.Uvarint(value[currOffset:])
if bytesRead <= 0 {
return fmt.Errorf("invalid term frequency value, invalid frequency")
}
tfr.freq = freq
currOffset += bytesRead
norm, err := binary.ReadUvarint(buf)
if err != nil {
return err
var norm uint64
norm, bytesRead = binary.Uvarint(value[currOffset:])
if bytesRead <= 0 {
return fmt.Errorf("invalid term frequency value, no norm")
}
currOffset += bytesRead
tfr.norm = math.Float32frombits(uint32(norm))
field, err := binary.ReadUvarint(buf)
if err != nil && err != io.EOF {
return err
}
for err != io.EOF {
var field uint64
field, bytesRead = binary.Uvarint(value[currOffset:])
for bytesRead > 0 {
currOffset += bytesRead
tv := TermVector{}
tv.field = uint16(field)
// at this point we expect at least one term vector
@ -423,31 +415,32 @@ func (tfr *TermFrequencyRow) parseV(value []byte) error {
tfr.vectors = make([]*TermVector, 0)
}
var pos uint64
pos, err = binary.ReadUvarint(buf)
if err != nil {
return err
tv.pos, bytesRead = binary.Uvarint(value[currOffset:])
if bytesRead <= 0 {
return fmt.Errorf("invalid term frequency value, vector contains no position")
}
tv.pos = pos
currOffset += bytesRead
var start uint64
start, err = binary.ReadUvarint(buf)
if err != nil {
return err
tv.start, bytesRead = binary.Uvarint(value[currOffset:])
if bytesRead <= 0 {
return fmt.Errorf("invalid term frequency value, vector contains no start")
}
tv.start = start
currOffset += bytesRead
var end uint64
end, err = binary.ReadUvarint(buf)
if err != nil {
return err
tv.end, bytesRead = binary.Uvarint(value[currOffset:])
if bytesRead <= 0 {
return fmt.Errorf("invalid term frequency value, vector contains no end")
}
tv.end = end
currOffset += bytesRead
tfr.vectors = append(tfr.vectors, &tv)
// try to read next record (may not exist)
field, err = binary.ReadUvarint(buf)
field, bytesRead = binary.Uvarint(value[currOffset:])
}
if len(value[currOffset:]) > 0 && bytesRead <= 0 {
return fmt.Errorf("invalid term frequency value, vector field invalid")
}
return nil
}

View File

@ -48,8 +48,8 @@ func TestRows(t *testing.T) {
[]byte{27},
},
{
NewTermFrequencyRow([]byte{'b', 'e', 'e', 'r'}, 0, "", 3, 3.14),
[]byte{'t', 0, 0, 'b', 'e', 'e', 'r', ByteSeparator},
NewTermFrequencyRow([]byte{'b', 'e', 'e', 'r'}, 0, "catz", 3, 3.14),
[]byte{'t', 0, 0, 'b', 'e', 'e', 'r', ByteSeparator, 'c', 'a', 't', 'z'},
[]byte{3, 195, 235, 163, 130, 4},
},
{
@ -178,27 +178,27 @@ func TestInvalidRows(t *testing.T) {
// type t, invalid val (missing norm)
{
[]byte{'t', 0, 0, 'b', 'e', 'e', 'r', ByteSeparator, 'b', 'u', 'd', 'w', 'e', 'i', 's', 'e', 'r'},
[]byte{3, 0, 0, 0, 0, 0, 0, 0},
[]byte{3},
},
// type t, invalid val (half missing tv field, full missing is valid (no term vectors))
{
[]byte{'t', 0, 0, 'b', 'e', 'e', 'r', ByteSeparator, 'b', 'u', 'd', 'w', 'e', 'i', 's', 'e', 'r'},
[]byte{3, 0, 0, 0, 0, 0, 0, 0, 195, 245, 72, 64, 0},
[]byte{3, 25, 255},
},
// type t, invalid val (missing tv pos)
{
[]byte{'t', 0, 0, 'b', 'e', 'e', 'r', ByteSeparator, 'b', 'u', 'd', 'w', 'e', 'i', 's', 'e', 'r'},
[]byte{3, 0, 0, 0, 0, 0, 0, 0, 195, 245, 72, 64, 0, 0},
[]byte{3, 25, 0},
},
// type t, invalid val (missing tv start)
{
[]byte{'t', 0, 0, 'b', 'e', 'e', 'r', ByteSeparator, 'b', 'u', 'd', 'w', 'e', 'i', 's', 'e', 'r'},
[]byte{3, 0, 0, 0, 0, 0, 0, 0, 195, 245, 72, 64, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0},
[]byte{3, 25, 0, 0},
},
// type t, invalid val (missing tv end)
{
[]byte{'t', 0, 0, 'b', 'e', 'e', 'r', ByteSeparator, 'b', 'u', 'd', 'w', 'e', 'i', 's', 'e', 'r'},
[]byte{3, 0, 0, 0, 0, 0, 0, 0, 195, 245, 72, 64, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 3, 0, 0, 0, 0, 0, 0, 0},
[]byte{3, 25, 0, 0, 0},
},
// type b, invalid key (missing id)
{

View File

@ -192,11 +192,17 @@ func (s *BooleanSearcher) Next() (*search.DocumentMatch, error) {
}
if s.currShould != nil && s.currShould.ID == s.currentID {
// score bonus matches should
cons := []*search.DocumentMatch{}
var cons []*search.DocumentMatch
if s.currMust != nil {
cons = append(cons, s.currMust)
cons = []*search.DocumentMatch{
s.currMust,
s.currShould,
}
} else {
cons = []*search.DocumentMatch{
s.currShould,
}
}
cons = append(cons, s.currShould)
rv = s.scorer.Score(cons)
err = s.advanceNextMust()
if err != nil {
@ -214,11 +220,17 @@ func (s *BooleanSearcher) Next() (*search.DocumentMatch, error) {
}
} else if s.currShould != nil && s.currShould.ID == s.currentID {
// score bonus matches should
cons := []*search.DocumentMatch{}
var cons []*search.DocumentMatch
if s.currMust != nil {
cons = append(cons, s.currMust)
cons = []*search.DocumentMatch{
s.currMust,
s.currShould,
}
} else {
cons = []*search.DocumentMatch{
s.currShould,
}
}
cons = append(cons, s.currShould)
rv = s.scorer.Score(cons)
err = s.advanceNextMust()
if err != nil {

View File

@ -111,7 +111,7 @@ func (s *DisjunctionSearcher) Next() (*search.DocumentMatch, error) {
}
var err error
var rv *search.DocumentMatch
matching := make([]*search.DocumentMatch, 0)
matching := make([]*search.DocumentMatch, 0, len(s.searchers))
found := false
for !found && s.currentID != "" {