From ece27ef21551f2be790d9fa75a7ce2a798542ea1 Mon Sep 17 00:00:00 2001 From: Marty Schoch Date: Tue, 5 Dec 2017 13:05:12 -0500 Subject: [PATCH] adding initial version of bolt persisted segment --- index/scorch/segment/bolt/build.go | 500 +++++++++++++++++++++ index/scorch/segment/bolt/build_test.go | 288 ++++++++++++ index/scorch/segment/bolt/dict.go | 161 +++++++ index/scorch/segment/bolt/dict_test.go | 183 ++++++++ index/scorch/segment/bolt/int.go | 94 ++++ index/scorch/segment/bolt/int_test.go | 96 ++++ index/scorch/segment/bolt/posting.go | 323 ++++++++++++++ index/scorch/segment/bolt/segment.go | 309 +++++++++++++ index/scorch/segment/bolt/segment_test.go | 517 ++++++++++++++++++++++ 9 files changed, 2471 insertions(+) create mode 100644 index/scorch/segment/bolt/build.go create mode 100644 index/scorch/segment/bolt/build_test.go create mode 100644 index/scorch/segment/bolt/dict.go create mode 100644 index/scorch/segment/bolt/dict_test.go create mode 100644 index/scorch/segment/bolt/int.go create mode 100644 index/scorch/segment/bolt/int_test.go create mode 100644 index/scorch/segment/bolt/posting.go create mode 100644 index/scorch/segment/bolt/segment.go create mode 100644 index/scorch/segment/bolt/segment_test.go diff --git a/index/scorch/segment/bolt/build.go b/index/scorch/segment/bolt/build.go new file mode 100644 index 00000000..6ed5719d --- /dev/null +++ b/index/scorch/segment/bolt/build.go @@ -0,0 +1,500 @@ +// Copyright (c) 2017 Couchbase, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
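+
+// The segment is stored as a single bolt file using the following top-level
+// buckets (single-byte names; see the vars below):
+//
+//	'a' fields: field name values keyed by fieldID, plus a bitset ('l') of
+//	    the fields that index term locations
+//	'b' dict: one vellum FST per field, mapping term -> postingsID
+//	'c' postings: postingsID -> roaring bitmap of doc numbers
+//	'd' postingDetails: per-postingsID sub-buckets of chunked freq/norm ('a')
+//	    and location ('b') streams
+//	'e' stored: per-doc sub-buckets holding meta ('a') and snappy-compressed
+//	    data ('b')
+//	'x' config: chunk factor ('c') and version ('v')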
+ +package bolt + +import ( + "bytes" + "encoding/binary" + "math" + + "github.com/RoaringBitmap/roaring" + "github.com/Smerity/govarint" + "github.com/blevesearch/bleve/index/scorch/segment/mem" + "github.com/boltdb/bolt" + "github.com/couchbaselabs/vellum" + "github.com/golang/snappy" +) + +var fieldsBucket = []byte{'a'} +var dictBucket = []byte{'b'} +var postingsBucket = []byte{'c'} +var postingDetailsBucket = []byte{'d'} +var storedBucket = []byte{'e'} +var configBucket = []byte{'x'} + +var indexLocsKey = []byte{'l'} + +var freqNormKey = []byte{'a'} +var locKey = []byte{'b'} + +var metaKey = []byte{'a'} +var dataKey = []byte{'b'} + +var chunkKey = []byte{'c'} +var versionKey = []byte{'v'} + +var version = 0 + +func persistSegment(memSegment *mem.Segment, path string, chunkFactor uint32) (err error) { + + db, err := bolt.Open(path, 0777, nil) + if err != nil { + return err + } + defer func() { + if cerr := db.Close(); err == nil && cerr != nil { + err = cerr + } + }() + + tx, err := db.Begin(true) + if err != nil { + return err + } + defer func() { + if err == nil { + err = tx.Commit() + } else { + _ = tx.Rollback() + } + }() + + err = persistFields(memSegment, tx) + if err != nil { + return err + } + + err = persistDictionary(memSegment, tx) + if err != nil { + return err + } + + err = persistPostings(memSegment, tx) + if err != nil { + return err + } + + err = persistPostingsDetails(memSegment, tx, chunkFactor) + if err != nil { + return err + } + + err = persistStored(memSegment, tx) + if err != nil { + return err + } + + err = persistConfig(tx, chunkFactor) + if err != nil { + return err + } + + return nil +} + +// persistFields puts the fields as separate k/v pairs in the fields bucket +// makes very little attempt to squeeze a lot of perf because it is expected +// this is usually somewhat small, and when re-opened it will be read once and +// kept on the heap, and not read out of the file subsequently +func persistFields(memSegment *mem.Segment, tx *bolt.Tx) error { + bucket, err := tx.CreateBucket(fieldsBucket) + if err != nil { + return err + } + bucket.FillPercent = 1.0 + + // build/persist a bitset corresponding to the field locs array + indexLocs := roaring.NewBitmap() + for i, indexLoc := range memSegment.FieldsLoc { + if indexLoc { + indexLocs.AddInt(i) + } + } + var indexLocsBuffer bytes.Buffer + _, err = indexLocs.WriteTo(&indexLocsBuffer) + if err != nil { + return err + } + err = bucket.Put(indexLocsKey, indexLocsBuffer.Bytes()) + if err != nil { + return err + } + + // we use special varint which is still guaranteed to sort correctly + fieldBuf := make([]byte, 0, maxVarintSize) + for fieldID, fieldName := range memSegment.FieldsInv { + if fieldID != 0 { + // reset buffer if necessary + fieldBuf = fieldBuf[:0] + } + fieldBuf = EncodeUvarintAscending(fieldBuf, uint64(fieldID)) + err = bucket.Put(fieldBuf, []byte(fieldName)) + if err != nil { + return err + } + } + return nil +} + +func persistDictionary(memSegment *mem.Segment, tx *bolt.Tx) error { + bucket, err := tx.CreateBucket(dictBucket) + if err != nil { + return err + } + bucket.FillPercent = 1.0 + + // TODO consider whether or not there is benefit to building the vellums + // concurrently. While we have to insert them into the bolt in order, + // the (presumably) heavier lifting involved in building the FST could + // be done concurrently. 
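+	//
+	// Note: the mem segment stores postingsID+1 in its Dicts maps (so the
+	// zero value can mean "absent"), which is why 1 is subtracted when
+	// inserting into the vellum builder below.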
+ + fieldBuf := make([]byte, 0, maxVarintSize) + for fieldID, fieldTerms := range memSegment.DictKeys { + if fieldID != 0 { + // reset buffers if necessary + fieldBuf = fieldBuf[:0] + } + // start a new vellum for this field + var buffer bytes.Buffer + builder, err := vellum.New(&buffer, nil) + if err != nil { + return err + } + + dict := memSegment.Dicts[fieldID] + // now walk the dictionary in order of fieldTerms (already sorted) + for i := range fieldTerms { + err = builder.Insert([]byte(fieldTerms[i]), dict[fieldTerms[i]]-1) + if err != nil { + return err + } + } + err = builder.Close() + if err != nil { + return err + } + + // put this FST into bolt + // we use special varint which is still guaranteed to sort correctly + fieldBuf = EncodeUvarintAscending(fieldBuf, uint64(fieldID)) + err = bucket.Put(fieldBuf, buffer.Bytes()) + if err != nil { + return err + } + } + + return nil +} + +func persistPostings(memSegment *mem.Segment, tx *bolt.Tx) error { + bucket, err := tx.CreateBucket(postingsBucket) + if err != nil { + return err + } + bucket.FillPercent = 1.0 + + postingIDBuf := make([]byte, 0, maxVarintSize) + for postingID := range memSegment.Postings { + if postingID != 0 { + // reset buffers if necessary + postingIDBuf = postingIDBuf[:0] + } + postingIDBuf = EncodeUvarintAscending(postingIDBuf, uint64(postingID)) + var postingsBuf bytes.Buffer + _, err := memSegment.Postings[postingID].WriteTo(&postingsBuf) + if err != nil { + return err + } + err = bucket.Put(postingIDBuf, postingsBuf.Bytes()) + if err != nil { + return err + } + } + + return nil +} + +func persistPostingsDetails(memSegment *mem.Segment, tx *bolt.Tx, + chunkFactor uint32) error { + bucket, err := tx.CreateBucket(postingDetailsBucket) + if err != nil { + return err + } + bucket.FillPercent = 1.0 + + postingIDBuf := make([]byte, 0, maxVarintSize) + for postingID := range memSegment.Postings { + if postingID != 0 { + // reset buffers if necessary + postingIDBuf = postingIDBuf[:0] + } + postingIDBuf = EncodeUvarintAscending(postingIDBuf, uint64(postingID)) + + // make bucket for posting details + postingBucket, err := bucket.CreateBucket(postingIDBuf) + if err != nil { + return err + } + postingBucket.FillPercent = 1.0 + + err = persistPostingDetails(memSegment, postingBucket, postingID, chunkFactor) + if err != nil { + return err + } + } + + return nil +} + +func persistPostingDetails(memSegment *mem.Segment, postingBucket *bolt.Bucket, + postingID int, chunkFactor uint32) error { + // walk the postings list + var err error + var chunkBucket *bolt.Bucket + var currChunk uint32 + chunkIDBuf := make([]byte, 0, maxVarintSize) + postingsListItr := memSegment.Postings[postingID].Iterator() + var encoder *govarint.Base128Encoder + var locEncoder *govarint.Base128Encoder + + encodingBuf := &bytes.Buffer{} + locEncodingBuf := &bytes.Buffer{} + + var offset int + var locOffset int + for postingsListItr.HasNext() { + docNum := postingsListItr.Next() + chunk := docNum / chunkFactor + + // create new chunk bucket if necessary + if chunkBucket == nil || currChunk != chunk { + + // close out last chunk + if chunkBucket != nil { + + // fix me write freq/norms + encoder.Close() + err = chunkBucket.Put(freqNormKey, encodingBuf.Bytes()) + if err != nil { + return err + } + locEncoder.Close() + err = chunkBucket.Put(locKey, locEncodingBuf.Bytes()) + if err != nil { + return err + } + + // reset for next + chunkIDBuf = chunkIDBuf[:0] + encodingBuf = &bytes.Buffer{} + locEncodingBuf = &bytes.Buffer{} + } + + // prepare next chunk + 
chunkIDBuf = EncodeUvarintAscending(chunkIDBuf, uint64(chunk)) + chunkBucket, err = postingBucket.CreateBucket(chunkIDBuf) + if err != nil { + return err + } + chunkBucket.FillPercent = 1.0 + currChunk = chunk + + encoder = govarint.NewU64Base128Encoder(encodingBuf) + locEncoder = govarint.NewU64Base128Encoder(locEncodingBuf) + } + + // put freq + _, err = encoder.PutU64(memSegment.Freqs[postingID][offset]) + if err != nil { + return err + } + + // put norm + norm := memSegment.Norms[postingID][offset] + normBits := math.Float32bits(norm) + _, err = encoder.PutU32(normBits) + if err != nil { + return err + } + + // put locations + + for i := 0; i < int(memSegment.Freqs[postingID][offset]); i++ { + + if len(memSegment.Locfields[postingID]) > 0 { + // put field + _, err = locEncoder.PutU64(uint64(memSegment.Locfields[postingID][locOffset])) + if err != nil { + return err + } + + // put pos + _, err = locEncoder.PutU64(memSegment.Locpos[postingID][locOffset]) + if err != nil { + return err + } + + // put start + _, err = locEncoder.PutU64(memSegment.Locstarts[postingID][locOffset]) + if err != nil { + return err + } + + // put end + _, err = locEncoder.PutU64(memSegment.Locends[postingID][locOffset]) + if err != nil { + return err + } + + // put array positions + num := len(memSegment.Locarraypos[postingID][locOffset]) + + // put the number of array positions to follow + _, err = locEncoder.PutU64(uint64(num)) + if err != nil { + return err + } + + // put each array position + for j := 0; j < num; j++ { + _, err = locEncoder.PutU64(memSegment.Locarraypos[postingID][locOffset][j]) + if err != nil { + return err + } + } + } + + locOffset++ + } + + offset++ + } + + // close out last chunk + + if chunkBucket != nil { + // fix me write freq/norms + encoder.Close() + err = chunkBucket.Put(freqNormKey, encodingBuf.Bytes()) + if err != nil { + return err + } + locEncoder.Close() + err = chunkBucket.Put(locKey, locEncodingBuf.Bytes()) + if err != nil { + return err + } + } + + return nil +} + +func persistStored(memSegment *mem.Segment, tx *bolt.Tx) error { + bucket, err := tx.CreateBucket(storedBucket) + if err != nil { + return err + } + bucket.FillPercent = 1.0 + + var curr int + // we use special varint which is still guaranteed to sort correctly + docNumBuf := make([]byte, 0, maxVarintSize) + for docNum, storedValues := range memSegment.Stored { + var metaBuf bytes.Buffer + var data, compressed []byte + if docNum != 0 { + // reset buffer if necessary + docNumBuf = docNumBuf[:0] + curr = 0 + } + // create doc sub-bucket + docNumBuf = EncodeUvarintAscending(docNumBuf, uint64(docNum)) + docBucket, err := bucket.CreateBucket(docNumBuf) + if err != nil { + return err + } + docBucket.FillPercent = 1.0 + + metaEncoder := govarint.NewU64Base128Encoder(&metaBuf) + + // encode fields in order + for fieldID := range memSegment.FieldsInv { + if storedFieldValues, ok := storedValues[uint16(fieldID)]; ok { + // has stored values for this field + num := len(storedFieldValues) + + // process each value + for i := 0; i < num; i++ { + // encode field + metaEncoder.PutU64(uint64(fieldID)) + // encode type + metaEncoder.PutU64(uint64(memSegment.StoredTypes[docNum][uint16(fieldID)][i])) + // encode start offset + metaEncoder.PutU64(uint64(curr)) + // end len + metaEncoder.PutU64(uint64(len(storedFieldValues[i]))) + // encode number of array pos + metaEncoder.PutU64(uint64(len(memSegment.StoredPos[docNum][uint16(fieldID)][i]))) + // encode all array positions + for j := 0; j < 
len(memSegment.StoredPos[docNum][uint16(fieldID)][i]); j++ { + metaEncoder.PutU64(memSegment.StoredPos[docNum][uint16(fieldID)][i][j]) + } + // append data + data = append(data, storedFieldValues[i]...) + // update curr + curr += len(storedFieldValues[i]) + } + } + } + metaEncoder.Close() + + err = docBucket.Put(metaKey, metaBuf.Bytes()) + if err != nil { + return err + } + + // compress data + compressed = snappy.Encode(compressed, data) + + err = docBucket.Put(dataKey, compressed) + if err != nil { + return err + } + + } + + return nil +} + +func persistConfig(tx *bolt.Tx, chunkFactor uint32) error { + bucket, err := tx.CreateBucket(configBucket) + if err != nil { + return err + } + + chunkVal := make([]byte, 4) + binary.BigEndian.PutUint32(chunkVal, chunkFactor) + err = bucket.Put(chunkKey, chunkVal) + if err != nil { + return err + } + + err = bucket.Put(versionKey, []byte{byte(version)}) + if err != nil { + return err + } + + return nil +} diff --git a/index/scorch/segment/bolt/build_test.go b/index/scorch/segment/bolt/build_test.go new file mode 100644 index 00000000..d4b93f15 --- /dev/null +++ b/index/scorch/segment/bolt/build_test.go @@ -0,0 +1,288 @@ +// Copyright (c) 2017 Couchbase, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
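+
+// These tests persist segments to a fixed path under /tmp. A chunkFactor of
+// 1024 is used, so with docNum/chunkFactor chunking every doc number below
+// 1024 lands in chunk 0.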
+ +package bolt + +import ( + "os" + "testing" + + "github.com/blevesearch/bleve/analysis" + "github.com/blevesearch/bleve/document" + "github.com/blevesearch/bleve/index" + "github.com/blevesearch/bleve/index/scorch/segment/mem" +) + +func TestBuild(t *testing.T) { + os.RemoveAll("/tmp/scorch.bolt") + + memSegment := buildMemSegment() + err := persistSegment(memSegment, "/tmp/scorch.bolt", 1024) + if err != nil { + t.Fatal(err) + } +} + +func buildMemSegment() *mem.Segment { + doc := &document.Document{ + ID: "a", + Fields: []document.Field{ + document.NewTextFieldCustom("_id", nil, []byte("a"), document.IndexField|document.StoreField, nil), + document.NewTextFieldCustom("name", nil, []byte("wow"), document.IndexField|document.StoreField|document.IncludeTermVectors, nil), + document.NewTextFieldCustom("desc", nil, []byte("some thing"), document.IndexField|document.StoreField|document.IncludeTermVectors, nil), + document.NewTextFieldCustom("tag", []uint64{0}, []byte("cold"), document.IndexField|document.StoreField|document.IncludeTermVectors, nil), + document.NewTextFieldCustom("tag", []uint64{1}, []byte("dark"), document.IndexField|document.StoreField|document.IncludeTermVectors, nil), + }, + CompositeFields: []*document.CompositeField{ + document.NewCompositeField("_all", true, nil, []string{"_id"}), + }, + } + + // forge analyzed docs + results := []*index.AnalysisResult{ + &index.AnalysisResult{ + Document: doc, + Analyzed: []analysis.TokenFrequencies{ + analysis.TokenFrequency(analysis.TokenStream{ + &analysis.Token{ + Start: 0, + End: 1, + Position: 1, + Term: []byte("a"), + }, + }, nil, false), + analysis.TokenFrequency(analysis.TokenStream{ + &analysis.Token{ + Start: 0, + End: 3, + Position: 1, + Term: []byte("wow"), + }, + }, nil, true), + analysis.TokenFrequency(analysis.TokenStream{ + &analysis.Token{ + Start: 0, + End: 4, + Position: 1, + Term: []byte("some"), + }, + &analysis.Token{ + Start: 5, + End: 10, + Position: 2, + Term: []byte("thing"), + }, + }, nil, true), + analysis.TokenFrequency(analysis.TokenStream{ + &analysis.Token{ + Start: 0, + End: 4, + Position: 1, + Term: []byte("cold"), + }, + }, []uint64{0}, true), + analysis.TokenFrequency(analysis.TokenStream{ + &analysis.Token{ + Start: 0, + End: 4, + Position: 1, + Term: []byte("dark"), + }, + }, []uint64{1}, true), + }, + Length: []int{ + 1, + 1, + 2, + 1, + 1, + }, + }, + } + + // fix up composite fields + for _, ar := range results { + for i, f := range ar.Document.Fields { + for _, cf := range ar.Document.CompositeFields { + cf.Compose(f.Name(), ar.Length[i], ar.Analyzed[i]) + } + } + } + + return mem.NewFromAnalyzedDocs(results) +} + +func buildMemSegmentMulti() *mem.Segment { + + doc := &document.Document{ + ID: "a", + Fields: []document.Field{ + document.NewTextFieldCustom("_id", nil, []byte("a"), document.IndexField|document.StoreField, nil), + document.NewTextFieldCustom("name", nil, []byte("wow"), document.IndexField|document.StoreField|document.IncludeTermVectors, nil), + document.NewTextFieldCustom("desc", nil, []byte("some thing"), document.IndexField|document.StoreField|document.IncludeTermVectors, nil), + document.NewTextFieldCustom("tag", []uint64{0}, []byte("cold"), document.IndexField|document.StoreField|document.IncludeTermVectors, nil), + document.NewTextFieldCustom("tag", []uint64{1}, []byte("dark"), document.IndexField|document.StoreField|document.IncludeTermVectors, nil), + }, + CompositeFields: []*document.CompositeField{ + document.NewCompositeField("_all", true, nil, []string{"_id"}), + }, 
+ } + + doc2 := &document.Document{ + ID: "b", + Fields: []document.Field{ + document.NewTextFieldCustom("_id", nil, []byte("b"), document.IndexField|document.StoreField, nil), + document.NewTextFieldCustom("name", nil, []byte("who"), document.IndexField|document.StoreField|document.IncludeTermVectors, nil), + document.NewTextFieldCustom("desc", nil, []byte("some thing"), document.IndexField|document.StoreField|document.IncludeTermVectors, nil), + document.NewTextFieldCustom("tag", []uint64{0}, []byte("cold"), document.IndexField|document.StoreField|document.IncludeTermVectors, nil), + document.NewTextFieldCustom("tag", []uint64{1}, []byte("dark"), document.IndexField|document.StoreField|document.IncludeTermVectors, nil), + }, + CompositeFields: []*document.CompositeField{ + document.NewCompositeField("_all", true, nil, []string{"_id"}), + }, + } + + // forge analyzed docs + results := []*index.AnalysisResult{ + &index.AnalysisResult{ + Document: doc, + Analyzed: []analysis.TokenFrequencies{ + analysis.TokenFrequency(analysis.TokenStream{ + &analysis.Token{ + Start: 0, + End: 1, + Position: 1, + Term: []byte("a"), + }, + }, nil, false), + analysis.TokenFrequency(analysis.TokenStream{ + &analysis.Token{ + Start: 0, + End: 3, + Position: 1, + Term: []byte("wow"), + }, + }, nil, true), + analysis.TokenFrequency(analysis.TokenStream{ + &analysis.Token{ + Start: 0, + End: 4, + Position: 1, + Term: []byte("some"), + }, + &analysis.Token{ + Start: 5, + End: 10, + Position: 2, + Term: []byte("thing"), + }, + }, nil, true), + analysis.TokenFrequency(analysis.TokenStream{ + &analysis.Token{ + Start: 0, + End: 4, + Position: 1, + Term: []byte("cold"), + }, + }, []uint64{0}, true), + analysis.TokenFrequency(analysis.TokenStream{ + &analysis.Token{ + Start: 0, + End: 4, + Position: 1, + Term: []byte("dark"), + }, + }, []uint64{1}, true), + }, + Length: []int{ + 1, + 1, + 2, + 1, + 1, + }, + }, + &index.AnalysisResult{ + Document: doc2, + Analyzed: []analysis.TokenFrequencies{ + analysis.TokenFrequency(analysis.TokenStream{ + &analysis.Token{ + Start: 0, + End: 1, + Position: 1, + Term: []byte("b"), + }, + }, nil, false), + analysis.TokenFrequency(analysis.TokenStream{ + &analysis.Token{ + Start: 0, + End: 3, + Position: 1, + Term: []byte("who"), + }, + }, nil, true), + analysis.TokenFrequency(analysis.TokenStream{ + &analysis.Token{ + Start: 0, + End: 4, + Position: 1, + Term: []byte("some"), + }, + &analysis.Token{ + Start: 5, + End: 10, + Position: 2, + Term: []byte("thing"), + }, + }, nil, true), + analysis.TokenFrequency(analysis.TokenStream{ + &analysis.Token{ + Start: 0, + End: 4, + Position: 1, + Term: []byte("cold"), + }, + }, []uint64{0}, true), + analysis.TokenFrequency(analysis.TokenStream{ + &analysis.Token{ + Start: 0, + End: 4, + Position: 1, + Term: []byte("dark"), + }, + }, []uint64{1}, true), + }, + Length: []int{ + 1, + 1, + 2, + 1, + 1, + }, + }, + } + + // fix up composite fields + for _, ar := range results { + for i, f := range ar.Document.Fields { + for _, cf := range ar.Document.CompositeFields { + cf.Compose(f.Name(), ar.Length[i], ar.Analyzed[i]) + } + } + } + + segment := mem.NewFromAnalyzedDocs(results) + + return segment +} diff --git a/index/scorch/segment/bolt/dict.go b/index/scorch/segment/bolt/dict.go new file mode 100644 index 00000000..0d7ab5ec --- /dev/null +++ b/index/scorch/segment/bolt/dict.go @@ -0,0 +1,161 @@ +// Copyright (c) 2017 Couchbase, Inc. 
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package bolt
+
+import (
+	"fmt"
+
+	"github.com/RoaringBitmap/roaring"
+	"github.com/blevesearch/bleve/index"
+	"github.com/blevesearch/bleve/index/scorch/segment"
+	"github.com/couchbaselabs/vellum"
+	"github.com/couchbaselabs/vellum/regexp"
+)
+
+// Dictionary is the bolt representation of the term dictionary
+type Dictionary struct {
+	segment *Segment
+	field   string
+	fieldID uint16
+	fst     *vellum.FST
+}
+
+// PostingsList returns the postings list for the specified term
+func (d *Dictionary) PostingsList(term string, except *roaring.Bitmap) (segment.PostingsList, error) {
+	return d.postingsList(term, except)
+}
+
+func (d *Dictionary) postingsList(term string, except *roaring.Bitmap) (*PostingsList, error) {
+	rv := &PostingsList{
+		dictionary: d,
+		term:       term,
+		except:     except,
+	}
+
+	if d.fst != nil {
+		postingsID, exists, err := d.fst.Get([]byte(term))
+		if err != nil {
+			return nil, fmt.Errorf("vellum err: %v", err)
+		}
+		if exists {
+			rv.postingsID = postingsID
+			postingsIDKey := EncodeUvarintAscending(nil, postingsID)
+			bucket := d.segment.tx.Bucket(postingsBucket)
+			if bucket == nil {
+				return nil, fmt.Errorf("postings bucket missing")
+			}
+
+			roaringBytes := bucket.Get(postingsIDKey)
+			if roaringBytes == nil {
+				return nil, fmt.Errorf("postings for postingsID %d missing", postingsID)
+			}
+			bitmap := roaring.NewBitmap()
+			_, err = bitmap.FromBuffer(roaringBytes)
+			if err != nil {
+				return nil, fmt.Errorf("error loading roaring bitmap: %v", err)
+			}
+
+			rv.postings = bitmap
+			rv.postingKey = postingsIDKey
+		}
+	}
+
+	return rv, nil
+}
+
+// Iterator returns an iterator for this dictionary
+func (d *Dictionary) Iterator() segment.DictionaryIterator {
+	rv := &DictionaryIterator{
+		d: d,
+	}
+
+	if d.fst != nil {
+		itr, err := d.fst.Iterator(nil, nil)
+		if err == nil {
+			rv.itr = itr
+		}
+	}
+
+	return rv
+}
+
+// PrefixIterator returns an iterator which only visits terms having the
+// specified prefix
+func (d *Dictionary) PrefixIterator(prefix string) segment.DictionaryIterator {
+	rv := &DictionaryIterator{
+		d: d,
+	}
+
+	if d.fst != nil {
+		r, err := regexp.New(prefix + ".*")
+		if err == nil {
+			itr, err := d.fst.Search(r, nil, nil)
+			if err == nil {
+				rv.itr = itr
+			}
+		}
+	}
+
+	return rv
+}
+
+// RangeIterator returns an iterator which only visits terms between the
+// start and end terms. NOTE: bleve.index API specifies the end is inclusive.
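+// To honor that with vellum's exclusive end, the end key is adjusted below
+// before being handed to the underlying FST iterator.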
+func (d *Dictionary) RangeIterator(start, end string) segment.DictionaryIterator { + rv := &DictionaryIterator{ + d: d, + } + + // need to increment the end position to be inclusive + endBytes := []byte(end) + if endBytes[len(endBytes)-1] < 0xff { + endBytes[len(endBytes)-1]++ + } else { + endBytes = append(endBytes, 0xff) + } + + if d.fst != nil { + itr, err := d.fst.Iterator([]byte(start), endBytes) + if err == nil { + rv.itr = itr + } + } + + return rv +} + +// DictionaryIterator is an iterator for term dictionary +type DictionaryIterator struct { + d *Dictionary + itr vellum.Iterator + err error +} + +// Next returns the next entry in the dictionary +func (i *DictionaryIterator) Next() (*index.DictEntry, error) { + if i.err == vellum.ErrIteratorDone { + return nil, nil + } else if i.err != nil { + return nil, i.err + } + term, count := i.itr.Current() + rv := &index.DictEntry{ + Term: string(term), + Count: count, + } + i.err = i.itr.Next() + return rv, nil +} diff --git a/index/scorch/segment/bolt/dict_test.go b/index/scorch/segment/bolt/dict_test.go new file mode 100644 index 00000000..6b3926a8 --- /dev/null +++ b/index/scorch/segment/bolt/dict_test.go @@ -0,0 +1,183 @@ +// Copyright (c) 2017 Couchbase, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package bolt + +import ( + "os" + "reflect" + "testing" + + "github.com/blevesearch/bleve/analysis" + "github.com/blevesearch/bleve/document" + "github.com/blevesearch/bleve/index" + "github.com/blevesearch/bleve/index/scorch/segment/mem" +) + +func buildMemSegmentForDict() *mem.Segment { + doc := &document.Document{ + ID: "a", + Fields: []document.Field{ + document.NewTextFieldCustom("_id", nil, []byte("a"), document.IndexField|document.StoreField, nil), + document.NewTextFieldCustom("desc", nil, []byte("apple ball cat dog egg fish bat"), document.IndexField|document.StoreField|document.IncludeTermVectors, nil), + }, + } + + // forge analyzed docs + results := []*index.AnalysisResult{ + &index.AnalysisResult{ + Document: doc, + Analyzed: []analysis.TokenFrequencies{ + analysis.TokenFrequency(analysis.TokenStream{ + &analysis.Token{ + Start: 0, + End: 1, + Position: 1, + Term: []byte("a"), + }, + }, nil, false), + analysis.TokenFrequency(analysis.TokenStream{ + &analysis.Token{ + Start: 0, + End: 5, + Position: 1, + Term: []byte("apple"), + }, + &analysis.Token{ + Start: 6, + End: 10, + Position: 2, + Term: []byte("ball"), + }, + &analysis.Token{ + Start: 11, + End: 14, + Position: 3, + Term: []byte("cat"), + }, + &analysis.Token{ + Start: 15, + End: 18, + Position: 4, + Term: []byte("dog"), + }, + &analysis.Token{ + Start: 19, + End: 22, + Position: 5, + Term: []byte("egg"), + }, + &analysis.Token{ + Start: 20, + End: 24, + Position: 6, + Term: []byte("fish"), + }, + &analysis.Token{ + Start: 25, + End: 28, + Position: 7, + Term: []byte("bat"), + }, + }, nil, true), + }, + Length: []int{ + 1, + 7, + }, + }, + } + + segment := mem.NewFromAnalyzedDocs(results) + + return segment +} + +func TestDictionary(t *testing.T) { + + _ = os.RemoveAll("/tmp/scorch.bolt") + + memSegment := buildMemSegmentForDict() + err := persistSegment(memSegment, "/tmp/scorch.bolt", 1024) + if err != nil { + t.Fatalf("error persisting segment: %v", err) + } + + segment, err := Open("/tmp/scorch.bolt") + if err != nil { + t.Fatalf("error opening segment: %v", err) + } + defer func() { + cerr := segment.Close() + if cerr != nil { + t.Fatalf("error closing segment: %v", err) + } + }() + + dict, err := segment.Dictionary("desc") + if err != nil { + t.Fatal(err) + } + + // test basic full iterator + expected := []string{"apple", "ball", "bat", "cat", "dog", "egg", "fish"} + var got []string + itr := dict.Iterator() + next, err := itr.Next() + for next != nil && err == nil { + got = append(got, next.Term) + next, err = itr.Next() + } + if err != nil { + t.Fatalf("dict itr error: %v", err) + } + + if !reflect.DeepEqual(expected, got) { + t.Errorf("expected: %v, got: %v", expected, got) + } + + // test prefix iterator + expected = []string{"ball", "bat"} + got = got[:0] + itr = dict.PrefixIterator("b") + next, err = itr.Next() + for next != nil && err == nil { + got = append(got, next.Term) + next, err = itr.Next() + } + if err != nil { + t.Fatalf("dict itr error: %v", err) + } + + if !reflect.DeepEqual(expected, got) { + t.Errorf("expected: %v, got: %v", expected, got) + } + + // test range iterator + expected = []string{"cat", "dog", "egg"} + got = got[:0] + itr = dict.RangeIterator("cat", "egg") + next, err = itr.Next() + for next != nil && err == nil { + got = append(got, next.Term) + next, err = itr.Next() + } + if err != nil { + t.Fatalf("dict itr error: %v", err) + } + + if !reflect.DeepEqual(expected, got) { + t.Errorf("expected: %v, got: %v", expected, got) + } +} diff --git a/index/scorch/segment/bolt/int.go 
b/index/scorch/segment/bolt/int.go new file mode 100644 index 00000000..a4af3a7a --- /dev/null +++ b/index/scorch/segment/bolt/int.go @@ -0,0 +1,94 @@ +// Copyright 2014 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. + +// This code originated from: +// https://github.com/cockroachdb/cockroach/blob/2dd65dde5d90c157f4b93f92502ca1063b904e1d/pkg/util/encoding/encoding.go + +// Modified to not use pkg/errors + +package bolt + +import "fmt" + +const ( + maxVarintSize = 9 + + // IntMin is chosen such that the range of int tags does not overlap the + // ascii character set that is frequently used in testing. + IntMin = 0x80 // 128 + intMaxWidth = 8 + intZero = IntMin + intMaxWidth // 136 + intSmall = IntMax - intZero - intMaxWidth // 109 + // IntMax is the maximum int tag value. + IntMax = 0xfd // 253 +) + +// EncodeUvarintAscending encodes the uint64 value using a variable length +// (length-prefixed) representation. The length is encoded as a single +// byte indicating the number of encoded bytes (-8) to follow. See +// EncodeVarintAscending for rationale. The encoded bytes are appended to the +// supplied buffer and the final buffer is returned. +func EncodeUvarintAscending(b []byte, v uint64) []byte { + switch { + case v <= intSmall: + return append(b, intZero+byte(v)) + case v <= 0xff: + return append(b, IntMax-7, byte(v)) + case v <= 0xffff: + return append(b, IntMax-6, byte(v>>8), byte(v)) + case v <= 0xffffff: + return append(b, IntMax-5, byte(v>>16), byte(v>>8), byte(v)) + case v <= 0xffffffff: + return append(b, IntMax-4, byte(v>>24), byte(v>>16), byte(v>>8), byte(v)) + case v <= 0xffffffffff: + return append(b, IntMax-3, byte(v>>32), byte(v>>24), byte(v>>16), byte(v>>8), + byte(v)) + case v <= 0xffffffffffff: + return append(b, IntMax-2, byte(v>>40), byte(v>>32), byte(v>>24), byte(v>>16), + byte(v>>8), byte(v)) + case v <= 0xffffffffffffff: + return append(b, IntMax-1, byte(v>>48), byte(v>>40), byte(v>>32), byte(v>>24), + byte(v>>16), byte(v>>8), byte(v)) + default: + return append(b, IntMax, byte(v>>56), byte(v>>48), byte(v>>40), byte(v>>32), + byte(v>>24), byte(v>>16), byte(v>>8), byte(v)) + } +} + +// DecodeUvarintAscending decodes a varint encoded uint64 from the input +// buffer. The remainder of the input buffer and the decoded uint64 +// are returned. +func DecodeUvarintAscending(b []byte) ([]byte, uint64, error) { + if len(b) == 0 { + return nil, 0, fmt.Errorf("insufficient bytes to decode uvarint value") + } + length := int(b[0]) - intZero + b = b[1:] // skip length byte + if length <= intSmall { + return b, uint64(length), nil + } + length -= intSmall + if length < 0 || length > 8 { + return nil, 0, fmt.Errorf("invalid uvarint length of %d", length) + } else if len(b) < length { + return nil, 0, fmt.Errorf("insufficient bytes to decode uvarint value: %q", b) + } + var v uint64 + // It is faster to range over the elements in a slice than to index + // into the slice on each loop iteration. 
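+	// Worked example: {0xf6, 0x6e} decodes to 110. The tag byte 0xf6 (246)
+	// gives length 246-intZero = 110; that exceeds intSmall (109), so
+	// 110-intSmall = 1 trailing byte follows, and reading 0x6e yields 110.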
+ for _, t := range b[:length] { + v = (v << 8) | uint64(t) + } + return b[length:], v, nil +} diff --git a/index/scorch/segment/bolt/int_test.go b/index/scorch/segment/bolt/int_test.go new file mode 100644 index 00000000..e59918c8 --- /dev/null +++ b/index/scorch/segment/bolt/int_test.go @@ -0,0 +1,96 @@ +// Copyright 2014 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. + +// This code originated from: +// https://github.com/cockroachdb/cockroach/blob/2dd65dde5d90c157f4b93f92502ca1063b904e1d/pkg/util/encoding/encoding_test.go + +// Modified to only test the parts we borrowed + +package bolt + +import ( + "bytes" + "math" + "testing" +) + +type testCaseUint64 struct { + value uint64 + expEnc []byte +} + +func TestEncodeDecodeUvarint(t *testing.T) { + testBasicEncodeDecodeUint64(EncodeUvarintAscending, DecodeUvarintAscending, false, t) + testCases := []testCaseUint64{ + {0, []byte{0x88}}, + {1, []byte{0x89}}, + {109, []byte{0xf5}}, + {110, []byte{0xf6, 0x6e}}, + {1 << 8, []byte{0xf7, 0x01, 0x00}}, + {math.MaxUint64, []byte{0xfd, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff}}, + } + testCustomEncodeUint64(testCases, EncodeUvarintAscending, t) +} + +func testBasicEncodeDecodeUint64( + encFunc func([]byte, uint64) []byte, + decFunc func([]byte) ([]byte, uint64, error), + descending bool, t *testing.T, +) { + testCases := []uint64{ + 0, 1, + 1<<8 - 1, 1 << 8, + 1<<16 - 1, 1 << 16, + 1<<24 - 1, 1 << 24, + 1<<32 - 1, 1 << 32, + 1<<40 - 1, 1 << 40, + 1<<48 - 1, 1 << 48, + 1<<56 - 1, 1 << 56, + math.MaxUint64 - 1, math.MaxUint64, + } + + var lastEnc []byte + for i, v := range testCases { + enc := encFunc(nil, v) + if i > 0 { + if (descending && bytes.Compare(enc, lastEnc) >= 0) || + (!descending && bytes.Compare(enc, lastEnc) < 0) { + t.Errorf("ordered constraint violated for %d: [% x] vs. [% x]", v, enc, lastEnc) + } + } + b, decode, err := decFunc(enc) + if err != nil { + t.Error(err) + continue + } + if len(b) != 0 { + t.Errorf("leftover bytes: [% x]", b) + } + if decode != v { + t.Errorf("decode yielded different value than input: %d vs. %d", decode, v) + } + lastEnc = enc + } +} + +func testCustomEncodeUint64( + testCases []testCaseUint64, encFunc func([]byte, uint64) []byte, t *testing.T, +) { + for _, test := range testCases { + enc := encFunc(nil, test.value) + if !bytes.Equal(enc, test.expEnc) { + t.Errorf("expected [% x]; got [% x] (value: %d)", test.expEnc, enc, test.value) + } + } +} diff --git a/index/scorch/segment/bolt/posting.go b/index/scorch/segment/bolt/posting.go new file mode 100644 index 00000000..e5d6c893 --- /dev/null +++ b/index/scorch/segment/bolt/posting.go @@ -0,0 +1,323 @@ +// Copyright (c) 2017 Couchbase, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package bolt
+
+import (
+	"bytes"
+	"fmt"
+	"math"
+
+	"github.com/RoaringBitmap/roaring"
+	"github.com/Smerity/govarint"
+	"github.com/blevesearch/bleve/index/scorch/segment"
+	"github.com/boltdb/bolt"
+)
+
+// PostingsList is an in-memory representation of a postings list
+type PostingsList struct {
+	dictionary *Dictionary
+	term       string
+	postingsID uint64
+	postings   *roaring.Bitmap
+	except     *roaring.Bitmap
+	postingKey []byte
+}
+
+// Iterator returns an iterator for this postings list
+func (p *PostingsList) Iterator() segment.PostingsIterator {
+	rv := &PostingsIterator{
+		postings: p,
+	}
+	if p.postings != nil {
+		detailsBucket := p.dictionary.segment.tx.Bucket(postingDetailsBucket)
+		rv.detailBucket = detailsBucket.Bucket(p.postingKey)
+		rv.all = p.postings.Iterator()
+		if p.except != nil {
+			allExcept := p.postings.Clone()
+			allExcept.AndNot(p.except)
+			rv.actual = allExcept.Iterator()
+		} else {
+			rv.actual = p.postings.Iterator()
+		}
+	}
+
+	return rv
+}
+
+// Count returns the number of items on this postings list
+func (p *PostingsList) Count() uint64 {
+	var rv uint64
+	if p.postings != nil {
+		rv = p.postings.GetCardinality()
+		if p.except != nil {
+			except := p.except.GetCardinality()
+			if except > rv {
+				// avoid underflow
+				except = rv
+			}
+			rv -= except
+		}
+	}
+	return rv
+}
+
+// PostingsIterator provides a way to iterate through the postings list
+type PostingsIterator struct {
+	postings     *PostingsList
+	all          roaring.IntIterable
+	offset       int
+	locoffset    int
+	actual       roaring.IntIterable
+	detailBucket *bolt.Bucket
+
+	currChunk         uint32
+	currChunkFreqNorm []byte
+	currChunkLoc      []byte
+	freqNormDecoder   *govarint.Base128Decoder
+	locDecoder        *govarint.Base128Decoder
+}
+
+func (i *PostingsIterator) loadChunk(chunk int) error {
+	// load correct chunk bytes
+	chunkID := EncodeUvarintAscending(nil, uint64(chunk))
+	chunkBucket := i.detailBucket.Bucket(chunkID)
+	if chunkBucket == nil {
+		return fmt.Errorf("chunk %d missing", chunk)
+	}
+	i.currChunkFreqNorm = chunkBucket.Get(freqNormKey)
+	i.freqNormDecoder = govarint.NewU64Base128Decoder(bytes.NewReader(i.currChunkFreqNorm))
+	i.currChunkLoc = chunkBucket.Get(locKey)
+	i.locDecoder = govarint.NewU64Base128Decoder(bytes.NewReader(i.currChunkLoc))
+	i.currChunk = uint32(chunk)
+	return nil
+}
+
+func (i *PostingsIterator) readFreqNorm() (uint64, uint64, error) {
+	freq, err := i.freqNormDecoder.GetU64()
+	if err != nil {
+		return 0, 0, fmt.Errorf("error reading frequency: %v", err)
+	}
+	normBits, err := i.freqNormDecoder.GetU64()
+	if err != nil {
+		return 0, 0, fmt.Errorf("error reading norm: %v", err)
+	}
+	return freq, normBits, nil
+}
+
+// readLocation processes all the integers on the stream representing a single
+// location. If you care about it, pass in a non-nil location struct, and we
+// will fill it. If you don't care about it, pass in nil and we safely consume
+// the contents.
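+// The integers appear in the same order persistPostingDetails wrote them:
+// field, pos, start, end, the count of array positions, then each array
+// position.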
+func (i *PostingsIterator) readLocation(l *Location) error { + // read off field + fieldID, err := i.locDecoder.GetU64() + if err != nil { + return fmt.Errorf("error reading location field: %v", err) + } + // read off pos + pos, err := i.locDecoder.GetU64() + if err != nil { + return fmt.Errorf("error reading location pos: %v", err) + } + // read off start + start, err := i.locDecoder.GetU64() + if err != nil { + return fmt.Errorf("error reading location start: %v", err) + } + // read off end + end, err := i.locDecoder.GetU64() + if err != nil { + return fmt.Errorf("error reading location end: %v", err) + } + // read off num array pos + numArrayPos, err := i.locDecoder.GetU64() + if err != nil { + return fmt.Errorf("error reading location num array pos: %v", err) + } + + // group these together for less branching + if l != nil { + l.field = i.postings.dictionary.segment.fieldsInv[fieldID] + l.pos = pos + l.start = start + l.end = end + if numArrayPos > 0 { + l.ap = make([]uint64, int(numArrayPos)) + } + } + + // read off array positions + for k := 0; k < int(numArrayPos); k++ { + ap, err := i.locDecoder.GetU64() + if err != nil { + return fmt.Errorf("error reading array position: %v", err) + } + if l != nil { + l.ap[k] = ap + } + } + + return nil +} + +// Next returns the next posting on the postings list, or nil at the end +func (i *PostingsIterator) Next() (segment.Posting, error) { + if i.actual == nil || !i.actual.HasNext() { + return nil, nil + } + n := i.actual.Next() + nChunk := n / i.postings.dictionary.segment.chunkFactor + allN := i.all.Next() + allNChunk := allN / i.postings.dictionary.segment.chunkFactor + + // n is the next actual hit (excluding some postings) + // allN is the next hit in the full postings + // if they don't match, adjust offsets to factor in item we're skipping over + // incr the all iterator, and check again + for allN != n { + + // in different chunks, reset offsets + if allNChunk != nChunk { + i.locoffset = 0 + i.offset = 0 + } else { + + if i.currChunk != nChunk || i.currChunkFreqNorm == nil { + err := i.loadChunk(int(nChunk)) + if err != nil { + return nil, fmt.Errorf("error loading chunk: %v", err) + } + } + + // read off freq/offsets even though we don't care about them + freq, _, err := i.readFreqNorm() + if err != nil { + return nil, err + } + if i.postings.dictionary.segment.fieldsLoc[i.postings.dictionary.fieldID] { + for j := 0; j < int(freq); j++ { + err := i.readLocation(nil) + if err != nil { + return nil, err + } + } + } + + // in same chunk, need to account for offsets + i.offset++ + } + + allN = i.all.Next() + } + + if i.currChunk != nChunk || i.currChunkFreqNorm == nil { + err := i.loadChunk(int(nChunk)) + if err != nil { + return nil, fmt.Errorf("error loading chunk: %v", err) + } + } + + rv := &Posting{ + iterator: i, + docNum: uint64(n), + } + + var err error + var normBits uint64 + rv.freq, normBits, err = i.readFreqNorm() + if err != nil { + return nil, err + } + rv.norm = math.Float32frombits(uint32(normBits)) + if i.postings.dictionary.segment.fieldsLoc[i.postings.dictionary.fieldID] { + // read off 'freq' locations + rv.locs = make([]segment.Location, rv.freq) + locs := make([]Location, rv.freq) + for j := 0; j < int(rv.freq); j++ { + err := i.readLocation(&locs[j]) + if err != nil { + return nil, err + } + rv.locs[j] = &locs[j] + } + } + + return rv, nil +} + +// Posting is a single entry in a postings list +type Posting struct { + iterator *PostingsIterator + docNum uint64 + + freq uint64 + norm float32 + locs []segment.Location 
+}
+
+// Number returns the document number of this posting in this segment
+func (p *Posting) Number() uint64 {
+	return p.docNum
+}
+
+// Frequency returns the frequency of occurrence of this term in this doc/field
+func (p *Posting) Frequency() uint64 {
+	return p.freq
+}
+
+// Norm returns the normalization factor for this posting
+func (p *Posting) Norm() float64 {
+	return float64(p.norm)
+}
+
+// Locations returns the location information for each occurrence
+func (p *Posting) Locations() []segment.Location {
+	return p.locs
+}
+
+// Location represents the location of a single occurrence
+type Location struct {
+	field string
+	pos   uint64
+	start uint64
+	end   uint64
+	ap    []uint64
+}
+
+// Field returns the name of the field (useful in composite fields to know
+// which original field the value came from)
+func (l *Location) Field() string {
+	return l.field
+}
+
+// Start returns the start byte offset of this occurrence
+func (l *Location) Start() uint64 {
+	return l.start
+}
+
+// End returns the end byte offset of this occurrence
+func (l *Location) End() uint64 {
+	return l.end
+}
+
+// Pos returns the 1-based phrase position of this occurrence
+func (l *Location) Pos() uint64 {
+	return l.pos
+}
+
+// ArrayPositions returns the array position vector associated with this occurrence
+func (l *Location) ArrayPositions() []uint64 {
+	return l.ap
+}
diff --git a/index/scorch/segment/bolt/segment.go b/index/scorch/segment/bolt/segment.go
new file mode 100644
index 00000000..835313b8
--- /dev/null
+++ b/index/scorch/segment/bolt/segment.go
@@ -0,0 +1,309 @@
+// Copyright (c) 2017 Couchbase, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
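+
+// A typical read path, as an illustrative sketch (error handling elided;
+// "desc" and "thing" are hypothetical field and term names):
+//
+//	seg, _ := Open("/path/to/segment.bolt")
+//	defer seg.Close()
+//	dict, _ := seg.Dictionary("desc")
+//	postings, _ := dict.PostingsList("thing", nil)
+//	itr := postings.Iterator()
+//	for p, _ := itr.Next(); p != nil; p, _ = itr.Next() {
+//		fmt.Println(p.Number(), p.Frequency())
+//	}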
+ +package bolt + +import ( + "bytes" + "encoding/binary" + "fmt" + "io" + + "github.com/RoaringBitmap/roaring" + "github.com/Smerity/govarint" + "github.com/blevesearch/bleve/index/scorch/segment" + "github.com/boltdb/bolt" + "github.com/couchbaselabs/vellum" + "github.com/golang/snappy" +) + +var readOnlyOptions = &bolt.Options{ + ReadOnly: true, +} + +// _id field is always guaranteed to have fieldID of 0 +const idFieldID uint16 = 0 + +// Open returns a boltdb impl of a segment +func Open(path string) (segment.Segment, error) { + + db, err := bolt.Open(path, 0600, readOnlyOptions) + if err != nil { + return nil, err + } + + tx, err := db.Begin(false) + if err != nil { + _ = db.Close() + return nil, err + } + + rv := &Segment{ + db: db, + tx: tx, + fieldsMap: make(map[string]uint16), + } + + err = rv.loadConfig() + if err != nil { + _ = db.Close() + return nil, err + } + + err = rv.loadFields() + if err != nil { + _ = db.Close() + return nil, err + } + + return rv, nil +} + +// Segment implements a boltdb based implementation of a segment +type Segment struct { + version uint8 + chunkFactor uint32 + db *bolt.DB + tx *bolt.Tx + + fieldsMap map[string]uint16 + fieldsInv []string + fieldsLoc []bool +} + +func (s *Segment) loadConfig() (err error) { + bucket := s.tx.Bucket(configBucket) + if bucket == nil { + return fmt.Errorf("config bucket missing") + } + + ver := bucket.Get(versionKey) + if ver == nil { + return fmt.Errorf("version key missing") + } + s.version = ver[0] + + chunk := bucket.Get(chunkKey) + if chunk == nil { + return fmt.Errorf("chunk key is missing") + } + s.chunkFactor = binary.BigEndian.Uint32(chunk) + + return nil +} + +// loadFields reads the fields info from the segment so that we never have to go +// back to disk to access this (small and used frequently) +func (s *Segment) loadFields() (err error) { + + bucket := s.tx.Bucket(fieldsBucket) + if bucket == nil { + return fmt.Errorf("fields bucket missing") + } + + indexLocs := roaring.NewBitmap() + err = bucket.ForEach(func(k []byte, v []byte) error { + + // process index locations bitset + if k[0] == indexLocsKey[0] { + _, err2 := indexLocs.FromBuffer(v) + if err2 != nil { + return fmt.Errorf("error loading indexLocs: %v", err2) + } + } else { + + _, fieldID, err2 := DecodeUvarintAscending(k) + if err2 != nil { + return err2 + } + // we store fieldID+1 in so we can discern the zero value + s.fieldsMap[string(v)] = uint16(fieldID + 1) + } + return nil + }) + if err != nil { + return err + } + + // now setup the inverse (should have same size as map and be keyed 0-(len-1)) + s.fieldsInv = make([]string, len(s.fieldsMap)) + for k, v := range s.fieldsMap { + s.fieldsInv[int(v)-1] = k + } + s.fieldsLoc = make([]bool, len(s.fieldsInv)) + for i := range s.fieldsInv { + if indexLocs.ContainsInt(i) { + s.fieldsLoc[i] = true + } + } + + return nil +} + +// Fields returns the field names used in this segment +func (s *Segment) Fields() []string { + return s.fieldsInv +} + +// Count returns the number of documents in this segment +// (this has no notion of deleted docs) +func (s *Segment) Count() uint64 { + return uint64(s.tx.Bucket(storedBucket).Stats().BucketN - 1) +} + +// Dictionary returns the term dictionary for the specified field +func (s *Segment) Dictionary(field string) (segment.TermDictionary, error) { + return s.dictionary(field) +} + +func (s *Segment) dictionary(field string) (*Dictionary, error) { + + rv := &Dictionary{ + segment: s, + field: field, + } + + rv.fieldID = s.fieldsMap[field] + if rv.fieldID > 0 { + 
rv.fieldID = rv.fieldID - 1
+		fieldIDKey := EncodeUvarintAscending(nil, uint64(rv.fieldID))
+		bucket := s.tx.Bucket(dictBucket)
+		if bucket == nil {
+			return nil, fmt.Errorf("dictionary bucket missing")
+		}
+		fstBytes := bucket.Get(fieldIDKey)
+		if fstBytes == nil {
+			return nil, fmt.Errorf("dictionary field %s bytes nil", field)
+		}
+		fst, err := vellum.Load(fstBytes)
+		if err != nil {
+			return nil, fmt.Errorf("dictionary field %s vellum err: %v", field, err)
+		}
+		rv.fst = fst
+	}
+
+	return rv, nil
+}
+
+// VisitDocument invokes the DocumentFieldValueVisitor for each stored field
+// for the specified doc number
+func (s *Segment) VisitDocument(num uint64, visitor segment.DocumentFieldValueVisitor) error {
+	stored := s.tx.Bucket(storedBucket)
+	if stored == nil {
+		return fmt.Errorf("stored bucket missing")
+	}
+	docNumKey := EncodeUvarintAscending(nil, num)
+	docBucket := stored.Bucket(docNumKey)
+	if docBucket == nil {
+		return fmt.Errorf("segment has no doc number %d", num)
+	}
+	metaBytes := docBucket.Get(metaKey)
+	if metaBytes == nil {
+		return fmt.Errorf("stored meta bytes for doc number %d is nil", num)
+	}
+	dataBytes := docBucket.Get(dataKey)
+	if dataBytes == nil {
+		return fmt.Errorf("stored data bytes for doc number %d is nil", num)
+	}
+	uncompressed, err := snappy.Decode(nil, dataBytes)
+	if err != nil {
+		return err
+	}
+
+	reader := bytes.NewReader(metaBytes)
+	decoder := govarint.NewU64Base128Decoder(reader)
+
+	keepGoing := true
+	for keepGoing {
+		field, err := decoder.GetU64()
+		if err == io.EOF {
+			break
+		}
+		if err != nil {
+			return err
+		}
+		typ, err := decoder.GetU64()
+		if err != nil {
+			return err
+		}
+		offset, err := decoder.GetU64()
+		if err != nil {
+			return err
+		}
+		l, err := decoder.GetU64()
+		if err != nil {
+			return err
+		}
+		numap, err := decoder.GetU64()
+		if err != nil {
+			return err
+		}
+		var arrayPos []uint64
+		if numap > 0 {
+			arrayPos = make([]uint64, numap)
+			for i := 0; i < int(numap); i++ {
+				ap, err := decoder.GetU64()
+				if err != nil {
+					return err
+				}
+				arrayPos[i] = ap
+			}
+		}
+
+		value := uncompressed[offset : offset+l]
+		keepGoing = visitor(s.fieldsInv[field], byte(typ), value, arrayPos)
+	}
+
+	return nil
+}
+
+// DocNumbers returns a bitset corresponding to the doc numbers of all the
+// provided _id strings
+func (s *Segment) DocNumbers(ids []string) (*roaring.Bitmap, error) {
+	rv := roaring.New()
+
+	if len(s.fieldsMap) > 0 {
+		idDict, err := s.dictionary("_id")
+		if err != nil {
+			return nil, err
+		}
+
+		for _, id := range ids {
+			postings, err := idDict.postingsList(id, nil)
+			if err != nil {
+				return nil, err
+			}
+			if postings.postings != nil {
+				rv.Or(postings.postings)
+			}
+		}
+	}
+
+	return rv, nil
+}
+
+// Close releases all resources associated with this segment
+func (s *Segment) Close() error {
+	err := s.tx.Rollback()
+	if err != nil {
+		_ = s.db.Close()
+		return err
+	}
+	return s.db.Close()
+}
diff --git a/index/scorch/segment/bolt/segment_test.go b/index/scorch/segment/bolt/segment_test.go
new file mode 100644
index 00000000..b00c7192
--- /dev/null
+++ b/index/scorch/segment/bolt/segment_test.go
@@ -0,0 +1,517 @@
+// Copyright (c) 2017 Couchbase, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package bolt + +import ( + "math" + "os" + "reflect" + "testing" +) + +func TestOpen(t *testing.T) { + _ = os.RemoveAll("/tmp/scorch.bolt") + + memSegment := buildMemSegment() + err := persistSegment(memSegment, "/tmp/scorch.bolt", 1024) + if err != nil { + t.Fatalf("error persisting segment: %v", err) + } + + segment, err := Open("/tmp/scorch.bolt") + if err != nil { + t.Fatalf("error opening segment: %v", err) + } + defer func() { + cerr := segment.Close() + if cerr != nil { + t.Fatalf("error closing segment: %v", err) + } + }() + + expectFields := map[string]struct{}{ + "_id": struct{}{}, + "_all": struct{}{}, + "name": struct{}{}, + "desc": struct{}{}, + "tag": struct{}{}, + } + fields := segment.Fields() + if len(fields) != len(expectFields) { + t.Errorf("expected %d fields, only got %d", len(expectFields), len(fields)) + } + for _, field := range fields { + if _, ok := expectFields[field]; !ok { + t.Errorf("got unexpected field: %s", field) + } + } + + docCount := segment.Count() + if docCount != 1 { + t.Errorf("expected count 1, got %d", docCount) + } + + // check the _id field + dict, err := segment.Dictionary("_id") + if err != nil { + t.Fatal(err) + } + if dict == nil { + t.Fatal("got nil dict, expected non-nil") + } + + postingsList, err := dict.PostingsList("a", nil) + if err != nil { + t.Fatal(err) + } + if postingsList == nil { + t.Fatal("got nil postings list, expected non-nil") + } + + postingsItr := postingsList.Iterator() + if postingsItr == nil { + t.Fatal("got nil iterator, expected non-nil") + } + + count := 0 + nextPosting, err := postingsItr.Next() + for nextPosting != nil && err == nil { + count++ + if nextPosting.Frequency() != 1 { + t.Errorf("expected frequency 1, got %d", nextPosting.Frequency()) + } + if nextPosting.Number() != 0 { + t.Errorf("expected doc number 0, got %d", nextPosting.Number()) + } + if nextPosting.Norm() != 1.0 { + t.Errorf("expected norm 1.0, got %f", nextPosting.Norm()) + } + + nextPosting, err = postingsItr.Next() + } + if err != nil { + t.Fatal(err) + } + + if count != 1 { + t.Errorf("expected count to be 1, got %d", count) + } + + // check the name field + dict, err = segment.Dictionary("name") + if err != nil { + t.Fatal(err) + } + if dict == nil { + t.Fatal("got nil dict, expected non-nil") + } + + postingsList, err = dict.PostingsList("wow", nil) + if err != nil { + t.Fatal(err) + } + if postingsList == nil { + t.Fatal("got nil postings list, expected non-nil") + } + + postingsItr = postingsList.Iterator() + if postingsItr == nil { + t.Fatal("got nil iterator, expected non-nil") + } + + count = 0 + nextPosting, err = postingsItr.Next() + for nextPosting != nil && err == nil { + count++ + if nextPosting.Frequency() != 1 { + t.Errorf("expected frequency 1, got %d", nextPosting.Frequency()) + } + if nextPosting.Number() != 0 { + t.Errorf("expected doc number 0, got %d", nextPosting.Number()) + } + if nextPosting.Norm() != 1.0 { + t.Errorf("expected norm 1.0, got %f", nextPosting.Norm()) + } + var numLocs uint64 + for _, loc := range nextPosting.Locations() { + numLocs++ + if loc.Field() != "name" { + 
t.Errorf("expected loc field to be 'name', got '%s'", loc.Field()) + } + if loc.Start() != 0 { + t.Errorf("expected loc start to be 0, got %d", loc.Start()) + } + if loc.End() != 3 { + t.Errorf("expected loc end to be 3, got %d", loc.End()) + } + if loc.Pos() != 1 { + t.Errorf("expected loc pos to be 1, got %d", loc.Pos()) + } + if loc.ArrayPositions() != nil { + t.Errorf("expect loc array pos to be nil, got %v", loc.ArrayPositions()) + } + } + if numLocs != nextPosting.Frequency() { + t.Errorf("expected %d locations, got %d", nextPosting.Frequency(), numLocs) + } + + nextPosting, err = postingsItr.Next() + } + if err != nil { + t.Fatal(err) + } + + if count != 1 { + t.Errorf("expected count to be 1, got %d", count) + } + + // check the _all field (composite) + dict, err = segment.Dictionary("_all") + if err != nil { + t.Fatal(err) + } + if dict == nil { + t.Fatal("got nil dict, expected non-nil") + } + + postingsList, err = dict.PostingsList("wow", nil) + if err != nil { + t.Fatal(err) + } + if postingsList == nil { + t.Fatal("got nil postings list, expected non-nil") + } + + postingsItr = postingsList.Iterator() + if postingsItr == nil { + t.Fatal("got nil iterator, expected non-nil") + } + + count = 0 + nextPosting, err = postingsItr.Next() + for nextPosting != nil && err == nil { + count++ + if nextPosting.Frequency() != 1 { + t.Errorf("expected frequency 1, got %d", nextPosting.Frequency()) + } + if nextPosting.Number() != 0 { + t.Errorf("expected doc number 0, got %d", nextPosting.Number()) + } + expectedNorm := float32(1.0 / math.Sqrt(float64(5))) + if nextPosting.Norm() != float64(expectedNorm) { + t.Errorf("expected norm %f, got %f", expectedNorm, nextPosting.Norm()) + } + var numLocs uint64 + for _, loc := range nextPosting.Locations() { + numLocs++ + if loc.Field() != "name" { + t.Errorf("expected loc field to be 'name', got '%s'", loc.Field()) + } + if loc.Start() != 0 { + t.Errorf("expected loc start to be 0, got %d", loc.Start()) + } + if loc.End() != 3 { + t.Errorf("expected loc end to be 3, got %d", loc.End()) + } + if loc.Pos() != 1 { + t.Errorf("expected loc pos to be 1, got %d", loc.Pos()) + } + if loc.ArrayPositions() != nil { + t.Errorf("expect loc array pos to be nil, got %v", loc.ArrayPositions()) + } + } + if numLocs != nextPosting.Frequency() { + t.Errorf("expected %d locations, got %d", nextPosting.Frequency(), numLocs) + } + + nextPosting, err = postingsItr.Next() + } + if err != nil { + t.Fatal(err) + } + + if count != 1 { + t.Errorf("expected count to be 1, got %d", count) + } + + // now try a field with array positions + dict, err = segment.Dictionary("tag") + if err != nil { + t.Fatal(err) + } + if dict == nil { + t.Fatal("got nil dict, expected non-nil") + } + + postingsList, err = dict.PostingsList("dark", nil) + if err != nil { + t.Fatal(err) + } + if postingsList == nil { + t.Fatal("got nil postings list, expected non-nil") + } + + postingsItr = postingsList.Iterator() + if postingsItr == nil { + t.Fatal("got nil iterator, expected non-nil") + } + + nextPosting, err = postingsItr.Next() + for nextPosting != nil && err == nil { + + if nextPosting.Frequency() != 1 { + t.Errorf("expected frequency 1, got %d", nextPosting.Frequency()) + } + if nextPosting.Number() != 0 { + t.Errorf("expected doc number 0, got %d", nextPosting.Number()) + } + var numLocs uint64 + for _, loc := range nextPosting.Locations() { + numLocs++ + if loc.Field() != "tag" { + t.Errorf("expected loc field to be 'name', got '%s'", loc.Field()) + } + if loc.Start() != 0 { + 
t.Errorf("expected loc start to be 0, got %d", loc.Start()) + } + if loc.End() != 4 { + t.Errorf("expected loc end to be 3, got %d", loc.End()) + } + if loc.Pos() != 1 { + t.Errorf("expected loc pos to be 1, got %d", loc.Pos()) + } + expectArrayPos := []uint64{1} + if !reflect.DeepEqual(loc.ArrayPositions(), expectArrayPos) { + t.Errorf("expect loc array pos to be %v, got %v", expectArrayPos, loc.ArrayPositions()) + } + } + if numLocs != nextPosting.Frequency() { + t.Errorf("expected %d locations, got %d", nextPosting.Frequency(), numLocs) + } + + nextPosting, err = postingsItr.Next() + } + if err != nil { + t.Fatal(err) + } + + // now try and visit a document + var fieldValuesSeen int + err = segment.VisitDocument(0, func(field string, typ byte, value []byte, pos []uint64) bool { + fieldValuesSeen++ + return true + }) + if err != nil { + t.Fatal(err) + } + if fieldValuesSeen != 5 { + t.Errorf("expected 5 field values, got %d", fieldValuesSeen) + } +} + +func TestOpenMulti(t *testing.T) { + _ = os.RemoveAll("/tmp/scorch.bolt") + + memSegment := buildMemSegmentMulti() + err := persistSegment(memSegment, "/tmp/scorch.bolt", 1024) + if err != nil { + t.Fatalf("error persisting segment: %v", err) + } + + segment, err := Open("/tmp/scorch.bolt") + if err != nil { + t.Fatalf("error opening segment: %v", err) + } + defer func() { + cerr := segment.Close() + if cerr != nil { + t.Fatalf("error closing segment: %v", err) + } + }() + + if segment.Count() != 2 { + t.Errorf("expected count 2, got %d", segment.Count()) + } + + // check the desc field + dict, err := segment.Dictionary("desc") + if err != nil { + t.Fatal(err) + } + if dict == nil { + t.Fatal("got nil dict, expected non-nil") + } + + postingsList, err := dict.PostingsList("thing", nil) + if err != nil { + t.Fatal(err) + } + if postingsList == nil { + t.Fatal("got nil postings list, expected non-nil") + } + + postingsItr := postingsList.Iterator() + if postingsItr == nil { + t.Fatal("got nil iterator, expected non-nil") + } + + count := 0 + nextPosting, err := postingsItr.Next() + for nextPosting != nil && err == nil { + count++ + nextPosting, err = postingsItr.Next() + } + if err != nil { + t.Fatal(err) + } + + if count != 2 { + t.Errorf("expected count to be 2, got %d", count) + } + + // get docnum of a + exclude, err := segment.DocNumbers([]string{"a"}) + if err != nil { + t.Fatal(err) + } + + // look for term 'thing' excluding doc 'a' + postingsListExcluding, err := dict.PostingsList("thing", exclude) + if err != nil { + t.Fatal(err) + } + if postingsList == nil { + t.Fatal("got nil postings list, expected non-nil") + } + + postingsListExcludingCount := postingsListExcluding.Count() + if postingsListExcludingCount != 1 { + t.Errorf("expected count from postings list to be 1, got %d", postingsListExcludingCount) + } + + postingsItrExcluding := postingsListExcluding.Iterator() + if postingsItr == nil { + t.Fatal("got nil iterator, expected non-nil") + } + + count = 0 + nextPosting, err = postingsItrExcluding.Next() + for nextPosting != nil && err == nil { + count++ + nextPosting, err = postingsItrExcluding.Next() + } + if err != nil { + t.Fatal(err) + } + + if count != 1 { + t.Errorf("expected count to be 1, got %d", count) + } +} + +func TestOpenMultiWithTwoChunks(t *testing.T) { + _ = os.RemoveAll("/tmp/scorch.bolt") + + memSegment := buildMemSegmentMulti() + err := persistSegment(memSegment, "/tmp/scorch.bolt", 1) + if err != nil { + t.Fatalf("error persisting segment: %v", err) + } + + segment, err := Open("/tmp/scorch.bolt") + if err 
+
+func TestOpenMultiWithTwoChunks(t *testing.T) {
+	_ = os.RemoveAll("/tmp/scorch.bolt")
+
+	memSegment := buildMemSegmentMulti()
+	err := persistSegment(memSegment, "/tmp/scorch.bolt", 1)
+	if err != nil {
+		t.Fatalf("error persisting segment: %v", err)
+	}
+
+	segment, err := Open("/tmp/scorch.bolt")
+	if err != nil {
+		t.Fatalf("error opening segment: %v", err)
+	}
+	defer func() {
+		cerr := segment.Close()
+		if cerr != nil {
+			t.Fatalf("error closing segment: %v", cerr)
+		}
+	}()
+
+	if segment.Count() != 2 {
+		t.Errorf("expected count 2, got %d", segment.Count())
+	}
+
+	// check the desc field
+	dict, err := segment.Dictionary("desc")
+	if err != nil {
+		t.Fatal(err)
+	}
+	if dict == nil {
+		t.Fatal("got nil dict, expected non-nil")
+	}
+
+	postingsList, err := dict.PostingsList("thing", nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if postingsList == nil {
+		t.Fatal("got nil postings list, expected non-nil")
+	}
+
+	postingsItr := postingsList.Iterator()
+	if postingsItr == nil {
+		t.Fatal("got nil iterator, expected non-nil")
+	}
+
+	count := 0
+	nextPosting, err := postingsItr.Next()
+	for nextPosting != nil && err == nil {
+		count++
+		nextPosting, err = postingsItr.Next()
+	}
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if count != 2 {
+		t.Errorf("expected count to be 2, got %d", count)
+	}
+
+	// get docnum of a
+	exclude, err := segment.DocNumbers([]string{"a"})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// look for term 'thing' excluding doc 'a'
+	postingsListExcluding, err := dict.PostingsList("thing", exclude)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if postingsListExcluding == nil {
+		t.Fatal("got nil postings list, expected non-nil")
+	}
+
+	postingsItrExcluding := postingsListExcluding.Iterator()
+	if postingsItrExcluding == nil {
+		t.Fatal("got nil iterator, expected non-nil")
+	}
+
+	count = 0
+	nextPosting, err = postingsItrExcluding.Next()
+	for nextPosting != nil && err == nil {
+		count++
+		nextPosting, err = postingsItrExcluding.Next()
+	}
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if count != 1 {
+		t.Errorf("expected count to be 1, got %d", count)
+	}
+}
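+
+// expectedChunk is an editorial sketch with a hypothetical helper, not part
+// of the original change: assuming postings details are grouped into chunks
+// by integer division of the document number by chunkFactor (which is what
+// the persistPostingsDetails writer suggests), it makes explicit why the
+// chunkFactor of 1 in the test above places the segment's two documents in
+// two distinct chunks (docs 0 and 1 map to chunks 0 and 1), whereas the
+// chunkFactor of 1024 used elsewhere keeps both documents in chunk 0.
+func expectedChunk(docNum uint64, chunkFactor uint32) uint64 {
+	return docNum / uint64(chunkFactor)
+}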