0
0

adding initial version of bolt persisted segment

This commit is contained in:
Marty Schoch 2017-12-05 13:05:12 -05:00
parent f6be841668
commit ece27ef215
9 changed files with 2471 additions and 0 deletions

View File

@ -0,0 +1,500 @@
// Copyright (c) 2017 Couchbase, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package bolt
import (
"bytes"
"encoding/binary"
"math"
"github.com/RoaringBitmap/roaring"
"github.com/Smerity/govarint"
"github.com/blevesearch/bleve/index/scorch/segment/mem"
"github.com/boltdb/bolt"
"github.com/couchbaselabs/vellum"
"github.com/golang/snappy"
)
// Names of the top-level bolt buckets that make up a persisted segment.
var (
	fieldsBucket         = []byte{'a'}
	dictBucket           = []byte{'b'}
	postingsBucket       = []byte{'c'}
	postingDetailsBucket = []byte{'d'}
	storedBucket         = []byte{'e'}
	configBucket         = []byte{'x'}
)

// Keys used inside those buckets. Several share the same byte value; that is
// safe because they are written to different buckets.
var (
	// indexLocsKey lives in the fields bucket
	indexLocsKey = []byte{'l'}

	// freqNormKey and locKey live in each posting-details chunk bucket
	freqNormKey = []byte{'a'}
	locKey      = []byte{'b'}

	// metaKey and dataKey live in each stored-document bucket
	metaKey = []byte{'a'}
	dataKey = []byte{'b'}

	// chunkKey and versionKey live in the config bucket
	chunkKey   = []byte{'c'}
	versionKey = []byte{'v'}
)

// version is the value written under versionKey in the config bucket.
var version = 0
// persistSegment writes memSegment into a bolt database file at path.
//
// Everything is written inside a single read-write transaction: fields,
// dictionaries, postings, posting details, stored values and finally the
// config bucket. The transaction is committed only if every step succeeds;
// otherwise it is rolled back. chunkFactor is the number of consecutive doc
// numbers whose posting details share one chunk bucket, and it is recorded in
// the config bucket.
//
// The returned error is the first failure encountered, including a failure
// to commit the transaction or to close the database.
func persistSegment(memSegment *mem.Segment, path string, chunkFactor uint32) (err error) {
	// the segment is a plain data file, so it is created without execute
	// bits (the effective mode is still subject to the process umask)
	db, err := bolt.Open(path, 0666, nil)
	if err != nil {
		return err
	}
	defer func() {
		// runs after the commit/rollback below; never mask an earlier error
		if cerr := db.Close(); err == nil && cerr != nil {
			err = cerr
		}
	}()

	tx, err := db.Begin(true)
	if err != nil {
		return err
	}
	defer func() {
		if err == nil {
			err = tx.Commit()
		} else {
			// the original error is the interesting one
			_ = tx.Rollback()
		}
	}()

	if err = persistFields(memSegment, tx); err != nil {
		return err
	}
	if err = persistDictionary(memSegment, tx); err != nil {
		return err
	}
	if err = persistPostings(memSegment, tx); err != nil {
		return err
	}
	if err = persistPostingsDetails(memSegment, tx, chunkFactor); err != nil {
		return err
	}
	if err = persistStored(memSegment, tx); err != nil {
		return err
	}
	if err = persistConfig(tx, chunkFactor); err != nil {
		return err
	}
	return nil
}
// persistFields stores every field as its own k/v pair in the fields bucket,
// keyed by the encoded field ID with the field name as value. A roaring
// bitmap built from memSegment.FieldsLoc is stored alongside them under
// indexLocsKey.
//
// Little effort is spent on performance here: the data is expected to be
// small, and on re-open it is read once and kept on the heap.
func persistFields(memSegment *mem.Segment, tx *bolt.Tx) error {
	bucket, err := tx.CreateBucket(fieldsBucket)
	if err != nil {
		return err
	}
	bucket.FillPercent = 1.0

	// turn the field locs array into a bitmap of the field IDs that are set
	locsBitmap := roaring.NewBitmap()
	for fieldID, set := range memSegment.FieldsLoc {
		if set {
			locsBitmap.AddInt(fieldID)
		}
	}
	var locsBuf bytes.Buffer
	if _, err = locsBitmap.WriteTo(&locsBuf); err != nil {
		return err
	}
	if err = bucket.Put(indexLocsKey, locsBuf.Bytes()); err != nil {
		return err
	}

	// field IDs use the order-preserving varint so keys sort numerically
	keyBuf := make([]byte, 0, maxVarintSize)
	for fieldID, fieldName := range memSegment.FieldsInv {
		keyBuf = EncodeUvarintAscending(keyBuf[:0], uint64(fieldID))
		if err = bucket.Put(keyBuf, []byte(fieldName)); err != nil {
			return err
		}
	}
	return nil
}
// persistDictionary builds one vellum FST per field from the field's sorted
// term list and stores it in the dictionary bucket under the encoded field
// ID. Each term maps to its value in memSegment.Dicts minus one.
func persistDictionary(memSegment *mem.Segment, tx *bolt.Tx) error {
	bucket, err := tx.CreateBucket(dictBucket)
	if err != nil {
		return err
	}
	bucket.FillPercent = 1.0

	// TODO consider whether or not there is benefit to building the vellums
	// concurrently. While we have to insert them into the bolt in order,
	// the (presumably) heavier lifting involved in building the FST could
	// be done concurrently.

	keyBuf := make([]byte, 0, maxVarintSize)
	for fieldID, terms := range memSegment.DictKeys {
		// every field gets its own FST, built into its own buffer
		var fstBuf bytes.Buffer
		builder, err := vellum.New(&fstBuf, nil)
		if err != nil {
			return err
		}

		// terms are already sorted, which is the order the builder needs
		termValues := memSegment.Dicts[fieldID]
		for _, term := range terms {
			if err = builder.Insert([]byte(term), termValues[term]-1); err != nil {
				return err
			}
		}
		if err = builder.Close(); err != nil {
			return err
		}

		// field IDs use the order-preserving varint so keys sort numerically
		keyBuf = EncodeUvarintAscending(keyBuf[:0], uint64(fieldID))
		if err = bucket.Put(keyBuf, fstBuf.Bytes()); err != nil {
			return err
		}
	}
	return nil
}
// persistPostings serializes every postings bitmap of the segment and stores
// it in the postings bucket under its encoded postings ID (the bitmap's index
// in memSegment.Postings).
func persistPostings(memSegment *mem.Segment, tx *bolt.Tx) error {
	bucket, err := tx.CreateBucket(postingsBucket)
	if err != nil {
		return err
	}
	bucket.FillPercent = 1.0

	keyBuf := make([]byte, 0, maxVarintSize)
	for postingID, postings := range memSegment.Postings {
		keyBuf = EncodeUvarintAscending(keyBuf[:0], uint64(postingID))

		// a fresh buffer per bitmap: its bytes are handed to bolt as the value
		var serialized bytes.Buffer
		if _, err := postings.WriteTo(&serialized); err != nil {
			return err
		}
		if err := bucket.Put(keyBuf, serialized.Bytes()); err != nil {
			return err
		}
	}
	return nil
}
// persistPostingsDetails creates the posting-details bucket and, inside it,
// one sub-bucket per postings list (keyed by the encoded postings ID) that
// persistPostingDetails then fills chunk by chunk.
func persistPostingsDetails(memSegment *mem.Segment, tx *bolt.Tx,
	chunkFactor uint32) error {
	detailsBucket, err := tx.CreateBucket(postingDetailsBucket)
	if err != nil {
		return err
	}
	detailsBucket.FillPercent = 1.0

	keyBuf := make([]byte, 0, maxVarintSize)
	for postingID := range memSegment.Postings {
		keyBuf = EncodeUvarintAscending(keyBuf[:0], uint64(postingID))

		// each postings list gets a bucket of its own for its details
		postingBucket, err := detailsBucket.CreateBucket(keyBuf)
		if err != nil {
			return err
		}
		postingBucket.FillPercent = 1.0

		if err = persistPostingDetails(memSegment, postingBucket, postingID, chunkFactor); err != nil {
			return err
		}
	}
	return nil
}
// persistPostingDetails walks one postings list and writes its per-document
// details into postingBucket, grouped into chunk sub-buckets.
//
// A document belongs to chunk docNum/chunkFactor. Every chunk bucket receives
// two values: under freqNormKey the varint-encoded (freq, norm bits) pair of
// each document, and under locKey the varint-encoded locations of each
// document (field, pos, start, end, number of array positions, then the array
// positions themselves).
func persistPostingDetails(memSegment *mem.Segment, postingBucket *bolt.Bucket,
	postingID int, chunkFactor uint32) error {
	var (
		chunkBucket *bolt.Bucket
		activeChunk uint32
		freqNormEnc *govarint.Base128Encoder
		locEnc      *govarint.Base128Encoder
		docIdx      int // index of the current doc within this postings list
		locIdx      int // running index into this postings list's locations
	)
	chunkIDBuf := make([]byte, 0, maxVarintSize)
	freqNormBuf := &bytes.Buffer{}
	locBuf := &bytes.Buffer{}

	// flushChunk stores the bytes encoded so far for the chunk being built
	// into that chunk's bucket.
	flushChunk := func() error {
		freqNormEnc.Close()
		if err := chunkBucket.Put(freqNormKey, freqNormBuf.Bytes()); err != nil {
			return err
		}
		locEnc.Close()
		return chunkBucket.Put(locKey, locBuf.Bytes())
	}

	docs := memSegment.Postings[postingID].Iterator()
	for docs.HasNext() {
		chunk := docs.Next() / chunkFactor

		// the first doc, or a doc that crosses into another chunk, starts a
		// new chunk bucket
		if chunkBucket == nil || chunk != activeChunk {
			if chunkBucket != nil {
				if err := flushChunk(); err != nil {
					return err
				}
				// the bytes just handed to bolt must not be overwritten
				// (bolt documents that a value passed to Put has to stay
				// valid for the life of the transaction), so start the next
				// chunk with brand new buffers
				chunkIDBuf = chunkIDBuf[:0]
				freqNormBuf = &bytes.Buffer{}
				locBuf = &bytes.Buffer{}
			}

			chunkIDBuf = EncodeUvarintAscending(chunkIDBuf, uint64(chunk))
			var err error
			chunkBucket, err = postingBucket.CreateBucket(chunkIDBuf)
			if err != nil {
				return err
			}
			chunkBucket.FillPercent = 1.0
			activeChunk = chunk
			freqNormEnc = govarint.NewU64Base128Encoder(freqNormBuf)
			locEnc = govarint.NewU64Base128Encoder(locBuf)
		}

		// freq, followed by the raw bits of the norm
		freq := memSegment.Freqs[postingID][docIdx]
		if _, err := freqNormEnc.PutU64(freq); err != nil {
			return err
		}
		normBits := math.Float32bits(memSegment.Norms[postingID][docIdx])
		if _, err := freqNormEnc.PutU32(normBits); err != nil {
			return err
		}

		// one location per occurrence; the location index advances even when
		// this postings list carries no location data
		for occurrence := 0; occurrence < int(freq); occurrence++ {
			if len(memSegment.Locfields[postingID]) > 0 {
				if _, err := locEnc.PutU64(uint64(memSegment.Locfields[postingID][locIdx])); err != nil {
					return err
				}
				if _, err := locEnc.PutU64(memSegment.Locpos[postingID][locIdx]); err != nil {
					return err
				}
				if _, err := locEnc.PutU64(memSegment.Locstarts[postingID][locIdx]); err != nil {
					return err
				}
				if _, err := locEnc.PutU64(memSegment.Locends[postingID][locIdx]); err != nil {
					return err
				}

				// array positions are length-prefixed
				arrayPosCount := len(memSegment.Locarraypos[postingID][locIdx])
				if _, err := locEnc.PutU64(uint64(arrayPosCount)); err != nil {
					return err
				}
				for k := 0; k < arrayPosCount; k++ {
					if _, err := locEnc.PutU64(memSegment.Locarraypos[postingID][locIdx][k]); err != nil {
						return err
					}
				}
			}
			locIdx++
		}
		docIdx++
	}

	// the last chunk has not been written yet
	if chunkBucket != nil {
		if err := flushChunk(); err != nil {
			return err
		}
	}
	return nil
}
// persistStored writes the stored field values of every document into the
// stored bucket, one sub-bucket per document keyed by the encoded doc number.
//
// Each document bucket holds two values:
//   - metaKey: for every stored value, in field ID order, the varint-encoded
//     field ID, type, start offset into the data, length, number of array
//     positions and the array positions themselves
//   - dataKey: the snappy-compressed concatenation of all stored values
//
// Encoder errors are returned to the caller rather than dropped.
func persistStored(memSegment *mem.Segment, tx *bolt.Tx) error {
	bucket, err := tx.CreateBucket(storedBucket)
	if err != nil {
		return err
	}
	bucket.FillPercent = 1.0

	// we use special varint which is still guaranteed to sort correctly
	docNumBuf := make([]byte, 0, maxVarintSize)
	for docNum, storedValues := range memSegment.Stored {
		// create doc sub-bucket
		docNumBuf = EncodeUvarintAscending(docNumBuf[:0], uint64(docNum))
		docBucket, err := bucket.CreateBucket(docNumBuf)
		if err != nil {
			return err
		}
		docBucket.FillPercent = 1.0

		var metaBuf bytes.Buffer
		var data []byte
		metaEncoder := govarint.NewU64Base128Encoder(&metaBuf)
		putMeta := func(v uint64) error {
			_, perr := metaEncoder.PutU64(v)
			return perr
		}

		// offset of the next value within this document's data
		curr := 0

		// encode fields in order
		for fieldID := range memSegment.FieldsInv {
			fieldValues, ok := storedValues[uint16(fieldID)]
			if !ok {
				// no stored values for this field
				continue
			}
			for i, value := range fieldValues {
				arrayPos := memSegment.StoredPos[docNum][uint16(fieldID)][i]
				header := [...]uint64{
					uint64(fieldID),
					uint64(memSegment.StoredTypes[docNum][uint16(fieldID)][i]),
					uint64(curr),
					uint64(len(value)),
					uint64(len(arrayPos)),
				}
				for _, v := range header {
					if err = putMeta(v); err != nil {
						return err
					}
				}
				for _, pos := range arrayPos {
					if err = putMeta(pos); err != nil {
						return err
					}
				}

				data = append(data, value...)
				curr += len(value)
			}
		}
		metaEncoder.Close()

		if err = docBucket.Put(metaKey, metaBuf.Bytes()); err != nil {
			return err
		}
		// compress into a fresh slice; it is handed to bolt as the value
		if err = docBucket.Put(dataKey, snappy.Encode(nil, data)); err != nil {
			return err
		}
	}
	return nil
}
// persistConfig creates the config bucket and records the chunk factor (as a
// 4 byte big-endian value under chunkKey) and the format version (as a single
// byte under versionKey).
func persistConfig(tx *bolt.Tx, chunkFactor uint32) error {
	cfg, err := tx.CreateBucket(configBucket)
	if err != nil {
		return err
	}

	encodedChunkFactor := make([]byte, 4)
	binary.BigEndian.PutUint32(encodedChunkFactor, chunkFactor)
	if err = cfg.Put(chunkKey, encodedChunkFactor); err != nil {
		return err
	}

	return cfg.Put(versionKey, []byte{byte(version)})
}

View File

@ -0,0 +1,288 @@
// Copyright (c) 2017 Couchbase, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package bolt
import (
"os"
"testing"
"github.com/blevesearch/bleve/analysis"
"github.com/blevesearch/bleve/document"
"github.com/blevesearch/bleve/index"
"github.com/blevesearch/bleve/index/scorch/segment/mem"
)
func TestBuild(t *testing.T) {
os.RemoveAll("/tmp/scorch.bolt")
memSegment := buildMemSegment()
err := persistSegment(memSegment, "/tmp/scorch.bolt", 1024)
if err != nil {
t.Fatal(err)
}
}
// buildMemSegment returns an in-memory segment holding a single hand-analyzed
// document ("a") with the fields _id, name, desc and two tag values, plus an
// _all composite field.
func buildMemSegment() *mem.Segment {
	stored := document.IndexField | document.StoreField
	withVectors := stored | document.IncludeTermVectors

	// tok builds one analyzed token
	tok := func(start, end, position int, term string) *analysis.Token {
		return &analysis.Token{
			Start:    start,
			End:      end,
			Position: position,
			Term:     []byte(term),
		}
	}

	doc := &document.Document{
		ID: "a",
		Fields: []document.Field{
			document.NewTextFieldCustom("_id", nil, []byte("a"), stored, nil),
			document.NewTextFieldCustom("name", nil, []byte("wow"), withVectors, nil),
			document.NewTextFieldCustom("desc", nil, []byte("some thing"), withVectors, nil),
			document.NewTextFieldCustom("tag", []uint64{0}, []byte("cold"), withVectors, nil),
			document.NewTextFieldCustom("tag", []uint64{1}, []byte("dark"), withVectors, nil),
		},
		CompositeFields: []*document.CompositeField{
			document.NewCompositeField("_all", true, nil, []string{"_id"}),
		},
	}

	// forge analyzed docs: one entry per field, in field order
	results := []*index.AnalysisResult{
		{
			Document: doc,
			Analyzed: []analysis.TokenFrequencies{
				analysis.TokenFrequency(analysis.TokenStream{tok(0, 1, 1, "a")}, nil, false),
				analysis.TokenFrequency(analysis.TokenStream{tok(0, 3, 1, "wow")}, nil, true),
				analysis.TokenFrequency(analysis.TokenStream{
					tok(0, 4, 1, "some"),
					tok(5, 10, 2, "thing"),
				}, nil, true),
				analysis.TokenFrequency(analysis.TokenStream{tok(0, 4, 1, "cold")}, []uint64{0}, true),
				analysis.TokenFrequency(analysis.TokenStream{tok(0, 4, 1, "dark")}, []uint64{1}, true),
			},
			Length: []int{1, 1, 2, 1, 1},
		},
	}

	// fix up composite fields
	for _, result := range results {
		for fieldIdx, field := range result.Document.Fields {
			for _, composite := range result.Document.CompositeFields {
				composite.Compose(field.Name(), result.Length[fieldIdx], result.Analyzed[fieldIdx])
			}
		}
	}

	return mem.NewFromAnalyzedDocs(results)
}
// buildMemSegmentMulti returns an in-memory segment holding two hand-analyzed
// documents, "a" (name "wow") and "b" (name "who"). Apart from ID and name
// the two documents are identical: same desc, same two tag values and an _all
// composite field.
func buildMemSegmentMulti() *mem.Segment {
	stored := document.IndexField | document.StoreField
	withVectors := stored | document.IncludeTermVectors

	// tok builds one analyzed token
	tok := func(start, end, position int, term string) *analysis.Token {
		return &analysis.Token{
			Start:    start,
			End:      end,
			Position: position,
			Term:     []byte(term),
		}
	}

	// newDoc builds the document for the given ID and name
	newDoc := func(id, name string) *document.Document {
		return &document.Document{
			ID: id,
			Fields: []document.Field{
				document.NewTextFieldCustom("_id", nil, []byte(id), stored, nil),
				document.NewTextFieldCustom("name", nil, []byte(name), withVectors, nil),
				document.NewTextFieldCustom("desc", nil, []byte("some thing"), withVectors, nil),
				document.NewTextFieldCustom("tag", []uint64{0}, []byte("cold"), withVectors, nil),
				document.NewTextFieldCustom("tag", []uint64{1}, []byte("dark"), withVectors, nil),
			},
			CompositeFields: []*document.CompositeField{
				document.NewCompositeField("_all", true, nil, []string{"_id"}),
			},
		}
	}

	// analyze forges the analysis of such a document: one entry per field,
	// in field order
	analyze := func(id, name string) []analysis.TokenFrequencies {
		return []analysis.TokenFrequencies{
			analysis.TokenFrequency(analysis.TokenStream{tok(0, 1, 1, id)}, nil, false),
			analysis.TokenFrequency(analysis.TokenStream{tok(0, 3, 1, name)}, nil, true),
			analysis.TokenFrequency(analysis.TokenStream{
				tok(0, 4, 1, "some"),
				tok(5, 10, 2, "thing"),
			}, nil, true),
			analysis.TokenFrequency(analysis.TokenStream{tok(0, 4, 1, "cold")}, []uint64{0}, true),
			analysis.TokenFrequency(analysis.TokenStream{tok(0, 4, 1, "dark")}, []uint64{1}, true),
		}
	}

	// forge analyzed docs
	results := []*index.AnalysisResult{
		{
			Document: newDoc("a", "wow"),
			Analyzed: analyze("a", "wow"),
			Length:   []int{1, 1, 2, 1, 1},
		},
		{
			Document: newDoc("b", "who"),
			Analyzed: analyze("b", "who"),
			Length:   []int{1, 1, 2, 1, 1},
		},
	}

	// fix up composite fields
	for _, result := range results {
		for fieldIdx, field := range result.Document.Fields {
			for _, composite := range result.Document.CompositeFields {
				composite.Compose(field.Name(), result.Length[fieldIdx], result.Analyzed[fieldIdx])
			}
		}
	}

	return mem.NewFromAnalyzedDocs(results)
}

View File

@ -0,0 +1,161 @@
// Copyright (c) 2017 Couchbase, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package bolt
import (
"fmt"
"github.com/RoaringBitmap/roaring"
"github.com/blevesearch/bleve/index"
"github.com/blevesearch/bleve/index/scorch/segment"
"github.com/couchbaselabs/vellum"
"github.com/couchbaselabs/vellum/regexp"
)
// Dictionary is the bolt representation of the term dictionary
type Dictionary struct {
	// segment is the segment this dictionary belongs to; its transaction is
	// used to read postings out of bolt
	segment *Segment

	// field and fieldID are supplied by whoever constructs the Dictionary
	// (not set anywhere in this file)
	field   string
	fieldID uint16

	// fst maps terms to postings IDs; it may be nil, in which case lookups
	// find nothing
	fst *vellum.FST
}
// PostingsList returns the postings list for the specified term, leaving out
// the documents in except (which may be nil).
//
// On failure the returned list is an untyped nil: handing back the nil
// *PostingsList directly would wrap it in a non-nil interface value and make
// a `list == nil` check in the caller lie.
func (d *Dictionary) PostingsList(term string, except *roaring.Bitmap) (segment.PostingsList, error) {
	list, err := d.postingsList(term, except)
	if err != nil {
		return nil, err
	}
	return list, nil
}
// postingsList looks term up in the FST and, when it is present, loads its
// roaring bitmap from the postings bucket. A term that is not in the
// dictionary (or a dictionary without an FST) yields a postings list with no
// bitmap rather than an error.
func (d *Dictionary) postingsList(term string, except *roaring.Bitmap) (*PostingsList, error) {
	list := &PostingsList{
		dictionary: d,
		term:       term,
		except:     except,
	}
	if d.fst == nil {
		return list, nil
	}

	id, found, err := d.fst.Get([]byte(term))
	if err != nil {
		return nil, fmt.Errorf("vellum err: %v", err)
	}
	if !found {
		return list, nil
	}
	list.postingsID = id

	// the FST value is the postings ID, which keys the postings bucket
	key := EncodeUvarintAscending(nil, id)
	postings := d.segment.tx.Bucket(postingsBucket)
	if postings == nil {
		return nil, fmt.Errorf("postings bucket missing")
	}
	serialized := postings.Get(key)
	if serialized == nil {
		return nil, fmt.Errorf("postings for postingsID %d missing", id)
	}

	loaded := roaring.NewBitmap()
	if _, err = loaded.FromBuffer(serialized); err != nil {
		return nil, fmt.Errorf("error loading roaring bitmap: %v", err)
	}
	list.postings = loaded
	list.postingKey = key
	return list, nil
}
// Iterator returns an iterator over every term of this dictionary.
//
// The iterator is always safe to call Next on: when the dictionary has no
// FST it is created already exhausted, and when vellum fails to create the
// underlying iterator (which includes vellum.ErrIteratorDone for an FST
// without matching keys) that error is recorded on the iterator for Next to
// interpret instead of being dropped.
func (d *Dictionary) Iterator() segment.DictionaryIterator {
	rv := &DictionaryIterator{
		d: d,
	}
	if d.fst == nil {
		// nothing to iterate
		rv.err = vellum.ErrIteratorDone
		return rv
	}
	itr, err := d.fst.Iterator(nil, nil)
	if err != nil {
		rv.err = err
		return rv
	}
	rv.itr = itr
	return rv
}
// PrefixIterator returns an iterator which only visits terms having the
// specified prefix.
//
// The prefix is matched literally: regular expression metacharacters in it
// are escaped before the search expression is built. The iterator is always
// safe to call Next on: a dictionary without FST yields an exhausted
// iterator, and errors from compiling the expression or starting the search
// (including vellum.ErrIteratorDone when nothing matches) are recorded on the
// iterator instead of being dropped.
func (d *Dictionary) PrefixIterator(prefix string) segment.DictionaryIterator {
	rv := &DictionaryIterator{
		d: d,
	}
	if d.fst == nil {
		// nothing to iterate
		rv.err = vellum.ErrIteratorDone
		return rv
	}
	r, err := regexp.New(escapeRegexpMeta(prefix) + ".*")
	if err != nil {
		rv.err = err
		return rv
	}
	itr, err := d.fst.Search(r, nil, nil)
	if err != nil {
		rv.err = err
		return rv
	}
	rv.itr = itr
	return rv
}

// escapeRegexpMeta returns s with every regular expression metacharacter
// preceded by a backslash, so that the result matches s literally.
func escapeRegexpMeta(s string) string {
	escaped := make([]byte, 0, 2*len(s))
	for i := 0; i < len(s); i++ {
		switch s[i] {
		case '\\', '.', '+', '*', '?', '(', ')', '|', '[', ']', '{', '}', '^', '$':
			escaped = append(escaped, '\\')
		}
		escaped = append(escaped, s[i])
	}
	return string(escaped)
}
// RangeIterator returns an iterator which only visits terms between the
// start and end terms. NOTE: bleve.index API specifies the end is inclusive.
//
// An empty end means "no upper bound"; it is passed to vellum as a nil end
// key, the same way Iterator asks for the full dictionary. The iterator is
// always safe to call Next on: a dictionary without FST yields an exhausted
// iterator, and an error from creating the underlying iterator (including
// vellum.ErrIteratorDone for an empty range) is recorded on the iterator
// instead of being dropped.
func (d *Dictionary) RangeIterator(start, end string) segment.DictionaryIterator {
	rv := &DictionaryIterator{
		d: d,
	}
	if d.fst == nil {
		// nothing to iterate
		rv.err = vellum.ErrIteratorDone
		return rv
	}

	// need to increment the end position to be inclusive
	// NOTE(review): bumping the last byte also admits terms that merely
	// start with end (e.g. "egga" for end "egg") - confirm that is intended.
	var endBytes []byte
	if len(end) > 0 {
		endBytes = []byte(end)
		if endBytes[len(endBytes)-1] < 0xff {
			endBytes[len(endBytes)-1]++
		} else {
			endBytes = append(endBytes, 0xff)
		}
	}

	itr, err := d.fst.Iterator([]byte(start), endBytes)
	if err != nil {
		rv.err = err
		return rv
	}
	rv.itr = itr
	return rv
}
// DictionaryIterator is an iterator for term dictionary
type DictionaryIterator struct {
	// d is the dictionary being iterated
	d *Dictionary

	// itr is the underlying vellum iterator, positioned on the entry the
	// next call to Next will return; nil when no iterator could be created
	itr vellum.Iterator

	// err is the result of the most recent advance of itr;
	// vellum.ErrIteratorDone marks the end of the iteration
	err error
}
// Next returns the next entry in the dictionary.
//
// It returns (nil, nil) once the iteration is finished, and also when there
// is no underlying iterator at all (for example because the dictionary has
// no FST) - calling into a nil iterator would panic. Any error other than
// vellum.ErrIteratorDone hit while advancing is returned to the caller.
func (i *DictionaryIterator) Next() (*index.DictEntry, error) {
	switch {
	case i.err == vellum.ErrIteratorDone:
		return nil, nil
	case i.err != nil:
		return nil, i.err
	case i.itr == nil:
		return nil, nil
	}

	term, count := i.itr.Current()
	rv := &index.DictEntry{
		Term:  string(term),
		Count: count,
	}
	// advance now; the outcome is reported by the following call
	i.err = i.itr.Next()
	return rv, nil
}

View File

@ -0,0 +1,183 @@
// Copyright (c) 2017 Couchbase, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package bolt
import (
"os"
"reflect"
"testing"
"github.com/blevesearch/bleve/analysis"
"github.com/blevesearch/bleve/document"
"github.com/blevesearch/bleve/index"
"github.com/blevesearch/bleve/index/scorch/segment/mem"
)
// buildMemSegmentForDict returns an in-memory segment holding a single
// hand-analyzed document whose desc field contains the terms
// "apple ball cat dog egg fish bat", for exercising dictionary iteration.
//
// Token offsets are the byte offsets of each term within the desc value.
func buildMemSegmentForDict() *mem.Segment {
	doc := &document.Document{
		ID: "a",
		Fields: []document.Field{
			document.NewTextFieldCustom("_id", nil, []byte("a"), document.IndexField|document.StoreField, nil),
			document.NewTextFieldCustom("desc", nil, []byte("apple ball cat dog egg fish bat"), document.IndexField|document.StoreField|document.IncludeTermVectors, nil),
		},
	}

	// tok builds one analyzed token
	tok := func(start, end, position int, term string) *analysis.Token {
		return &analysis.Token{
			Start:    start,
			End:      end,
			Position: position,
			Term:     []byte(term),
		}
	}

	// forge analyzed docs
	results := []*index.AnalysisResult{
		{
			Document: doc,
			Analyzed: []analysis.TokenFrequencies{
				analysis.TokenFrequency(analysis.TokenStream{
					tok(0, 1, 1, "a"),
				}, nil, false),
				analysis.TokenFrequency(analysis.TokenStream{
					tok(0, 5, 1, "apple"),
					tok(6, 10, 2, "ball"),
					tok(11, 14, 3, "cat"),
					tok(15, 18, 4, "dog"),
					tok(19, 22, 5, "egg"),
					tok(23, 27, 6, "fish"),
					tok(28, 31, 7, "bat"),
				}, nil, true),
			},
			Length: []int{
				1,
				7,
			},
		},
	}

	segment := mem.NewFromAnalyzedDocs(results)
	return segment
}
// TestDictionary persists the dictionary sample segment, re-opens it and
// checks the full, prefix and range iterators of the desc field's dictionary.
func TestDictionary(t *testing.T) {
	_ = os.RemoveAll("/tmp/scorch.bolt")

	memSegment := buildMemSegmentForDict()
	err := persistSegment(memSegment, "/tmp/scorch.bolt", 1024)
	if err != nil {
		t.Fatalf("error persisting segment: %v", err)
	}

	segment, err := Open("/tmp/scorch.bolt")
	if err != nil {
		t.Fatalf("error opening segment: %v", err)
	}
	defer func() {
		// report the close error itself, not whatever err happens to hold
		cerr := segment.Close()
		if cerr != nil {
			t.Fatalf("error closing segment: %v", cerr)
		}
	}()

	dict, err := segment.Dictionary("desc")
	if err != nil {
		t.Fatal(err)
	}

	// test basic full iterator
	expected := []string{"apple", "ball", "bat", "cat", "dog", "egg", "fish"}
	var got []string
	itr := dict.Iterator()
	next, err := itr.Next()
	for next != nil && err == nil {
		got = append(got, next.Term)
		next, err = itr.Next()
	}
	if err != nil {
		t.Fatalf("dict itr error: %v", err)
	}

	if !reflect.DeepEqual(expected, got) {
		t.Errorf("expected: %v, got: %v", expected, got)
	}

	// test prefix iterator
	expected = []string{"ball", "bat"}
	got = got[:0]
	itr = dict.PrefixIterator("b")
	next, err = itr.Next()
	for next != nil && err == nil {
		got = append(got, next.Term)
		next, err = itr.Next()
	}
	if err != nil {
		t.Fatalf("dict itr error: %v", err)
	}

	if !reflect.DeepEqual(expected, got) {
		t.Errorf("expected: %v, got: %v", expected, got)
	}

	// test range iterator
	expected = []string{"cat", "dog", "egg"}
	got = got[:0]
	itr = dict.RangeIterator("cat", "egg")
	next, err = itr.Next()
	for next != nil && err == nil {
		got = append(got, next.Term)
		next, err = itr.Next()
	}
	if err != nil {
		t.Fatalf("dict itr error: %v", err)
	}

	if !reflect.DeepEqual(expected, got) {
		t.Errorf("expected: %v, got: %v", expected, got)
	}
}

View File

@ -0,0 +1,94 @@
// Copyright 2014 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
// This code originated from:
// https://github.com/cockroachdb/cockroach/blob/2dd65dde5d90c157f4b93f92502ca1063b904e1d/pkg/util/encoding/encoding.go
// Modified to not use pkg/errors
package bolt
import "fmt"
const (
	// maxVarintSize is the longest encoding EncodeUvarintAscending
	// produces: one tag byte followed by up to eight value bytes.
	maxVarintSize = 9

	// IntMin is chosen such that the range of int tags does not overlap the
	// ascii character set that is frequently used in testing.
	IntMin = 0x80 // 128
	// IntMax is the maximum int tag value.
	IntMax = 0xfd // 253

	// intMaxWidth is the largest number of value bytes following a tag.
	intMaxWidth = 8
	// intZero is the tag byte that encodes the value 0.
	intZero = IntMin + intMaxWidth // 136
	// intSmall is the largest value that is encoded in the tag byte itself.
	intSmall = IntMax - intZero - intMaxWidth // 109
)
// EncodeUvarintAscending appends an order-preserving, variable length
// encoding of v to b and returns the extended buffer.
//
// Values up to intSmall are folded into a single tag byte (intZero+v). Larger
// values are written as a tag byte announcing how many value bytes follow
// (IntMax-7 for one byte up to IntMax for eight), followed by the value in
// big-endian order using as few bytes as possible. Comparing two encodings
// bytewise therefore gives the same result as comparing the values.
func EncodeUvarintAscending(b []byte, v uint64) []byte {
	if v <= intSmall {
		return append(b, intZero+byte(v))
	}

	// number of bytes needed to hold v
	width := 1
	for rest := v >> 8; rest != 0; rest >>= 8 {
		width++
	}

	b = append(b, IntMax-8+byte(width))
	for i := width - 1; i >= 0; i-- {
		b = append(b, byte(v>>(8*uint(i))))
	}
	return b
}
// DecodeUvarintAscending decodes a varint encoded uint64 from the input
// buffer. The remainder of the input buffer and the decoded uint64
// are returned.
//
// An error is returned when the buffer is empty, when its first byte is not
// a tag EncodeUvarintAscending can produce (below intZero, or announcing more
// than eight value bytes), or when fewer value bytes follow than the tag
// announces.
func DecodeUvarintAscending(b []byte) ([]byte, uint64, error) {
	if len(b) == 0 {
		return nil, 0, fmt.Errorf("insufficient bytes to decode uvarint value")
	}
	if int(b[0]) < intZero {
		// without this check the negative difference below would be
		// converted to a huge value and returned as if it were valid
		return nil, 0, fmt.Errorf("invalid uvarint tag byte 0x%02x", b[0])
	}
	length := int(b[0]) - intZero
	b = b[1:] // skip length byte
	if length <= intSmall {
		// small values live in the tag byte itself
		return b, uint64(length), nil
	}
	length -= intSmall
	if length > 8 {
		return nil, 0, fmt.Errorf("invalid uvarint length of %d", length)
	} else if len(b) < length {
		return nil, 0, fmt.Errorf("insufficient bytes to decode uvarint value: %q", b)
	}
	var v uint64
	// It is faster to range over the elements in a slice than to index
	// into the slice on each loop iteration.
	for _, t := range b[:length] {
		v = (v << 8) | uint64(t)
	}
	return b[length:], v, nil
}

View File

@ -0,0 +1,96 @@
// Copyright 2014 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
// This code originated from:
// https://github.com/cockroachdb/cockroach/blob/2dd65dde5d90c157f4b93f92502ca1063b904e1d/pkg/util/encoding/encoding_test.go
// Modified to only test the parts we borrowed
package bolt
import (
"bytes"
"math"
"testing"
)
// testCaseUint64 pairs a value with the exact bytes its encoding is expected
// to produce.
type testCaseUint64 struct {
	value  uint64 // value handed to the encoder
	expEnc []byte // expected encoding of value
}
// TestEncodeDecodeUvarint runs the generic round-trip/ordering checks against
// the ascending uvarint codec and then pins the exact encoding of a few
// values around the boundaries of the format.
func TestEncodeDecodeUvarint(t *testing.T) {
	testBasicEncodeDecodeUint64(EncodeUvarintAscending, DecodeUvarintAscending, false, t)

	exactEncodings := []testCaseUint64{
		{value: 0, expEnc: []byte{0x88}},
		{value: 1, expEnc: []byte{0x89}},
		{value: 109, expEnc: []byte{0xf5}},
		{value: 110, expEnc: []byte{0xf6, 0x6e}},
		{value: 1 << 8, expEnc: []byte{0xf7, 0x01, 0x00}},
		{value: math.MaxUint64, expEnc: []byte{0xfd, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff}},
	}
	testCustomEncodeUint64(exactEncodings, EncodeUvarintAscending, t)
}
// testBasicEncodeDecodeUint64 feeds a fixed, increasing list of values around
// every byte-width boundary through encFunc and decFunc. For each value it
// checks that the encoding is ordered with respect to the previous one (as
// selected by descending), that decoding succeeds and consumes the whole
// encoding, and that the decoded value equals the input.
func testBasicEncodeDecodeUint64(
	encFunc func([]byte, uint64) []byte,
	decFunc func([]byte) ([]byte, uint64, error),
	descending bool, t *testing.T,
) {
	values := []uint64{
		0, 1,
		1<<8 - 1, 1 << 8,
		1<<16 - 1, 1 << 16,
		1<<24 - 1, 1 << 24,
		1<<32 - 1, 1 << 32,
		1<<40 - 1, 1 << 40,
		1<<48 - 1, 1 << 48,
		1<<56 - 1, 1 << 56,
		math.MaxUint64 - 1, math.MaxUint64,
	}

	var prevEnc []byte
	for idx, value := range values {
		encoded := encFunc(nil, value)

		// the first value has nothing to be compared against
		if idx > 0 {
			order := bytes.Compare(encoded, prevEnc)
			violated := order < 0
			if descending {
				violated = order >= 0
			}
			if violated {
				t.Errorf("ordered constraint violated for %d: [% x] vs. [% x]", value, encoded, prevEnc)
			}
		}

		leftover, decoded, err := decFunc(encoded)
		if err != nil {
			t.Error(err)
			continue
		}
		if len(leftover) != 0 {
			t.Errorf("leftover bytes: [% x]", leftover)
		}
		if decoded != value {
			t.Errorf("decode yielded different value than input: %d vs. %d", decoded, value)
		}
		prevEnc = encoded
	}
}
// testCustomEncodeUint64 encodes the value of every test case with encFunc
// and reports an error for each encoding that differs from the expected
// bytes.
func testCustomEncodeUint64(
	testCases []testCaseUint64, encFunc func([]byte, uint64) []byte, t *testing.T,
) {
	for i := range testCases {
		want := testCases[i].expEnc
		got := encFunc(nil, testCases[i].value)
		if bytes.Equal(got, want) {
			continue
		}
		t.Errorf("expected [% x]; got [% x] (value: %d)", want, got, testCases[i].value)
	}
}

View File

@ -0,0 +1,323 @@
// Copyright (c) 2017 Couchbase, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package bolt
import (
"bytes"
"fmt"
"math"
"github.com/RoaringBitmap/roaring"
"github.com/Smerity/govarint"
"github.com/blevesearch/bleve/index/scorch/segment"
"github.com/boltdb/bolt"
)
// PostingsList is an in-memory representation of a postings list
type PostingsList struct {
// dictionary is the term dictionary this list came from; Iterator reaches
// the segment's bolt transaction through it
dictionary *Dictionary
term string
postingsID uint64
// postings holds the doc numbers on this list; Iterator and Count treat a
// nil bitmap as an empty list
postings *roaring.Bitmap
// except, when non-nil, holds doc numbers to leave out
except *roaring.Bitmap
// postingKey names this list's sub-bucket inside the posting details bucket
postingKey []byte
}
// Iterator builds a postings iterator for this list. When the list has no
// postings bitmap the iterator is returned empty. Otherwise it is wired to the
// list's detail bucket and given two bitmap iterators: one over the complete
// postings and one over the postings with any excluded doc numbers removed.
func (p *PostingsList) Iterator() segment.PostingsIterator {
	it := &PostingsIterator{postings: p}
	if p.postings == nil {
		return it
	}

	details := p.dictionary.segment.tx.Bucket(postingDetailsBucket)
	it.detailBucket = details.Bucket(p.postingKey)
	it.all = p.postings.Iterator()

	visible := p.postings
	if p.except != nil {
		// subtract on a copy so the list's own bitmap is left untouched
		visible = p.postings.Clone()
		visible.AndNot(p.except)
	}
	it.actual = visible.Iterator()
	return it
}
// Count returns the number of doc numbers on this postings list that are not
// excluded. A list without a postings bitmap counts as empty.
func (p *PostingsList) Count() uint64 {
	if p.postings == nil {
		return 0
	}
	if p.except == nil {
		return p.postings.GetCardinality()
	}
	// Only exclusions that actually appear in the postings may lower the
	// count. Subtracting the raw size of the exclusion set under-counts
	// whenever it names doc numbers that are not on this list, so compute the
	// difference the same way Iterator does and both stay in agreement.
	remaining := p.postings.Clone()
	remaining.AndNot(p.except)
	return remaining.GetCardinality()
}
// PostingsIterator provides a way to iterate through the postings list
type PostingsIterator struct {
// postings is the list being iterated
postings *PostingsList
// all walks every doc number on the list, including excluded ones
all roaring.IntIterable
// offset and locoffset are reset to zero by Next when it skips across a
// chunk boundary, and offset is incremented for each posting skipped inside
// a chunk; nothing in this file reads them back
offset int
locoffset int
// actual walks only the doc numbers that survive the exclusion set
actual roaring.IntIterable
// detailBucket is this list's bucket of per-chunk detail buckets
detailBucket *bolt.Bucket
// currChunk is the number of the chunk most recently loaded by loadChunk
currChunk uint32
// currChunkFreqNorm and currChunkLoc are the raw bytes of the loaded chunk's
// freq/norm and location streams
currChunkFreqNorm []byte
currChunkLoc []byte
// freqNormDecoder and locDecoder read varints from the two streams above
freqNormDecoder *govarint.Base128Decoder
locDecoder *govarint.Base128Decoder
}
// loadChunk makes the given chunk the current one: it looks up the chunk's
// bucket under the list's detail bucket, keeps the raw freq/norm and location
// bytes, and starts fresh decoders at the beginning of both streams.
// It returns an error when the detail bucket or the chunk bucket is missing.
func (i *PostingsIterator) loadChunk(chunk int) error {
	if i.detailBucket == nil {
		// Iterator stores whatever the bucket lookup returned, which is nil
		// when the list has no detail bucket; fail cleanly instead of
		// dereferencing it.
		return fmt.Errorf("posting details bucket missing")
	}
	// chunk buckets are keyed by the encoded chunk number
	chunkID := EncodeUvarintAscending(nil, uint64(chunk))
	chunkBucket := i.detailBucket.Bucket(chunkID)
	if chunkBucket == nil {
		// report the chunk number itself, not the encoded key bytes
		return fmt.Errorf("chunk %d missing", chunk)
	}
	i.currChunkFreqNorm = chunkBucket.Get(freqNormKey)
	i.freqNormDecoder = govarint.NewU64Base128Decoder(bytes.NewReader(i.currChunkFreqNorm))
	i.currChunkLoc = chunkBucket.Get(locKey)
	i.locDecoder = govarint.NewU64Base128Decoder(bytes.NewReader(i.currChunkLoc))
	i.currChunk = uint32(chunk)
	return nil
}
// readFreqNorm takes the next two integers off the current chunk's freq/norm
// stream and returns them as (frequency, norm bits). If either read fails both
// values come back as zero together with a descriptive error.
func (i *PostingsIterator) readFreqNorm() (uint64, uint64, error) {
	dec := i.freqNormDecoder
	freq, freqErr := dec.GetU64()
	if freqErr != nil {
		return 0, 0, fmt.Errorf("error reading frequency: %v", freqErr)
	}
	normBits, normErr := dec.GetU64()
	if normErr != nil {
		return 0, 0, fmt.Errorf("error reading norm: %v", normErr)
	}
	return freq, normBits, nil
}
// readLocation consumes exactly one location from the location stream: five
// leading integers (field, pos, start, end, number of array positions)
// followed by that many array positions. With a non-nil l the values are
// stored into it; with a nil l the integers are read and thrown away so the
// stream still ends up positioned after the location.
func (i *PostingsIterator) readLocation(l *Location) error {
	dec := i.locDecoder

	fieldID, err := dec.GetU64()
	if err != nil {
		return fmt.Errorf("error reading location field: %v", err)
	}
	pos, err := dec.GetU64()
	if err != nil {
		return fmt.Errorf("error reading location pos: %v", err)
	}
	start, err := dec.GetU64()
	if err != nil {
		return fmt.Errorf("error reading location start: %v", err)
	}
	end, err := dec.GetU64()
	if err != nil {
		return fmt.Errorf("error reading location end: %v", err)
	}
	numArrayPos, err := dec.GetU64()
	if err != nil {
		return fmt.Errorf("error reading location num array pos: %v", err)
	}
	count := int(numArrayPos)

	if l == nil {
		// nobody wants the values, just step over the array positions
		for k := 0; k < count; k++ {
			if _, err := dec.GetU64(); err != nil {
				return fmt.Errorf("error reading array position: %v", err)
			}
		}
		return nil
	}

	l.field = i.postings.dictionary.segment.fieldsInv[fieldID]
	l.pos, l.start, l.end = pos, start, end
	if numArrayPos > 0 {
		l.ap = make([]uint64, count)
	}
	for k := 0; k < count; k++ {
		ap, err := dec.GetU64()
		if err != nil {
			return fmt.Errorf("error reading array position: %v", err)
		}
		l.ap[k] = ap
	}
	return nil
}
// Next returns the next posting on the postings list, or nil at the end.
//
// Two bitmap iterators advance together: actual yields only the doc numbers
// that survive the exclusion set, all yields every doc number on the list.
// The skip loop below reads and discards the freq/norm (and location) entries
// of each excluded doc number that lives in the same chunk as the next hit, so
// that the decoders are positioned on the hit's own entries.
//
// An error is returned when a chunk cannot be loaded or a stream cannot be
// decoded.
func (i *PostingsIterator) Next() (segment.Posting, error) {
	if i.actual == nil || !i.actual.HasNext() {
		return nil, nil
	}
	chunkFactor := i.postings.dictionary.segment.chunkFactor
	n := i.actual.Next()
	nChunk := n / chunkFactor
	allN := i.all.Next()
	allNChunk := allN / chunkFactor

	// n is the next actual hit (excluding some postings)
	// allN is the next hit in the full postings
	// if they don't match, adjust offsets to factor in item we're skipping over
	// incr the all iterator, and check again
	for allN != n {
		if allNChunk != nChunk {
			// in different chunks, reset offsets; nothing to consume because
			// the hit's chunk is decoded from its start
			i.locoffset = 0
			i.offset = 0
		} else {
			if i.currChunk != nChunk || i.currChunkFreqNorm == nil {
				err := i.loadChunk(int(nChunk))
				if err != nil {
					return nil, fmt.Errorf("error loading chunk: %v", err)
				}
			}

			// read off freq/offsets even though we don't care about them
			freq, _, err := i.readFreqNorm()
			if err != nil {
				return nil, err
			}
			if i.postings.dictionary.segment.fieldsLoc[i.postings.dictionary.fieldID] {
				for j := 0; j < int(freq); j++ {
					err := i.readLocation(nil)
					if err != nil {
						return nil, err
					}
				}
			}

			// in same chunk, need to account for offsets
			i.offset++
		}

		allN = i.all.Next()
		// Keep the chunk number in step with allN. Without this, once the
		// first skipped doc sat in an earlier chunk, later skipped docs that
		// share the hit's chunk were never consumed and the hit decoded one
		// of their entries instead of its own.
		allNChunk = allN / chunkFactor
	}

	if i.currChunk != nChunk || i.currChunkFreqNorm == nil {
		err := i.loadChunk(int(nChunk))
		if err != nil {
			return nil, fmt.Errorf("error loading chunk: %v", err)
		}
	}

	rv := &Posting{
		iterator: i,
		docNum:   uint64(n),
	}

	var err error
	var normBits uint64
	rv.freq, normBits, err = i.readFreqNorm()
	if err != nil {
		return nil, err
	}
	// the norm is stored as the bit pattern of a float32
	rv.norm = math.Float32frombits(uint32(normBits))
	if i.postings.dictionary.segment.fieldsLoc[i.postings.dictionary.fieldID] {
		// read off 'freq' locations
		rv.locs = make([]segment.Location, rv.freq)
		locs := make([]Location, rv.freq)
		for j := 0; j < int(rv.freq); j++ {
			err := i.readLocation(&locs[j])
			if err != nil {
				return nil, err
			}
			rv.locs[j] = &locs[j]
		}
	}

	return rv, nil
}
// Posting is a single entry in a postings list
type Posting struct {
// iterator is the PostingsIterator that produced this posting
iterator *PostingsIterator
// docNum is the doc number of the posting within the segment
docNum uint64
// freq is the frequency read from the freq/norm stream
freq uint64
// norm is decoded from the float32 bit pattern read from the freq/norm stream
norm float32
// locs holds one entry per occurrence; it is only filled in when locations
// are read for the field
locs []segment.Location
}
// Number returns the document number of this posting within its segment.
func (p *Posting) Number() uint64 {
return p.docNum
}
// Frequency returns the frequency of occurrence of this term in this doc/field
func (p *Posting) Frequency() uint64 {
return p.freq
}
// Norm returns the normalization factor for this posting, widened to float64
// from the float32 it is held as.
func (p *Posting) Norm() float64 {
return float64(p.norm)
}
// Locations returns the location information for each occurrence. The slice
// is nil when no locations were read for this posting.
func (p *Posting) Locations() []segment.Location {
return p.locs
}
// Location represents the location of a single occurrence
type Location struct {
// field is the name looked up from the field id stored with the location
field string
// pos, start and end are stored as read from the location stream
pos uint64
start uint64
end uint64
// ap holds the array positions; it stays nil when the location has none
ap []uint64
}
// Field returns the name of the field this location was recorded for (useful
// in composite fields to know which original field the value came from).
func (l *Location) Field() string {
return l.field
}
// Start returns the start byte offset of this occurrence
func (l *Location) Start() uint64 {
return l.start
}
// End returns the end byte offset of this occurrence
func (l *Location) End() uint64 {
return l.end
}
// Pos returns the 1-based phrase position of this occurrence
func (l *Location) Pos() uint64 {
return l.pos
}
// ArrayPositions returns the array position vector associated with this
// occurrence; it is nil when the occurrence has no array positions.
func (l *Location) ArrayPositions() []uint64 {
return l.ap
}

View File

@ -0,0 +1,309 @@
// Copyright (c) 2017 Couchbase, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package bolt
import (
"bytes"
"encoding/binary"
"fmt"
"io"
"github.com/RoaringBitmap/roaring"
"github.com/Smerity/govarint"
"github.com/blevesearch/bleve/index/scorch/segment"
"github.com/boltdb/bolt"
"github.com/couchbaselabs/vellum"
"github.com/golang/snappy"
)
// readOnlyOptions is handed to bolt.Open by Open, so segment files are opened
// read-only.
var readOnlyOptions = &bolt.Options{
ReadOnly: true,
}
// _id field is always guaranteed to have fieldID of 0
// NOTE(review): nothing in this file refers to idFieldID; confirm it is used
// elsewhere in the package.
const idFieldID uint16 = 0
// Open returns a boltdb impl of a segment.
//
// The file at path is opened read-only and a read transaction is started and
// kept open for the lifetime of the segment; the segment's config and field
// information are loaded eagerly. The caller must Close the returned segment.
// On failure nothing is left open and a nil segment is returned.
func Open(path string) (segment.Segment, error) {
	db, err := bolt.Open(path, 0600, readOnlyOptions)
	if err != nil {
		return nil, err
	}
	tx, err := db.Begin(false)
	if err != nil {
		// best effort: the Begin error is the one worth reporting
		_ = db.Close()
		return nil, err
	}

	rv := &Segment{
		db:        db,
		tx:        tx,
		fieldsMap: make(map[string]uint16),
	}

	if err = rv.loadConfig(); err == nil {
		err = rv.loadFields()
	}
	if err != nil {
		// Release in the same order as Segment.Close: end the read
		// transaction first, then close the database. Cleanup errors are
		// ignored because the load error is the one worth reporting.
		_ = tx.Rollback()
		_ = db.Close()
		return nil, err
	}

	return rv, nil
}
// Segment implements a boltdb based implementation of a segment
type Segment struct {
// version is the first byte stored under the version key of the config bucket
version uint8
// chunkFactor is the divisor that maps a doc number to its chunk number
chunkFactor uint32
db *bolt.DB
// tx is the read transaction started by Open; Close rolls it back
tx *bolt.Tx
// fieldsMap maps a field name to its field id plus one, so that the zero
// value of a lookup means the field is unknown
fieldsMap map[string]uint16
// fieldsInv maps a field id back to the field name
fieldsInv []string
// fieldsLoc records, per field id, whether the index-locations bitmap
// contains that field
fieldsLoc []bool
}
// loadConfig reads the segment's version and chunk factor from the config
// bucket. The version is the first byte of the version value and the chunk
// factor is a big-endian uint32. An error is returned when the bucket or
// either key is missing, or when a stored value is too short to decode.
func (s *Segment) loadConfig() (err error) {
	bucket := s.tx.Bucket(configBucket)
	if bucket == nil {
		return fmt.Errorf("config bucket missing")
	}

	ver := bucket.Get(versionKey)
	if ver == nil {
		return fmt.Errorf("version key missing")
	}
	if len(ver) < 1 {
		// present but empty: indexing it would panic
		return fmt.Errorf("version value is empty")
	}
	s.version = ver[0]

	chunk := bucket.Get(chunkKey)
	if chunk == nil {
		return fmt.Errorf("chunk key is missing")
	}
	if len(chunk) < 4 {
		// decoding a uint32 from fewer than four bytes would panic
		return fmt.Errorf("chunk value is %d bytes, expected at least 4", len(chunk))
	}
	s.chunkFactor = binary.BigEndian.Uint32(chunk)

	return nil
}
// loadFields reads the fields info from the segment so that we never have to go
// back to disk to access this (small and used frequently).
//
// It fills fieldsMap (name to field id plus one), fieldsInv (field id to name)
// and fieldsLoc (field id to "index-locations bitmap contains this field").
// An error is returned when the fields bucket is missing, an entry cannot be
// decoded, or the stored field ids do not form the range 0..len-1.
func (s *Segment) loadFields() (err error) {
	bucket := s.tx.Bucket(fieldsBucket)
	if bucket == nil {
		return fmt.Errorf("fields bucket missing")
	}

	indexLocs := roaring.NewBitmap()
	err = bucket.ForEach(func(k []byte, v []byte) error {
		// process index locations bitset
		if len(k) > 0 && k[0] == indexLocsKey[0] {
			_, err2 := indexLocs.FromBuffer(v)
			if err2 != nil {
				return fmt.Errorf("error loading indexLocs: %v", err2)
			}
			return nil
		}

		_, fieldID, err2 := DecodeUvarintAscending(k)
		if err2 != nil {
			return err2
		}
		// fieldID+1 has to fit in a uint16, otherwise it would wrap around
		// and collide with another id (or with the "unknown" zero value)
		if fieldID >= 0xffff {
			return fmt.Errorf("field id %d out of range", fieldID)
		}
		// we store fieldID+1 in so we can discern the zero value
		s.fieldsMap[string(v)] = uint16(fieldID + 1)
		return nil
	})
	if err != nil {
		return err
	}

	// now setup the inverse (should have same size as map and be keyed 0-(len-1))
	s.fieldsInv = make([]string, len(s.fieldsMap))
	for name, id := range s.fieldsMap {
		idx := int(id) - 1
		if idx < 0 || idx >= len(s.fieldsInv) {
			// ids with gaps or duplicate names would index past the slice
			return fmt.Errorf("field %q has id %d outside expected range 0-%d",
				name, idx, len(s.fieldsInv)-1)
		}
		s.fieldsInv[idx] = name
	}

	s.fieldsLoc = make([]bool, len(s.fieldsInv))
	for i := range s.fieldsInv {
		s.fieldsLoc[i] = indexLocs.ContainsInt(i)
	}

	return nil
}
// Fields returns the field names used in this segment, indexed by field id.
// The segment's own slice is returned, not a copy.
func (s *Segment) Fields() []string {
return s.fieldsInv
}
// Count returns the number of documents in this segment
// (this has no notion of deleted docs).
//
// The count is derived from the bucket statistics of the stored bucket. A
// segment without a stored bucket reports zero documents.
func (s *Segment) Count() uint64 {
	stored := s.tx.Bucket(storedBucket)
	if stored == nil {
		// VisitDocument treats a missing stored bucket as a handled
		// condition; do the same here rather than dereferencing nil
		return 0
	}
	// BucketN includes the stored bucket itself, hence the -1
	docs := stored.Stats().BucketN - 1
	if docs < 0 {
		return 0
	}
	return uint64(docs)
}
// Dictionary returns the term dictionary for the specified field.
// On error the returned dictionary is a true nil interface value.
func (s *Segment) Dictionary(field string) (segment.TermDictionary, error) {
	dict, err := s.dictionary(field)
	if err != nil {
		// Returning the nil *Dictionary directly would wrap it in a non-nil
		// interface, so a caller's `dict == nil` check would be false.
		return nil, err
	}
	return dict, nil
}
// dictionary builds the concrete Dictionary for field. A field the segment
// does not know yields a dictionary without an FST. For a known field the
// FST bytes are fetched from the dictionary bucket under the encoded field id
// and loaded with vellum; a missing bucket, missing bytes or a load failure is
// returned as an error.
func (s *Segment) dictionary(field string) (*Dictionary, error) {
	dict := &Dictionary{
		segment: s,
		field:   field,
	}

	// fieldsMap holds id+1, so zero means "no such field"
	storedID := s.fieldsMap[field]
	if storedID == 0 {
		return dict, nil
	}
	dict.fieldID = storedID - 1

	key := EncodeUvarintAscending(nil, uint64(dict.fieldID))
	dictionaries := s.tx.Bucket(dictBucket)
	if dictionaries == nil {
		return nil, fmt.Errorf("dictionary bucket missing")
	}
	fstBytes := dictionaries.Get(key)
	if fstBytes == nil {
		return nil, fmt.Errorf("dictionary field %s bytes nil", field)
	}
	fst, loadErr := vellum.Load(fstBytes)
	if loadErr != nil {
		return nil, fmt.Errorf("dictionary field %s vellum err: %v", field, loadErr)
	}
	dict.fst = fst
	return dict, nil
}
// VisitDocument invokes the DocumentFieldValueVisitor for each stored field
// of the specified doc number.
//
// The document's bucket holds two values: snappy-compressed field data and a
// varint-encoded metadata stream. Each metadata record carries the field id,
// the type, the offset and length of the value inside the uncompressed data,
// and the array positions. Visiting stops early when the visitor returns
// false. An error is returned when the document or its values are missing,
// the data cannot be decompressed, or the metadata is truncated or refers to
// a field or byte range that does not exist.
func (s *Segment) VisitDocument(num uint64, visitor segment.DocumentFieldValueVisitor) error {
	stored := s.tx.Bucket(storedBucket)
	if stored == nil {
		return fmt.Errorf("stored bucket missing")
	}
	docNumKey := EncodeUvarintAscending(nil, num)
	docBucket := stored.Bucket(docNumKey)
	if docBucket == nil {
		return fmt.Errorf("segment has no doc number %d", num)
	}
	metaBytes := docBucket.Get(metaKey)
	if metaBytes == nil {
		return fmt.Errorf("stored meta bytes for doc number %d is nil", num)
	}
	dataBytes := docBucket.Get(dataKey)
	if dataBytes == nil {
		return fmt.Errorf("stored data bytes for doc number %d is nil", num)
	}
	uncompressed, err := snappy.Decode(nil, dataBytes)
	if err != nil {
		return err
	}
	dataLen := uint64(len(uncompressed))

	decoder := govarint.NewU64Base128Decoder(bytes.NewReader(metaBytes))

	keepGoing := true
	for keepGoing {
		field, err := decoder.GetU64()
		if err == io.EOF {
			// clean end of the metadata stream
			break
		}
		if err != nil {
			return err
		}
		typ, err := decoder.GetU64()
		if err != nil {
			return err
		}
		offset, err := decoder.GetU64()
		if err != nil {
			return err
		}
		l, err := decoder.GetU64()
		if err != nil {
			return err
		}
		numap, err := decoder.GetU64()
		if err != nil {
			return err
		}
		var arrayPos []uint64
		if numap > 0 {
			arrayPos = make([]uint64, numap)
			for i := 0; i < int(numap); i++ {
				ap, err := decoder.GetU64()
				if err != nil {
					return err
				}
				arrayPos[i] = ap
			}
		}

		// report inconsistent metadata as an error instead of panicking
		if field >= uint64(len(s.fieldsInv)) {
			return fmt.Errorf("stored field id %d out of range for doc number %d", field, num)
		}
		// written to stay correct even if offset+l would overflow
		if offset > dataLen || l > dataLen-offset {
			return fmt.Errorf("stored value range %d+%d exceeds %d data bytes for doc number %d",
				offset, l, dataLen, num)
		}

		value := uncompressed[offset : offset+l]
		keepGoing = visitor(s.fieldsInv[field], byte(typ), value, arrayPos)
	}

	return nil
}
// DocNumbers looks up every given _id in the "_id" dictionary and returns the
// union of the doc numbers found. A segment without any fields yields an
// empty bitmap; a failed dictionary or postings lookup is returned as an
// error with a nil bitmap.
func (s *Segment) DocNumbers(ids []string) (*roaring.Bitmap, error) {
	docNums := roaring.New()
	if len(s.fieldsMap) == 0 {
		return docNums, nil
	}

	idDict, err := s.dictionary("_id")
	if err != nil {
		return nil, err
	}

	for idx := range ids {
		list, err := idDict.postingsList(ids[idx], nil)
		if err != nil {
			return nil, err
		}
		if list.postings == nil {
			// this id is not in the segment
			continue
		}
		docNums.Or(list.postings)
	}
	return docNums, nil
}
// Close ends the segment's read transaction and then closes the database.
// Both steps always run; a rollback failure takes precedence over a close
// failure in the returned error.
func (s *Segment) Close() error {
	rollbackErr := s.tx.Rollback()
	closeErr := s.db.Close()
	if rollbackErr != nil {
		return rollbackErr
	}
	return closeErr
}

View File

@ -0,0 +1,517 @@
// Copyright (c) 2017 Couchbase, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package bolt
import (
"math"
"os"
"reflect"
"testing"
)
// TestOpen persists a single-document in-memory segment, reopens it from
// disk and checks the field list, the document count, the postings of the
// _id, name, _all and tag fields (including locations and array positions),
// and the number of stored field values visited for document 0.
func TestOpen(t *testing.T) {
	_ = os.RemoveAll("/tmp/scorch.bolt")

	memSegment := buildMemSegment()
	err := persistSegment(memSegment, "/tmp/scorch.bolt", 1024)
	if err != nil {
		t.Fatalf("error persisting segment: %v", err)
	}

	segment, err := Open("/tmp/scorch.bolt")
	if err != nil {
		t.Fatalf("error opening segment: %v", err)
	}
	defer func() {
		cerr := segment.Close()
		if cerr != nil {
			// report the close error itself, not the outer err
			t.Fatalf("error closing segment: %v", cerr)
		}
	}()

	expectFields := map[string]struct{}{
		"_id":  struct{}{},
		"_all": struct{}{},
		"name": struct{}{},
		"desc": struct{}{},
		"tag":  struct{}{},
	}
	fields := segment.Fields()
	if len(fields) != len(expectFields) {
		t.Errorf("expected %d fields, only got %d", len(expectFields), len(fields))
	}
	for _, field := range fields {
		if _, ok := expectFields[field]; !ok {
			t.Errorf("got unexpected field: %s", field)
		}
	}

	docCount := segment.Count()
	if docCount != 1 {
		t.Errorf("expected count 1, got %d", docCount)
	}

	// check the _id field
	dict, err := segment.Dictionary("_id")
	if err != nil {
		t.Fatal(err)
	}
	if dict == nil {
		t.Fatal("got nil dict, expected non-nil")
	}

	postingsList, err := dict.PostingsList("a", nil)
	if err != nil {
		t.Fatal(err)
	}
	if postingsList == nil {
		t.Fatal("got nil postings list, expected non-nil")
	}

	postingsItr := postingsList.Iterator()
	if postingsItr == nil {
		t.Fatal("got nil iterator, expected non-nil")
	}

	count := 0
	nextPosting, err := postingsItr.Next()
	for nextPosting != nil && err == nil {
		count++
		if nextPosting.Frequency() != 1 {
			t.Errorf("expected frequency 1, got %d", nextPosting.Frequency())
		}
		if nextPosting.Number() != 0 {
			t.Errorf("expected doc number 0, got %d", nextPosting.Number())
		}
		if nextPosting.Norm() != 1.0 {
			t.Errorf("expected norm 1.0, got %f", nextPosting.Norm())
		}

		nextPosting, err = postingsItr.Next()
	}
	if err != nil {
		t.Fatal(err)
	}

	if count != 1 {
		t.Errorf("expected count to be 1, got %d", count)
	}

	// check the name field
	dict, err = segment.Dictionary("name")
	if err != nil {
		t.Fatal(err)
	}
	if dict == nil {
		t.Fatal("got nil dict, expected non-nil")
	}

	postingsList, err = dict.PostingsList("wow", nil)
	if err != nil {
		t.Fatal(err)
	}
	if postingsList == nil {
		t.Fatal("got nil postings list, expected non-nil")
	}

	postingsItr = postingsList.Iterator()
	if postingsItr == nil {
		t.Fatal("got nil iterator, expected non-nil")
	}

	count = 0
	nextPosting, err = postingsItr.Next()
	for nextPosting != nil && err == nil {
		count++
		if nextPosting.Frequency() != 1 {
			t.Errorf("expected frequency 1, got %d", nextPosting.Frequency())
		}
		if nextPosting.Number() != 0 {
			t.Errorf("expected doc number 0, got %d", nextPosting.Number())
		}
		if nextPosting.Norm() != 1.0 {
			t.Errorf("expected norm 1.0, got %f", nextPosting.Norm())
		}
		var numLocs uint64
		for _, loc := range nextPosting.Locations() {
			numLocs++
			if loc.Field() != "name" {
				t.Errorf("expected loc field to be 'name', got '%s'", loc.Field())
			}
			if loc.Start() != 0 {
				t.Errorf("expected loc start to be 0, got %d", loc.Start())
			}
			if loc.End() != 3 {
				t.Errorf("expected loc end to be 3, got %d", loc.End())
			}
			if loc.Pos() != 1 {
				t.Errorf("expected loc pos to be 1, got %d", loc.Pos())
			}
			if loc.ArrayPositions() != nil {
				t.Errorf("expect loc array pos to be nil, got %v", loc.ArrayPositions())
			}
		}
		if numLocs != nextPosting.Frequency() {
			t.Errorf("expected %d locations, got %d", nextPosting.Frequency(), numLocs)
		}

		nextPosting, err = postingsItr.Next()
	}
	if err != nil {
		t.Fatal(err)
	}

	if count != 1 {
		t.Errorf("expected count to be 1, got %d", count)
	}

	// check the _all field (composite)
	dict, err = segment.Dictionary("_all")
	if err != nil {
		t.Fatal(err)
	}
	if dict == nil {
		t.Fatal("got nil dict, expected non-nil")
	}

	postingsList, err = dict.PostingsList("wow", nil)
	if err != nil {
		t.Fatal(err)
	}
	if postingsList == nil {
		t.Fatal("got nil postings list, expected non-nil")
	}

	postingsItr = postingsList.Iterator()
	if postingsItr == nil {
		t.Fatal("got nil iterator, expected non-nil")
	}

	count = 0
	nextPosting, err = postingsItr.Next()
	for nextPosting != nil && err == nil {
		count++
		if nextPosting.Frequency() != 1 {
			t.Errorf("expected frequency 1, got %d", nextPosting.Frequency())
		}
		if nextPosting.Number() != 0 {
			t.Errorf("expected doc number 0, got %d", nextPosting.Number())
		}
		expectedNorm := float32(1.0 / math.Sqrt(float64(5)))
		if nextPosting.Norm() != float64(expectedNorm) {
			t.Errorf("expected norm %f, got %f", expectedNorm, nextPosting.Norm())
		}
		var numLocs uint64
		for _, loc := range nextPosting.Locations() {
			numLocs++
			// locations in the composite field still name the original field
			if loc.Field() != "name" {
				t.Errorf("expected loc field to be 'name', got '%s'", loc.Field())
			}
			if loc.Start() != 0 {
				t.Errorf("expected loc start to be 0, got %d", loc.Start())
			}
			if loc.End() != 3 {
				t.Errorf("expected loc end to be 3, got %d", loc.End())
			}
			if loc.Pos() != 1 {
				t.Errorf("expected loc pos to be 1, got %d", loc.Pos())
			}
			if loc.ArrayPositions() != nil {
				t.Errorf("expect loc array pos to be nil, got %v", loc.ArrayPositions())
			}
		}
		if numLocs != nextPosting.Frequency() {
			t.Errorf("expected %d locations, got %d", nextPosting.Frequency(), numLocs)
		}

		nextPosting, err = postingsItr.Next()
	}
	if err != nil {
		t.Fatal(err)
	}

	if count != 1 {
		t.Errorf("expected count to be 1, got %d", count)
	}

	// now try a field with array positions
	dict, err = segment.Dictionary("tag")
	if err != nil {
		t.Fatal(err)
	}
	if dict == nil {
		t.Fatal("got nil dict, expected non-nil")
	}

	postingsList, err = dict.PostingsList("dark", nil)
	if err != nil {
		t.Fatal(err)
	}
	if postingsList == nil {
		t.Fatal("got nil postings list, expected non-nil")
	}

	postingsItr = postingsList.Iterator()
	if postingsItr == nil {
		t.Fatal("got nil iterator, expected non-nil")
	}

	nextPosting, err = postingsItr.Next()
	for nextPosting != nil && err == nil {

		if nextPosting.Frequency() != 1 {
			t.Errorf("expected frequency 1, got %d", nextPosting.Frequency())
		}
		if nextPosting.Number() != 0 {
			t.Errorf("expected doc number 0, got %d", nextPosting.Number())
		}
		var numLocs uint64
		for _, loc := range nextPosting.Locations() {
			numLocs++
			if loc.Field() != "tag" {
				t.Errorf("expected loc field to be 'tag', got '%s'", loc.Field())
			}
			if loc.Start() != 0 {
				t.Errorf("expected loc start to be 0, got %d", loc.Start())
			}
			if loc.End() != 4 {
				t.Errorf("expected loc end to be 4, got %d", loc.End())
			}
			if loc.Pos() != 1 {
				t.Errorf("expected loc pos to be 1, got %d", loc.Pos())
			}
			expectArrayPos := []uint64{1}
			if !reflect.DeepEqual(loc.ArrayPositions(), expectArrayPos) {
				t.Errorf("expect loc array pos to be %v, got %v", expectArrayPos, loc.ArrayPositions())
			}
		}
		if numLocs != nextPosting.Frequency() {
			t.Errorf("expected %d locations, got %d", nextPosting.Frequency(), numLocs)
		}

		nextPosting, err = postingsItr.Next()
	}
	if err != nil {
		t.Fatal(err)
	}

	// now try and visit a document
	var fieldValuesSeen int
	err = segment.VisitDocument(0, func(field string, typ byte, value []byte, pos []uint64) bool {
		fieldValuesSeen++
		return true
	})
	if err != nil {
		t.Fatal(err)
	}
	if fieldValuesSeen != 5 {
		t.Errorf("expected 5 field values, got %d", fieldValuesSeen)
	}
}
// TestOpenMulti persists a two-document in-memory segment into a single
// chunk (chunk factor 1024), reopens it, and checks that the term "thing" in
// the desc field yields two postings, and exactly one once document "a" is
// excluded.
func TestOpenMulti(t *testing.T) {
	_ = os.RemoveAll("/tmp/scorch.bolt")

	memSegment := buildMemSegmentMulti()
	err := persistSegment(memSegment, "/tmp/scorch.bolt", 1024)
	if err != nil {
		t.Fatalf("error persisting segment: %v", err)
	}

	segment, err := Open("/tmp/scorch.bolt")
	if err != nil {
		t.Fatalf("error opening segment: %v", err)
	}
	defer func() {
		cerr := segment.Close()
		if cerr != nil {
			// report the close error itself, not the outer err
			t.Fatalf("error closing segment: %v", cerr)
		}
	}()

	if segment.Count() != 2 {
		t.Errorf("expected count 2, got %d", segment.Count())
	}

	// check the desc field
	dict, err := segment.Dictionary("desc")
	if err != nil {
		t.Fatal(err)
	}
	if dict == nil {
		t.Fatal("got nil dict, expected non-nil")
	}

	postingsList, err := dict.PostingsList("thing", nil)
	if err != nil {
		t.Fatal(err)
	}
	if postingsList == nil {
		t.Fatal("got nil postings list, expected non-nil")
	}

	postingsItr := postingsList.Iterator()
	if postingsItr == nil {
		t.Fatal("got nil iterator, expected non-nil")
	}

	count := 0
	nextPosting, err := postingsItr.Next()
	for nextPosting != nil && err == nil {
		count++
		nextPosting, err = postingsItr.Next()
	}
	if err != nil {
		t.Fatal(err)
	}

	if count != 2 {
		t.Errorf("expected count to be 2, got %d", count)
	}

	// get docnum of a
	exclude, err := segment.DocNumbers([]string{"a"})
	if err != nil {
		t.Fatal(err)
	}

	// look for term 'thing' excluding doc 'a'
	postingsListExcluding, err := dict.PostingsList("thing", exclude)
	if err != nil {
		t.Fatal(err)
	}
	// check the list that was just created, not the earlier one
	if postingsListExcluding == nil {
		t.Fatal("got nil postings list, expected non-nil")
	}

	postingsListExcludingCount := postingsListExcluding.Count()
	if postingsListExcludingCount != 1 {
		t.Errorf("expected count from postings list to be 1, got %d", postingsListExcludingCount)
	}

	postingsItrExcluding := postingsListExcluding.Iterator()
	// check the iterator that was just created, not the earlier one
	if postingsItrExcluding == nil {
		t.Fatal("got nil iterator, expected non-nil")
	}

	count = 0
	nextPosting, err = postingsItrExcluding.Next()
	for nextPosting != nil && err == nil {
		count++
		nextPosting, err = postingsItrExcluding.Next()
	}
	if err != nil {
		t.Fatal(err)
	}

	if count != 1 {
		t.Errorf("expected count to be 1, got %d", count)
	}
}
// TestOpenMultiWithTwoChunks repeats the two-document scenario with a chunk
// factor of 1, so each document lands in its own chunk. It checks that the
// term "thing" in the desc field yields two postings, and exactly one once
// document "a" is excluded.
func TestOpenMultiWithTwoChunks(t *testing.T) {
	_ = os.RemoveAll("/tmp/scorch.bolt")

	memSegment := buildMemSegmentMulti()
	err := persistSegment(memSegment, "/tmp/scorch.bolt", 1)
	if err != nil {
		t.Fatalf("error persisting segment: %v", err)
	}

	segment, err := Open("/tmp/scorch.bolt")
	if err != nil {
		t.Fatalf("error opening segment: %v", err)
	}
	defer func() {
		cerr := segment.Close()
		if cerr != nil {
			// report the close error itself, not the outer err
			t.Fatalf("error closing segment: %v", cerr)
		}
	}()

	if segment.Count() != 2 {
		t.Errorf("expected count 2, got %d", segment.Count())
	}

	// check the desc field
	dict, err := segment.Dictionary("desc")
	if err != nil {
		t.Fatal(err)
	}
	if dict == nil {
		t.Fatal("got nil dict, expected non-nil")
	}

	postingsList, err := dict.PostingsList("thing", nil)
	if err != nil {
		t.Fatal(err)
	}
	if postingsList == nil {
		t.Fatal("got nil postings list, expected non-nil")
	}

	postingsItr := postingsList.Iterator()
	if postingsItr == nil {
		t.Fatal("got nil iterator, expected non-nil")
	}

	count := 0
	nextPosting, err := postingsItr.Next()
	for nextPosting != nil && err == nil {
		count++
		nextPosting, err = postingsItr.Next()
	}
	if err != nil {
		t.Fatal(err)
	}

	if count != 2 {
		t.Errorf("expected count to be 2, got %d", count)
	}

	// get docnum of a
	exclude, err := segment.DocNumbers([]string{"a"})
	if err != nil {
		t.Fatal(err)
	}

	// look for term 'thing' excluding doc 'a'
	postingsListExcluding, err := dict.PostingsList("thing", exclude)
	if err != nil {
		t.Fatal(err)
	}
	// check the list that was just created, not the earlier one
	if postingsListExcluding == nil {
		t.Fatal("got nil postings list, expected non-nil")
	}

	postingsItrExcluding := postingsListExcluding.Iterator()
	// check the iterator that was just created, not the earlier one
	if postingsItrExcluding == nil {
		t.Fatal("got nil iterator, expected non-nil")
	}

	count = 0
	nextPosting, err = postingsItrExcluding.Next()
	for nextPosting != nil && err == nil {
		count++
		nextPosting, err = postingsItrExcluding.Next()
	}
	if err != nil {
		t.Fatal(err)
	}

	if count != 1 {
		t.Errorf("expected count to be 1, got %d", count)
	}
}