diff --git a/index/scorch/merge.go b/index/scorch/merge.go index ec2c8d4b..41086ad3 100644 --- a/index/scorch/merge.go +++ b/index/scorch/merge.go @@ -111,6 +111,11 @@ func (s *Scorch) parseMergePlannerOptions() (*mergeplan.MergePlanOptions, if err != nil { return &mergePlannerOptions, err } + + err = mergeplan.ValidateMergePlannerOptions(&mergePlannerOptions) + if err != nil { + return nil, err + } } return &mergePlannerOptions, nil } diff --git a/index/scorch/mergeplan/merge_plan.go b/index/scorch/mergeplan/merge_plan.go index 62f643f4..b09e5381 100644 --- a/index/scorch/mergeplan/merge_plan.go +++ b/index/scorch/mergeplan/merge_plan.go @@ -18,6 +18,7 @@ package mergeplan import ( + "errors" "fmt" "math" "sort" @@ -115,7 +116,15 @@ func (o *MergePlanOptions) RaiseToFloorSegmentSize(s int64) int64 { return o.FloorSegmentSize } -// Suggested default options. +// MaxSegmentSizeLimit represents the maximum size of a segment, +// this limit comes with hit-1 optimisation/max encoding limit uint31. +const MaxSegmentSizeLimit = 1<<31 - 1 + +// ErrMaxSegmentSizeTooLarge is returned when the size of the segment +// exceeds the MaxSegmentSizeLimit +var ErrMaxSegmentSizeTooLarge = errors.New("MaxSegmentSize exceeds the size limit") + +// DefaultMergePlanOptions suggests the default options. var DefaultMergePlanOptions = MergePlanOptions{ MaxSegmentsPerTier: 10, MaxSegmentSize: 5000000, @@ -367,3 +376,11 @@ func ToBarChart(prefix string, barMax int, segments []Segment, plan *MergePlan) return strings.Join(rv, "\n") } + +// ValidateMergePlannerOptions validates the merge planner options +func ValidateMergePlannerOptions(options *MergePlanOptions) error { + if options.MaxSegmentSize > MaxSegmentSizeLimit { + return ErrMaxSegmentSizeTooLarge + } + return nil +} diff --git a/index/scorch/mergeplan/merge_plan_test.go b/index/scorch/mergeplan/merge_plan_test.go index 419ab825..3adc1f4b 100644 --- a/index/scorch/mergeplan/merge_plan_test.go +++ b/index/scorch/mergeplan/merge_plan_test.go @@ -17,10 +17,12 @@ package mergeplan import ( "encoding/json" "fmt" + "math/rand" "os" "reflect" "sort" "testing" + "time" ) // Implements the Segment interface for testing, @@ -401,6 +403,62 @@ func TestManySameSizedSegmentsWithDeletesBetweenMerges(t *testing.T) { } } +func TestValidateMergePlannerOptions(t *testing.T) { + o := &MergePlanOptions{ + MaxSegmentSize: 1 << 32, + MaxSegmentsPerTier: 3, + TierGrowth: 3.0, + SegmentsPerMergeTask: 3, + } + err := ValidateMergePlannerOptions(o) + if err != ErrMaxSegmentSizeTooLarge { + t.Error("Validation expected to fail as the MaxSegmentSize exceeds limit") + } +} + +func TestPlanMaxSegmentSizeLimit(t *testing.T) { + o := &MergePlanOptions{ + MaxSegmentSize: 20, + MaxSegmentsPerTier: 5, + TierGrowth: 3.0, + SegmentsPerMergeTask: 5, + FloorSegmentSize: 5, + } + segments := makeLinearSegments(20) + + s := rand.NewSource(time.Now().UnixNano()) + r := rand.New(s) + + max := 20 + min := 5 + randomInRange := func() int64 { + return int64(r.Intn(max-min) + min) + } + for i := 1; i < 20; i++ { + o.MaxSegmentSize = randomInRange() + plans, err := Plan(segments, o) + if err != nil { + t.Errorf("Plan failed, err: %v", err) + } + if len(plans.Tasks) == 0 { + t.Errorf("expected some plans with tasks") + } + + for _, task := range plans.Tasks { + var totalLiveSize int64 + for _, segs := range task.Segments { + totalLiveSize += segs.LiveSize() + + } + if totalLiveSize >= o.MaxSegmentSize { + t.Errorf("merged segments size: %d exceeding the MaxSegmentSize"+ + "limit: %d", totalLiveSize, 
o.MaxSegmentSize) + } + } + } + +} + // ---------------------------------------- type testCyclesSpec struct { diff --git a/index/scorch/persister.go b/index/scorch/persister.go index f1a372e7..ccb0c1f2 100644 --- a/index/scorch/persister.go +++ b/index/scorch/persister.go @@ -633,14 +633,14 @@ func (s *Scorch) removeOldBoltSnapshots() (numRemoved int, err error) { return 0, err } - if len(persistedEpochs) <= NumSnapshotsToKeep { + if len(persistedEpochs) <= s.numSnapshotsToKeep { // we need to keep everything return 0, nil } // make a map of epochs to protect from deletion - protectedEpochs := make(map[uint64]struct{}, NumSnapshotsToKeep) - for _, epoch := range persistedEpochs[0:NumSnapshotsToKeep] { + protectedEpochs := make(map[uint64]struct{}, s.numSnapshotsToKeep) + for _, epoch := range persistedEpochs[0:s.numSnapshotsToKeep] { protectedEpochs[epoch] = struct{}{} } diff --git a/index/scorch/scorch.go b/index/scorch/scorch.go index 6d0bcd1e..4bda2d34 100644 --- a/index/scorch/scorch.go +++ b/index/scorch/scorch.go @@ -28,7 +28,6 @@ import ( "github.com/blevesearch/bleve/document" "github.com/blevesearch/bleve/index" "github.com/blevesearch/bleve/index/scorch/segment" - "github.com/blevesearch/bleve/index/scorch/segment/mem" "github.com/blevesearch/bleve/index/scorch/segment/zap" "github.com/blevesearch/bleve/index/store" "github.com/blevesearch/bleve/registry" @@ -58,6 +57,7 @@ type Scorch struct { nextSnapshotEpoch uint64 eligibleForRemoval []uint64 // Index snapshot epochs that are safe to GC. ineligibleForRemoval map[string]bool // Filenames that should not be GC'ed yet. + numSnapshotsToKeep int closeCh chan struct{} introductions chan *segmentIntroduction @@ -191,6 +191,17 @@ func (s *Scorch) openBolt() error { } } + s.numSnapshotsToKeep = NumSnapshotsToKeep + if v, ok := s.config["numSnapshotsToKeep"]; ok { + var t int + if t, err = parseToInteger(v); err != nil { + return fmt.Errorf("numSnapshotsToKeep parse err: %v", err) + } + if t > 0 { + s.numSnapshotsToKeep = t + } + } + return nil } @@ -289,7 +300,7 @@ func (s *Scorch) Batch(batch *index.Batch) (err error) { var newSegment segment.Segment if len(analysisResults) > 0 { - newSegment, err = zap.NewSegmentBase(mem.NewFromAnalyzedDocs(analysisResults), DefaultChunkFactor) + newSegment, err = zap.AnalysisResultsToSegmentBase(analysisResults, DefaultChunkFactor) if err != nil { return err } @@ -504,3 +515,15 @@ func (s *Scorch) unmarkIneligibleForRemoval(filename string) { func init() { registry.RegisterIndexType(Name, NewScorch) } + +func parseToInteger(i interface{}) (int, error) { + switch v := i.(type) { + case float64: + return int(v), nil + case int: + return v, nil + + default: + return 0, fmt.Errorf("expects int or float64 value") + } +} diff --git a/index/scorch/segment/zap/build.go b/index/scorch/segment/zap/build.go index 28df9bd6..a601072c 100644 --- a/index/scorch/segment/zap/build.go +++ b/index/scorch/segment/zap/build.go @@ -16,16 +16,10 @@ package zap import ( "bufio" - "bytes" - "encoding/binary" "math" "os" - "sort" "github.com/Smerity/govarint" - "github.com/blevesearch/bleve/index/scorch/segment/mem" - "github.com/couchbase/vellum" - "github.com/golang/snappy" ) const version uint32 = 6 @@ -82,186 +76,6 @@ func PersistSegmentBase(sb *SegmentBase, path string) error { return nil } -// PersistSegment takes the in-memory segment and persists it to -// the specified path in the zap file format. 
-func PersistSegment(memSegment *mem.Segment, path string, chunkFactor uint32) error { - flag := os.O_RDWR | os.O_CREATE - - f, err := os.OpenFile(path, flag, 0600) - if err != nil { - return err - } - - cleanup := func() { - _ = f.Close() - _ = os.Remove(path) - } - - // buffer the output - br := bufio.NewWriter(f) - - // wrap it for counting (tracking offsets) - cr := NewCountHashWriter(br) - - numDocs, storedIndexOffset, fieldsIndexOffset, docValueOffset, _, err := - persistBase(memSegment, cr, chunkFactor) - if err != nil { - cleanup() - return err - } - - err = persistFooter(numDocs, storedIndexOffset, fieldsIndexOffset, docValueOffset, - chunkFactor, cr.Sum32(), cr) - if err != nil { - cleanup() - return err - } - - err = br.Flush() - if err != nil { - cleanup() - return err - } - - err = f.Sync() - if err != nil { - cleanup() - return err - } - - err = f.Close() - if err != nil { - cleanup() - return err - } - - return nil -} - -func persistBase(memSegment *mem.Segment, cr *CountHashWriter, chunkFactor uint32) ( - numDocs, storedIndexOffset, fieldsIndexOffset, docValueOffset uint64, - dictLocs []uint64, err error) { - docValueOffset = uint64(fieldNotUninverted) - - if len(memSegment.Stored) > 0 { - storedIndexOffset, err = persistStored(memSegment, cr) - if err != nil { - return 0, 0, 0, 0, nil, err - } - - freqOffsets, locOffsets, err := persistPostingDetails(memSegment, cr, chunkFactor) - if err != nil { - return 0, 0, 0, 0, nil, err - } - - postingsListLocs, err := persistPostingsLocs(memSegment, cr) - if err != nil { - return 0, 0, 0, 0, nil, err - } - - postingsLocs, err := persistPostingsLists(memSegment, cr, postingsListLocs, freqOffsets, locOffsets) - if err != nil { - return 0, 0, 0, 0, nil, err - } - - dictLocs, err = persistDictionary(memSegment, cr, postingsLocs) - if err != nil { - return 0, 0, 0, 0, nil, err - } - - docValueOffset, err = persistFieldDocValues(memSegment, cr, chunkFactor) - if err != nil { - return 0, 0, 0, 0, nil, err - } - } else { - dictLocs = make([]uint64, len(memSegment.FieldsInv)) - } - - fieldsIndexOffset, err = persistFields(memSegment.FieldsInv, cr, dictLocs) - if err != nil { - return 0, 0, 0, 0, nil, err - } - - return uint64(len(memSegment.Stored)), storedIndexOffset, fieldsIndexOffset, docValueOffset, - dictLocs, nil -} - -func persistStored(memSegment *mem.Segment, w *CountHashWriter) (uint64, error) { - var curr int - var metaBuf bytes.Buffer - var data, compressed []byte - - metaEncoder := govarint.NewU64Base128Encoder(&metaBuf) - - docNumOffsets := make(map[int]uint64, len(memSegment.Stored)) - - for docNum, storedValues := range memSegment.Stored { - if docNum != 0 { - // reset buffer if necessary - curr = 0 - metaBuf.Reset() - data = data[:0] - compressed = compressed[:0] - } - - st := memSegment.StoredTypes[docNum] - sp := memSegment.StoredPos[docNum] - - // encode fields in order - for fieldID := range memSegment.FieldsInv { - if storedFieldValues, ok := storedValues[uint16(fieldID)]; ok { - stf := st[uint16(fieldID)] - spf := sp[uint16(fieldID)] - - var err2 error - curr, data, err2 = persistStoredFieldValues(fieldID, - storedFieldValues, stf, spf, curr, metaEncoder, data) - if err2 != nil { - return 0, err2 - } - } - } - - metaEncoder.Close() - metaBytes := metaBuf.Bytes() - - // compress the data - compressed = snappy.Encode(compressed, data) - - // record where we're about to start writing - docNumOffsets[docNum] = uint64(w.Count()) - - // write out the meta len and compressed data len - _, err := writeUvarints(w, 
uint64(len(metaBytes)), uint64(len(compressed))) - if err != nil { - return 0, err - } - - // now write the meta - _, err = w.Write(metaBytes) - if err != nil { - return 0, err - } - // now write the compressed data - _, err = w.Write(compressed) - if err != nil { - return 0, err - } - } - - // return value is the start of the stored index - rv := uint64(w.Count()) - // now write out the stored doc index - for docNum := range memSegment.Stored { - err := binary.Write(w, binary.BigEndian, docNumOffsets[docNum]) - if err != nil { - return 0, err - } - } - - return rv, nil -} - func persistStoredFieldValues(fieldID int, storedFieldValues [][]byte, stf []byte, spf [][]uint64, curr int, metaEncoder *govarint.Base128Encoder, data []byte) ( @@ -307,308 +121,6 @@ func persistStoredFieldValues(fieldID int, return curr, data, nil } -func persistPostingDetails(memSegment *mem.Segment, w *CountHashWriter, chunkFactor uint32) ([]uint64, []uint64, error) { - freqOffsets := make([]uint64, 0, len(memSegment.Postings)) - tfEncoder := newChunkedIntCoder(uint64(chunkFactor), uint64(len(memSegment.Stored)-1)) - for postingID := range memSegment.Postings { - if postingID != 0 { - tfEncoder.Reset() - } - freqs := memSegment.Freqs[postingID] - norms := memSegment.Norms[postingID] - postingsListItr := memSegment.Postings[postingID].Iterator() - var offset int - for postingsListItr.HasNext() { - docNum := uint64(postingsListItr.Next()) - - // put freq & norm - err := tfEncoder.Add(docNum, freqs[offset], uint64(math.Float32bits(norms[offset]))) - if err != nil { - return nil, nil, err - } - - offset++ - } - - // record where this postings freq info starts - freqOffsets = append(freqOffsets, uint64(w.Count())) - - tfEncoder.Close() - _, err := tfEncoder.Write(w) - if err != nil { - return nil, nil, err - } - } - - // now do it again for the locations - locOffsets := make([]uint64, 0, len(memSegment.Postings)) - locEncoder := newChunkedIntCoder(uint64(chunkFactor), uint64(len(memSegment.Stored)-1)) - for postingID := range memSegment.Postings { - if postingID != 0 { - locEncoder.Reset() - } - freqs := memSegment.Freqs[postingID] - locfields := memSegment.Locfields[postingID] - locpos := memSegment.Locpos[postingID] - locstarts := memSegment.Locstarts[postingID] - locends := memSegment.Locends[postingID] - locarraypos := memSegment.Locarraypos[postingID] - postingsListItr := memSegment.Postings[postingID].Iterator() - var offset int - var locOffset int - for postingsListItr.HasNext() { - docNum := uint64(postingsListItr.Next()) - n := int(freqs[offset]) - for i := 0; i < n; i++ { - if len(locfields) > 0 { - err := locEncoder.Add(docNum, uint64(locfields[locOffset]), - locpos[locOffset], locstarts[locOffset], locends[locOffset], - uint64(len(locarraypos[locOffset]))) - if err != nil { - return nil, nil, err - } - - // put each array position - err = locEncoder.Add(docNum, locarraypos[locOffset]...) 
- if err != nil { - return nil, nil, err - } - } - locOffset++ - } - offset++ - } - - // record where this postings loc info starts - locOffsets = append(locOffsets, uint64(w.Count())) - - locEncoder.Close() - _, err := locEncoder.Write(w) - if err != nil { - return nil, nil, err - } - } - - return freqOffsets, locOffsets, nil -} - -func persistPostingsLocs(memSegment *mem.Segment, w *CountHashWriter) (rv []uint64, err error) { - rv = make([]uint64, 0, len(memSegment.PostingsLocs)) - reuseBufVarint := make([]byte, binary.MaxVarintLen64) - for postingID := range memSegment.PostingsLocs { - // record where we start this posting loc - rv = append(rv, uint64(w.Count())) - // write out the length and bitmap - _, err = writeRoaringWithLen(memSegment.PostingsLocs[postingID], w, reuseBufVarint) - if err != nil { - return nil, err - } - } - return rv, nil -} - -func persistPostingsLists(memSegment *mem.Segment, w *CountHashWriter, - postingsListLocs, freqOffsets, locOffsets []uint64) (rv []uint64, err error) { - rv = make([]uint64, 0, len(memSegment.Postings)) - reuseBufVarint := make([]byte, binary.MaxVarintLen64) - for postingID := range memSegment.Postings { - // record where we start this posting list - rv = append(rv, uint64(w.Count())) - - // write out the term info, loc info, and loc posting list offset - _, err = writeUvarints(w, freqOffsets[postingID], - locOffsets[postingID], postingsListLocs[postingID]) - if err != nil { - return nil, err - } - - // write out the length and bitmap - _, err = writeRoaringWithLen(memSegment.Postings[postingID], w, reuseBufVarint) - if err != nil { - return nil, err - } - } - return rv, nil -} - -func persistDictionary(memSegment *mem.Segment, w *CountHashWriter, postingsLocs []uint64) ([]uint64, error) { - rv := make([]uint64, 0, len(memSegment.DictKeys)) - - varintBuf := make([]byte, binary.MaxVarintLen64) - - var buffer bytes.Buffer - builder, err := vellum.New(&buffer, nil) - if err != nil { - return nil, err - } - for fieldID, fieldTerms := range memSegment.DictKeys { - - dict := memSegment.Dicts[fieldID] - // now walk the dictionary in order of fieldTerms (already sorted) - for _, fieldTerm := range fieldTerms { - postingID := dict[fieldTerm] - 1 - postingsAddr := postingsLocs[postingID] - err = builder.Insert([]byte(fieldTerm), postingsAddr) - if err != nil { - return nil, err - } - } - err = builder.Close() - if err != nil { - return nil, err - } - - // record where this dictionary starts - rv = append(rv, uint64(w.Count())) - - vellumData := buffer.Bytes() - - // write out the length of the vellum data - n := binary.PutUvarint(varintBuf, uint64(len(vellumData))) - _, err = w.Write(varintBuf[:n]) - if err != nil { - return nil, err - } - - // write this vellum to disk - _, err = w.Write(vellumData) - if err != nil { - return nil, err - } - - // reset buffer and vellum builder - buffer.Reset() - err = builder.Reset(&buffer) - if err != nil { - return nil, err - } - } - - return rv, nil -} - -type docIDRange []uint64 - -func (a docIDRange) Len() int { return len(a) } -func (a docIDRange) Swap(i, j int) { a[i], a[j] = a[j], a[i] } -func (a docIDRange) Less(i, j int) bool { return a[i] < a[j] } - -func persistDocValues(memSegment *mem.Segment, w *CountHashWriter, - chunkFactor uint32) (map[uint16]uint64, error) { - fieldChunkOffsets := make(map[uint16]uint64, len(memSegment.FieldsInv)) - fdvEncoder := newChunkedContentCoder(uint64(chunkFactor), uint64(len(memSegment.Stored)-1)) - - var postings *mem.PostingsList - var postingsItr *mem.PostingsIterator - 
- for fieldID := range memSegment.DocValueFields { - field := memSegment.FieldsInv[fieldID] - docTermMap := make(map[uint64][]byte, 0) - dict, err := memSegment.Dictionary(field) - if err != nil { - return nil, err - } - - dictItr := dict.Iterator() - next, err := dictItr.Next() - for err == nil && next != nil { - var err1 error - postings, err1 = dict.(*mem.Dictionary).InitPostingsList(next.Term, nil, postings) - if err1 != nil { - return nil, err1 - } - - postingsItr = postings.InitIterator(postingsItr) - nextPosting, err2 := postingsItr.Next() - for err2 == nil && nextPosting != nil { - docNum := nextPosting.Number() - docTermMap[docNum] = append(append(docTermMap[docNum], []byte(next.Term)...), termSeparator) - nextPosting, err2 = postingsItr.Next() - } - if err2 != nil { - return nil, err2 - } - - next, err = dictItr.Next() - } - if err != nil { - return nil, err - } - - // sort wrt to docIDs - docNumbers := make(docIDRange, 0, len(docTermMap)) - for k := range docTermMap { - docNumbers = append(docNumbers, k) - } - sort.Sort(docNumbers) - - for _, docNum := range docNumbers { - err = fdvEncoder.Add(docNum, docTermMap[docNum]) - if err != nil { - return nil, err - } - } - - fieldChunkOffsets[fieldID] = uint64(w.Count()) - err = fdvEncoder.Close() - if err != nil { - return nil, err - } - // persist the doc value details for this field - _, err = fdvEncoder.Write(w) - if err != nil { - return nil, err - } - // reseting encoder for the next field - fdvEncoder.Reset() - } - - return fieldChunkOffsets, nil -} - -func persistFieldDocValues(memSegment *mem.Segment, w *CountHashWriter, - chunkFactor uint32) (uint64, error) { - fieldDvOffsets, err := persistDocValues(memSegment, w, chunkFactor) - if err != nil { - return 0, err - } - - fieldDocValuesOffset := uint64(w.Count()) - buf := make([]byte, binary.MaxVarintLen64) - offset := uint64(0) - ok := true - for fieldID := range memSegment.FieldsInv { - // if the field isn't configured for docValue, then mark - // the offset accordingly - if offset, ok = fieldDvOffsets[uint16(fieldID)]; !ok { - offset = fieldNotUninverted - } - n := binary.PutUvarint(buf, uint64(offset)) - _, err := w.Write(buf[:n]) - if err != nil { - return 0, err - } - } - - return fieldDocValuesOffset, nil -} - -func NewSegmentBase(memSegment *mem.Segment, chunkFactor uint32) (*SegmentBase, error) { - var br bytes.Buffer - - cr := NewCountHashWriter(&br) - - numDocs, storedIndexOffset, fieldsIndexOffset, docValueOffset, dictLocs, err := - persistBase(memSegment, cr, chunkFactor) - if err != nil { - return nil, err - } - - return InitSegmentBase(br.Bytes(), cr.Sum32(), chunkFactor, - memSegment.FieldsMap, memSegment.FieldsInv, numDocs, - storedIndexOffset, fieldsIndexOffset, docValueOffset, dictLocs) -} - func InitSegmentBase(mem []byte, memCRC uint32, chunkFactor uint32, fieldsMap map[string]uint16, fieldsInv []string, numDocs uint64, storedIndexOffset uint64, fieldsIndexOffset uint64, docValueOffset uint64, diff --git a/index/scorch/segment/zap/build_test.go b/index/scorch/segment/zap/build_test.go index 9063980b..65de7931 100644 --- a/index/scorch/segment/zap/build_test.go +++ b/index/scorch/segment/zap/build_test.go @@ -21,20 +21,22 @@ import ( "github.com/blevesearch/bleve/analysis" "github.com/blevesearch/bleve/document" "github.com/blevesearch/bleve/index" - "github.com/blevesearch/bleve/index/scorch/segment/mem" ) func TestBuild(t *testing.T) { _ = os.RemoveAll("/tmp/scorch.zap") - memSegment := buildMemSegment() - err := PersistSegment(memSegment, 
"/tmp/scorch.zap", 1024) + sb, err := buildTestSegment() + if err != nil { + t.Fatal(err) + } + err = PersistSegmentBase(sb, "/tmp/scorch.zap") if err != nil { t.Fatal(err) } } -func buildMemSegment() *mem.Segment { +func buildTestSegment() (*SegmentBase, error) { doc := &document.Document{ ID: "a", Fields: []document.Field{ @@ -120,11 +122,22 @@ func buildMemSegment() *mem.Segment { } } - return mem.NewFromAnalyzedDocs(results) + return AnalysisResultsToSegmentBase(results, 1024) } -func buildMemSegmentMulti() *mem.Segment { +func buildTestSegmentMulti() (*SegmentBase, error) { + results := buildTestAnalysisResultsMulti() + return AnalysisResultsToSegmentBase(results, 1024) +} + +func buildTestSegmentMultiWithChunkFactor(chunkFactor uint32) (*SegmentBase, error) { + results := buildTestAnalysisResultsMulti() + + return AnalysisResultsToSegmentBase(results, chunkFactor) +} + +func buildTestAnalysisResultsMulti() []*index.AnalysisResult { doc := &document.Document{ ID: "a", Fields: []document.Field{ @@ -282,13 +295,11 @@ func buildMemSegmentMulti() *mem.Segment { } } - segment := mem.NewFromAnalyzedDocs(results) - - return segment + return results } -func buildMemSegmentWithDefaultFieldMapping() (*mem.Segment, []string) { - +func buildTestSegmentWithDefaultFieldMapping(chunkFactor uint32) ( + *SegmentBase, []string, error) { doc := &document.Document{ ID: "a", Fields: []document.Field{ @@ -371,5 +382,7 @@ func buildMemSegmentWithDefaultFieldMapping() (*mem.Segment, []string) { } } - return mem.NewFromAnalyzedDocs(results), fields + sb, err := AnalysisResultsToSegmentBase(results, chunkFactor) + + return sb, fields, err } diff --git a/index/scorch/segment/zap/contentcoder.go b/index/scorch/segment/zap/contentcoder.go index 30197c0e..1e7a785c 100644 --- a/index/scorch/segment/zap/contentcoder.go +++ b/index/scorch/segment/zap/contentcoder.go @@ -154,9 +154,11 @@ func (c *chunkedContentCoder) Write(w io.Writer) (int, error) { if err != nil { return tw, err } - // write out the chunk lens - for _, chunkLen := range c.chunkLens { - n := binary.PutUvarint(buf, uint64(chunkLen)) + + chunkOffsets := modifyLengthsToEndOffsets(c.chunkLens) + // write out the chunk offsets + for _, chunkOffset := range chunkOffsets { + n := binary.PutUvarint(buf, chunkOffset) nw, err = w.Write(buf[:n]) tw += nw if err != nil { diff --git a/index/scorch/segment/zap/contentcoder_test.go b/index/scorch/segment/zap/contentcoder_test.go index fce84714..ff26138a 100644 --- a/index/scorch/segment/zap/contentcoder_test.go +++ b/index/scorch/segment/zap/contentcoder_test.go @@ -46,7 +46,7 @@ func TestChunkContentCoder(t *testing.T) { []byte("scorch"), }, - expected: string([]byte{0x02, 0x0b, 0x0b, 0x01, 0x00, 0x06, 0x06, 0x14, + expected: string([]byte{0x02, 0x0b, 0x16, 0x01, 0x00, 0x06, 0x06, 0x14, 0x75, 0x70, 0x73, 0x69, 0x64, 0x65, 0x01, 0x01, 0x06, 0x06, 0x14, 0x73, 0x63, 0x6f, 0x72, 0x63, 0x68}), }, diff --git a/index/scorch/segment/zap/dict.go b/index/scorch/segment/zap/dict.go index e5d71268..3b8132f2 100644 --- a/index/scorch/segment/zap/dict.go +++ b/index/scorch/segment/zap/dict.go @@ -68,7 +68,19 @@ func (d *Dictionary) postingsListInit(rv *PostingsList, except *roaring.Bitmap) if rv == nil { rv = &PostingsList{} } else { + postings := rv.postings + if postings != nil { + postings.Clear() + } + locBitmap := rv.locBitmap + if locBitmap != nil { + locBitmap.Clear() + } + *rv = PostingsList{} // clear the struct + + rv.postings = postings + rv.locBitmap = locBitmap } rv.sb = d.sb rv.except = except diff --git 
a/index/scorch/segment/zap/dict_test.go b/index/scorch/segment/zap/dict_test.go index 336fb37c..b70f2adf 100644 --- a/index/scorch/segment/zap/dict_test.go +++ b/index/scorch/segment/zap/dict_test.go @@ -22,10 +22,9 @@ import ( "github.com/blevesearch/bleve/analysis" "github.com/blevesearch/bleve/document" "github.com/blevesearch/bleve/index" - "github.com/blevesearch/bleve/index/scorch/segment/mem" ) -func buildMemSegmentForDict() *mem.Segment { +func buildTestSegmentForDict() (*SegmentBase, error) { doc := &document.Document{ ID: "a", Fields: []document.Field{ @@ -99,17 +98,15 @@ func buildMemSegmentForDict() *mem.Segment { }, } - segment := mem.NewFromAnalyzedDocs(results) - - return segment + return AnalysisResultsToSegmentBase(results, 1024) } func TestDictionary(t *testing.T) { _ = os.RemoveAll("/tmp/scorch.zap") - memSegment := buildMemSegmentForDict() - err := PersistSegment(memSegment, "/tmp/scorch.zap", 1024) + testSeg, _ := buildTestSegmentForDict() + err := PersistSegmentBase(testSeg, "/tmp/scorch.zap") if err != nil { t.Fatalf("error persisting segment: %v", err) } diff --git a/index/scorch/segment/zap/docvalues.go b/index/scorch/segment/zap/docvalues.go index 2399801a..84427190 100644 --- a/index/scorch/segment/zap/docvalues.go +++ b/index/scorch/segment/zap/docvalues.go @@ -38,7 +38,7 @@ type docValueIterator struct { field string curChunkNum uint64 numChunks uint64 - chunkLens []uint64 + chunkOffsets []uint64 dvDataLoc uint64 curChunkHeader []MetaData curChunkData []byte // compressed data cache @@ -47,7 +47,7 @@ type docValueIterator struct { func (di *docValueIterator) size() int { return reflectStaticSizedocValueIterator + size.SizeOfPtr + len(di.field) + - len(di.chunkLens)*size.SizeOfUint64 + + len(di.chunkOffsets)*size.SizeOfUint64 + len(di.curChunkHeader)*reflectStaticSizeMetaData + len(di.curChunkData) } @@ -69,7 +69,7 @@ func (s *SegmentBase) loadFieldDocValueIterator(field string, } // read the number of chunks, chunk lengths - var offset, clen uint64 + var offset, loc uint64 numChunks, read := binary.Uvarint(s.mem[fieldDvLoc : fieldDvLoc+binary.MaxVarintLen64]) if read <= 0 { return nil, fmt.Errorf("failed to read the field "+ @@ -78,16 +78,16 @@ func (s *SegmentBase) loadFieldDocValueIterator(field string, offset += uint64(read) fdvIter := &docValueIterator{ - curChunkNum: math.MaxUint64, - field: field, - chunkLens: make([]uint64, int(numChunks)), + curChunkNum: math.MaxUint64, + field: field, + chunkOffsets: make([]uint64, int(numChunks)), } for i := 0; i < int(numChunks); i++ { - clen, read = binary.Uvarint(s.mem[fieldDvLoc+offset : fieldDvLoc+offset+binary.MaxVarintLen64]) + loc, read = binary.Uvarint(s.mem[fieldDvLoc+offset : fieldDvLoc+offset+binary.MaxVarintLen64]) if read <= 0 { - return nil, fmt.Errorf("corrupted chunk length during segment load") + return nil, fmt.Errorf("corrupted chunk offset during segment load") } - fdvIter.chunkLens[i] = clen + fdvIter.chunkOffsets[i] = loc offset += uint64(read) } @@ -99,12 +99,11 @@ func (di *docValueIterator) loadDvChunk(chunkNumber, localDocNum uint64, s *SegmentBase) error { // advance to the chunk where the docValues // reside for the given docNum - destChunkDataLoc := di.dvDataLoc - for i := 0; i < int(chunkNumber); i++ { - destChunkDataLoc += di.chunkLens[i] - } + destChunkDataLoc, curChunkEnd := di.dvDataLoc, di.dvDataLoc + start, end := readChunkBoundary(int(chunkNumber), di.chunkOffsets) + destChunkDataLoc += start + curChunkEnd += end - curChunkSize := di.chunkLens[chunkNumber] // read the number of 
docs reside in the chunk numDocs, read := binary.Uvarint(s.mem[destChunkDataLoc : destChunkDataLoc+binary.MaxVarintLen64]) if read <= 0 { @@ -122,7 +121,7 @@ func (di *docValueIterator) loadDvChunk(chunkNumber, } compressedDataLoc := chunkMetaLoc + offset - dataLength := destChunkDataLoc + curChunkSize - compressedDataLoc + dataLength := curChunkEnd - compressedDataLoc di.curChunkData = s.mem[compressedDataLoc : compressedDataLoc+dataLength] di.curChunkNum = chunkNumber return nil diff --git a/index/scorch/segment/zap/intcoder.go b/index/scorch/segment/zap/intcoder.go index 6680e608..81ef8bb2 100644 --- a/index/scorch/segment/zap/intcoder.go +++ b/index/scorch/segment/zap/intcoder.go @@ -111,10 +111,13 @@ func (c *chunkedIntCoder) Write(w io.Writer) (int, error) { } buf := c.buf - // write out the number of chunks & each chunkLen - n := binary.PutUvarint(buf, uint64(len(c.chunkLens))) - for _, chunkLen := range c.chunkLens { - n += binary.PutUvarint(buf[n:], uint64(chunkLen)) + // convert the chunk lengths into chunk offsets + chunkOffsets := modifyLengthsToEndOffsets(c.chunkLens) + + // write out the number of chunks & each chunk offsets + n := binary.PutUvarint(buf, uint64(len(chunkOffsets))) + for _, chunkOffset := range chunkOffsets { + n += binary.PutUvarint(buf[n:], chunkOffset) } tw, err := w.Write(buf[:n]) @@ -134,3 +137,36 @@ func (c *chunkedIntCoder) Write(w io.Writer) (int, error) { func (c *chunkedIntCoder) FinalSize() int { return len(c.final) } + +// modifyLengthsToEndOffsets converts the chunk length array +// to a chunk offset array. The readChunkBoundary +// will figure out the start and end of every chunk from +// these offsets. Starting offset of i'th index is stored +// in i-1'th position except for 0'th index and ending offset +// is stored at i'th index position. +// For 0'th element, starting position is always zero. 
+// eg: +// Lens -> 5 5 5 5 => 5 10 15 20 +// Lens -> 0 5 0 5 => 0 5 5 10 +// Lens -> 0 0 0 5 => 0 0 0 5 +// Lens -> 5 0 0 0 => 5 5 5 5 +// Lens -> 0 5 0 0 => 0 5 5 5 +// Lens -> 0 0 5 0 => 0 0 5 5 +func modifyLengthsToEndOffsets(lengths []uint64) []uint64 { + var runningOffset uint64 + var index, i int + for i = 1; i <= len(lengths); i++ { + runningOffset += lengths[i-1] + lengths[index] = runningOffset + index++ + } + return lengths +} + +func readChunkBoundary(chunk int, offsets []uint64) (uint64, uint64) { + var start uint64 + if chunk > 0 { + start = offsets[chunk-1] + } + return start, offsets[chunk] +} diff --git a/index/scorch/segment/zap/intcoder_test.go b/index/scorch/segment/zap/intcoder_test.go index 85d2c5a7..952e0669 100644 --- a/index/scorch/segment/zap/intcoder_test.go +++ b/index/scorch/segment/zap/intcoder_test.go @@ -46,8 +46,8 @@ func TestChunkIntCoder(t *testing.T) { []uint64{3}, []uint64{7}, }, - // 2 chunks, chunk-0 length 1, chunk-1 length 1, value 3, value 7 - expected: []byte{0x2, 0x1, 0x1, 0x3, 0x7}, + // 2 chunks, chunk-0 offset 1, chunk-1 offset 2, value 3, value 7 + expected: []byte{0x2, 0x1, 0x2, 0x3, 0x7}, }, } @@ -71,3 +71,199 @@ func TestChunkIntCoder(t *testing.T) { } } } + +func TestChunkLengthToOffsets(t *testing.T) { + + tests := []struct { + lengths []uint64 + expectedOffsets []uint64 + }{ + { + lengths: []uint64{5, 5, 5, 5, 5}, + expectedOffsets: []uint64{5, 10, 15, 20, 25}, + }, + { + lengths: []uint64{0, 5, 0, 5, 0}, + expectedOffsets: []uint64{0, 5, 5, 10, 10}, + }, + { + lengths: []uint64{0, 0, 0, 0, 5}, + expectedOffsets: []uint64{0, 0, 0, 0, 5}, + }, + { + lengths: []uint64{5, 0, 0, 0, 0}, + expectedOffsets: []uint64{5, 5, 5, 5, 5}, + }, + { + lengths: []uint64{0, 5, 0, 0, 0}, + expectedOffsets: []uint64{0, 5, 5, 5, 5}, + }, + { + lengths: []uint64{0, 0, 0, 5, 0}, + expectedOffsets: []uint64{0, 0, 0, 5, 5}, + }, + { + lengths: []uint64{0, 0, 0, 5, 5}, + expectedOffsets: []uint64{0, 0, 0, 5, 10}, + }, + { + lengths: []uint64{5, 5, 5, 0, 0}, + expectedOffsets: []uint64{5, 10, 15, 15, 15}, + }, + { + lengths: []uint64{5}, + expectedOffsets: []uint64{5}, + }, + { + lengths: []uint64{5, 5}, + expectedOffsets: []uint64{5, 10}, + }, + } + + for i, test := range tests { + modifyLengthsToEndOffsets(test.lengths) + if !reflect.DeepEqual(test.expectedOffsets, test.lengths) { + t.Errorf("Test: %d failed, got %+v, expected %+v", i, test.lengths, test.expectedOffsets) + } + } +} + +func TestChunkReadBoundaryFromOffsets(t *testing.T) { + + tests := []struct { + chunkNumber int + offsets []uint64 + expectedStart uint64 + expectedEnd uint64 + }{ + { + offsets: []uint64{5, 10, 15, 20, 25}, + chunkNumber: 4, + expectedStart: 20, + expectedEnd: 25, + }, + { + offsets: []uint64{5, 10, 15, 20, 25}, + chunkNumber: 0, + expectedStart: 0, + expectedEnd: 5, + }, + { + offsets: []uint64{5, 10, 15, 20, 25}, + chunkNumber: 2, + expectedStart: 10, + expectedEnd: 15, + }, + { + offsets: []uint64{0, 5, 5, 10, 10}, + chunkNumber: 4, + expectedStart: 10, + expectedEnd: 10, + }, + { + offsets: []uint64{0, 5, 5, 10, 10}, + chunkNumber: 1, + expectedStart: 0, + expectedEnd: 5, + }, + { + offsets: []uint64{5, 5, 5, 5, 5}, + chunkNumber: 0, + expectedStart: 0, + expectedEnd: 5, + }, + { + offsets: []uint64{5, 5, 5, 5, 5}, + chunkNumber: 4, + expectedStart: 5, + expectedEnd: 5, + }, + { + offsets: []uint64{5, 5, 5, 5, 5}, + chunkNumber: 1, + expectedStart: 5, + expectedEnd: 5, + }, + { + offsets: []uint64{0, 5, 5, 5, 5}, + chunkNumber: 1, + expectedStart: 0, + expectedEnd: 5, + }, + 
{ + offsets: []uint64{0, 5, 5, 5, 5}, + chunkNumber: 0, + expectedStart: 0, + expectedEnd: 0, + }, + { + offsets: []uint64{0, 0, 0, 5, 5}, + chunkNumber: 2, + expectedStart: 0, + expectedEnd: 0, + }, + { + offsets: []uint64{0, 0, 0, 5, 5}, + chunkNumber: 1, + expectedStart: 0, + expectedEnd: 0, + }, + { + offsets: []uint64{0, 0, 0, 0, 5}, + chunkNumber: 4, + expectedStart: 0, + expectedEnd: 5, + }, + { + offsets: []uint64{0, 0, 0, 0, 5}, + chunkNumber: 2, + expectedStart: 0, + expectedEnd: 0, + }, + { + offsets: []uint64{5, 10, 15, 15, 15}, + chunkNumber: 0, + expectedStart: 0, + expectedEnd: 5, + }, + { + offsets: []uint64{5, 10, 15, 15, 15}, + chunkNumber: 1, + expectedStart: 5, + expectedEnd: 10, + }, + { + offsets: []uint64{5, 10, 15, 15, 15}, + chunkNumber: 2, + expectedStart: 10, + expectedEnd: 15, + }, + { + offsets: []uint64{5, 10, 15, 15, 15}, + chunkNumber: 3, + expectedStart: 15, + expectedEnd: 15, + }, + { + offsets: []uint64{5, 10, 15, 15, 15}, + chunkNumber: 4, + expectedStart: 15, + expectedEnd: 15, + }, + { + offsets: []uint64{5}, + chunkNumber: 0, + expectedStart: 0, + expectedEnd: 5, + }, + } + + for i, test := range tests { + s, e := readChunkBoundary(test.chunkNumber, test.offsets) + if test.expectedStart != s || test.expectedEnd != e { + t.Errorf("Test: %d failed for chunkNumber: %d got start: %d end: %d,"+ + " expected start: %d end: %d", i, test.chunkNumber, s, e, + test.expectedStart, test.expectedEnd) + } + } +} diff --git a/index/scorch/segment/zap/merge.go b/index/scorch/segment/zap/merge.go index 383fedbf..07de0943 100644 --- a/index/scorch/segment/zap/merge.go +++ b/index/scorch/segment/zap/merge.go @@ -183,6 +183,9 @@ func persistMergedRest(segments []*SegmentBase, dropsIn []*roaring.Bitmap, return nil, 0, err } + newRoaring := roaring.NewBitmap() + newRoaringLocs := roaring.NewBitmap() + // for each field for fieldID, fieldName := range fieldsInv { @@ -222,8 +225,8 @@ func persistMergedRest(segments []*SegmentBase, dropsIn []*roaring.Bitmap, var prevTerm []byte - newRoaring := roaring.NewBitmap() - newRoaringLocs := roaring.NewBitmap() + newRoaring.Clear() + newRoaringLocs.Clear() var lastDocNum, lastFreq, lastNorm uint64 @@ -248,63 +251,22 @@ func persistMergedRest(segments []*SegmentBase, dropsIn []*roaring.Bitmap, tfEncoder.Close() locEncoder.Close() - termCardinality := newRoaring.GetCardinality() + postingsOffset, err := writePostings( + newRoaring, newRoaringLocs, tfEncoder, locEncoder, + use1HitEncoding, w, bufMaxVarintLen64) + if err != nil { + return err + } - encodeAs1Hit, docNum1Hit, normBits1Hit := use1HitEncoding(termCardinality) - if encodeAs1Hit { - err = newVellum.Insert(term, FSTValEncode1Hit(docNum1Hit, normBits1Hit)) - if err != nil { - return err - } - } else if termCardinality > 0 { - // this field/term has hits in the new segment - freqOffset := uint64(w.Count()) - _, err := tfEncoder.Write(w) - if err != nil { - return err - } - locOffset := uint64(w.Count()) - _, err = locEncoder.Write(w) - if err != nil { - return err - } - postingLocOffset := uint64(w.Count()) - _, err = writeRoaringWithLen(newRoaringLocs, w, bufMaxVarintLen64) - if err != nil { - return err - } - postingOffset := uint64(w.Count()) - // write out the start of the term info - n := binary.PutUvarint(bufMaxVarintLen64, freqOffset) - _, err = w.Write(bufMaxVarintLen64[:n]) - if err != nil { - return err - } - // write out the start of the loc info - n = binary.PutUvarint(bufMaxVarintLen64, locOffset) - _, err = w.Write(bufMaxVarintLen64[:n]) - if err != nil { - return 
err - } - // write out the start of the posting locs - n = binary.PutUvarint(bufMaxVarintLen64, postingLocOffset) - _, err = w.Write(bufMaxVarintLen64[:n]) - if err != nil { - return err - } - _, err = writeRoaringWithLen(newRoaring, w, bufMaxVarintLen64) - if err != nil { - return err - } - - err = newVellum.Insert(term, postingOffset) + if postingsOffset > 0 { + err = newVellum.Insert(term, postingsOffset) if err != nil { return err } } - newRoaring = roaring.NewBitmap() - newRoaringLocs = roaring.NewBitmap() + newRoaring.Clear() + newRoaringLocs.Clear() tfEncoder.Reset() locEncoder.Reset() @@ -460,6 +422,69 @@ func persistMergedRest(segments []*SegmentBase, dropsIn []*roaring.Bitmap, return rv, fieldDvLocsOffset, nil } +func writePostings(postings, postingLocs *roaring.Bitmap, + tfEncoder, locEncoder *chunkedIntCoder, + use1HitEncoding func(uint64) (bool, uint64, uint64), + w *CountHashWriter, bufMaxVarintLen64 []byte) ( + offset uint64, err error) { + termCardinality := postings.GetCardinality() + if termCardinality <= 0 { + return 0, nil + } + + if use1HitEncoding != nil { + encodeAs1Hit, docNum1Hit, normBits1Hit := use1HitEncoding(termCardinality) + if encodeAs1Hit { + return FSTValEncode1Hit(docNum1Hit, normBits1Hit), nil + } + } + + tfOffset := uint64(w.Count()) + _, err = tfEncoder.Write(w) + if err != nil { + return 0, err + } + + locOffset := uint64(w.Count()) + _, err = locEncoder.Write(w) + if err != nil { + return 0, err + } + + postingLocsOffset := uint64(w.Count()) + _, err = writeRoaringWithLen(postingLocs, w, bufMaxVarintLen64) + if err != nil { + return 0, err + } + + postingsOffset := uint64(w.Count()) + + n := binary.PutUvarint(bufMaxVarintLen64, tfOffset) + _, err = w.Write(bufMaxVarintLen64[:n]) + if err != nil { + return 0, err + } + + n = binary.PutUvarint(bufMaxVarintLen64, locOffset) + _, err = w.Write(bufMaxVarintLen64[:n]) + if err != nil { + return 0, err + } + + n = binary.PutUvarint(bufMaxVarintLen64, postingLocsOffset) + _, err = w.Write(bufMaxVarintLen64[:n]) + if err != nil { + return 0, err + } + + _, err = writeRoaringWithLen(postings, w, bufMaxVarintLen64) + if err != nil { + return 0, err + } + + return postingsOffset, nil +} + func mergeStoredAndRemap(segments []*SegmentBase, drops []*roaring.Bitmap, fieldsMap map[string]uint16, fieldsInv []string, fieldsSame bool, newSegDocCount uint64, w *CountHashWriter) (uint64, [][]uint64, error) { diff --git a/index/scorch/segment/zap/merge_test.go b/index/scorch/segment/zap/merge_test.go index d80b2608..d931f6c2 100644 --- a/index/scorch/segment/zap/merge_test.go +++ b/index/scorch/segment/zap/merge_test.go @@ -26,7 +26,6 @@ import ( "github.com/blevesearch/bleve/analysis" "github.com/blevesearch/bleve/document" "github.com/blevesearch/bleve/index" - "github.com/blevesearch/bleve/index/scorch/segment/mem" ) func TestMerge(t *testing.T) { @@ -34,14 +33,14 @@ func TestMerge(t *testing.T) { _ = os.RemoveAll("/tmp/scorch2.zap") _ = os.RemoveAll("/tmp/scorch3.zap") - memSegment := buildMemSegmentMulti() - err := PersistSegment(memSegment, "/tmp/scorch.zap", 1024) + testSeg, _ := buildTestSegmentMulti() + err := PersistSegmentBase(testSeg, "/tmp/scorch.zap") if err != nil { t.Fatal(err) } - memSegment2 := buildMemSegmentMulti2() - err = PersistSegment(memSegment2, "/tmp/scorch2.zap", 1024) + testSeg2, _ := buildTestSegmentMulti2() + err = PersistSegmentBase(testSeg2, "/tmp/scorch2.zap") if err != nil { t.Fatal(err) } @@ -121,8 +120,8 @@ func TestMergeWithEmptySegmentsFirst(t *testing.T) { func 
testMergeWithEmptySegments(t *testing.T, before bool, numEmptySegments int) { _ = os.RemoveAll("/tmp/scorch.zap") - memSegment := buildMemSegmentMulti() - err := PersistSegment(memSegment, "/tmp/scorch.zap", 1024) + testSeg, _ := buildTestSegmentMulti() + err := PersistSegmentBase(testSeg, "/tmp/scorch.zap") if err != nil { t.Fatal(err) } @@ -148,8 +147,8 @@ func testMergeWithEmptySegments(t *testing.T, before bool, numEmptySegments int) _ = os.RemoveAll("/tmp/" + fname) - emptySegment := mem.NewFromAnalyzedDocs([]*index.AnalysisResult{}) - err = PersistSegment(emptySegment, "/tmp/"+fname, 1024) + emptySegment, _ := AnalysisResultsToSegmentBase([]*index.AnalysisResult{}, 1024) + err = PersistSegmentBase(emptySegment, "/tmp/"+fname) if err != nil { t.Fatal(err) } @@ -462,8 +461,8 @@ func testMergeAndDrop(t *testing.T, docsToDrop []*roaring.Bitmap) { _ = os.RemoveAll("/tmp/scorch.zap") _ = os.RemoveAll("/tmp/scorch2.zap") - memSegment := buildMemSegmentMulti() - err := PersistSegment(memSegment, "/tmp/scorch.zap", 1024) + testSeg, _ := buildTestSegmentMulti() + err := PersistSegmentBase(testSeg, "/tmp/scorch.zap") if err != nil { t.Fatal(err) } @@ -478,8 +477,8 @@ func testMergeAndDrop(t *testing.T, docsToDrop []*roaring.Bitmap) { } }() - memSegment2 := buildMemSegmentMulti2() - err = PersistSegment(memSegment2, "/tmp/scorch2.zap", 1024) + testSeg2, _ := buildTestSegmentMulti2() + err = PersistSegmentBase(testSeg2, "/tmp/scorch2.zap") if err != nil { t.Fatal(err) } @@ -565,8 +564,8 @@ func testMergeWithUpdates(t *testing.T, segmentDocIds [][]string, docsToDrop []* _ = os.RemoveAll("/tmp/" + fname) - memSegment := buildMemSegmentMultiHelper(docIds) - err := PersistSegment(memSegment, "/tmp/"+fname, 1024) + testSeg, _ := buildTestSegmentMultiHelper(docIds) + err := PersistSegmentBase(testSeg, "/tmp/"+fname) if err != nil { t.Fatal(err) } @@ -616,11 +615,11 @@ func testMergeAndDropSegments(t *testing.T, segsToMerge []*Segment, docsToDrop [ testMergeWithSelf(t, segm.(*Segment), expectedNumDocs) } -func buildMemSegmentMulti2() *mem.Segment { - return buildMemSegmentMultiHelper([]string{"c", "d"}) +func buildTestSegmentMulti2() (*SegmentBase, error) { + return buildTestSegmentMultiHelper([]string{"c", "d"}) } -func buildMemSegmentMultiHelper(docIds []string) *mem.Segment { +func buildTestSegmentMultiHelper(docIds []string) (*SegmentBase, error) { doc := &document.Document{ ID: "c", Fields: []document.Field{ @@ -778,9 +777,7 @@ func buildMemSegmentMultiHelper(docIds []string) *mem.Segment { } } - segment := mem.NewFromAnalyzedDocs(results) - - return segment + return AnalysisResultsToSegmentBase(results, 1024) } func TestMergeBytesWritten(t *testing.T) { @@ -788,14 +785,14 @@ func TestMergeBytesWritten(t *testing.T) { _ = os.RemoveAll("/tmp/scorch2.zap") _ = os.RemoveAll("/tmp/scorch3.zap") - memSegment := buildMemSegmentMulti() - err := PersistSegment(memSegment, "/tmp/scorch.zap", 1024) + testSeg, _ := buildTestSegmentMulti() + err := PersistSegmentBase(testSeg, "/tmp/scorch.zap") if err != nil { t.Fatal(err) } - memSegment2 := buildMemSegmentMulti2() - err = PersistSegment(memSegment2, "/tmp/scorch2.zap", 1024) + testSeg2, _ := buildTestSegmentMulti2() + err = PersistSegmentBase(testSeg2, "/tmp/scorch2.zap") if err != nil { t.Fatal(err) } diff --git a/index/scorch/segment/zap/new.go b/index/scorch/segment/zap/new.go new file mode 100644 index 00000000..4c9ec9c1 --- /dev/null +++ b/index/scorch/segment/zap/new.go @@ -0,0 +1,770 @@ +// Copyright (c) 2018 Couchbase, Inc. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package zap + +import ( + "bytes" + "encoding/binary" + "math" + "sort" + "sync" + + "github.com/RoaringBitmap/roaring" + "github.com/Smerity/govarint" + "github.com/blevesearch/bleve/analysis" + "github.com/blevesearch/bleve/document" + "github.com/blevesearch/bleve/index" + "github.com/couchbase/vellum" + "github.com/golang/snappy" +) + +// AnalysisResultsToSegmentBase produces an in-memory zap-encoded +// SegmentBase from analysis results +func AnalysisResultsToSegmentBase(results []*index.AnalysisResult, + chunkFactor uint32) (*SegmentBase, error) { + var br bytes.Buffer + + s := interimPool.Get().(*interim) + + s.results = results + s.chunkFactor = chunkFactor + s.w = NewCountHashWriter(&br) + + storedIndexOffset, fieldsIndexOffset, fdvIndexOffset, dictOffsets, + err := s.convert() + if err != nil { + return nil, err + } + + sb, err := InitSegmentBase(br.Bytes(), s.w.Sum32(), chunkFactor, + s.FieldsMap, s.FieldsInv, uint64(len(results)), + storedIndexOffset, fieldsIndexOffset, fdvIndexOffset, dictOffsets) + + if err == nil && s.reset() == nil { + interimPool.Put(s) + } + + return sb, err +} + +var interimPool = sync.Pool{New: func() interface{} { return &interim{} }} + +// interim holds temporary working data used while converting from +// analysis results to a zap-encoded segment +type interim struct { + results []*index.AnalysisResult + + chunkFactor uint32 + + w *CountHashWriter + + // FieldsMap adds 1 to field id to avoid zero value issues + // name -> field id + 1 + FieldsMap map[string]uint16 + + // FieldsInv is the inverse of FieldsMap + // field id -> name + FieldsInv []string + + // Term dictionaries for each field + // field id -> term -> postings list id + 1 + Dicts []map[string]uint64 + + // Terms for each field, where terms are sorted ascending + // field id -> []term + DictKeys [][]string + + // Fields whose IncludeDocValues is true + // field id -> bool + IncludeDocValues []bool + + // postings id -> bitmap of docNums + Postings []*roaring.Bitmap + + // postings id -> bitmap of docNums that have locations + PostingsLocs []*roaring.Bitmap + + // postings id -> freq/norm's, one for each docNum in postings + FreqNorms [][]interimFreqNorm + freqNormsBacking []interimFreqNorm + + // postings id -> locs, one for each freq + Locs [][]interimLoc + locsBacking []interimLoc + + numTermsPerPostingsList []int // key is postings list id + numLocsPerPostingsList []int // key is postings list id + + builder *vellum.Builder + builderBuf bytes.Buffer + + metaBuf bytes.Buffer + + tmp0 []byte + tmp1 []byte +} + +func (s *interim) reset() (err error) { + s.results = nil + s.chunkFactor = 0 + s.w = nil + s.FieldsMap = nil + s.FieldsInv = s.FieldsInv[:0] + for i := range s.Dicts { + s.Dicts[i] = nil + } + s.Dicts = s.Dicts[:0] + for i := range s.DictKeys { + s.DictKeys[i] = s.DictKeys[i][:0] + } + s.DictKeys = s.DictKeys[:0] + for i := range s.IncludeDocValues { + s.IncludeDocValues[i] = false + } + s.IncludeDocValues = 
s.IncludeDocValues[:0] + for _, idn := range s.Postings { + idn.Clear() + } + s.Postings = s.Postings[:0] + for _, idn := range s.PostingsLocs { + idn.Clear() + } + s.PostingsLocs = s.PostingsLocs[:0] + s.FreqNorms = s.FreqNorms[:0] + for i := range s.freqNormsBacking { + s.freqNormsBacking[i] = interimFreqNorm{} + } + s.freqNormsBacking = s.freqNormsBacking[:0] + s.Locs = s.Locs[:0] + for i := range s.locsBacking { + s.locsBacking[i] = interimLoc{} + } + s.locsBacking = s.locsBacking[:0] + s.numTermsPerPostingsList = s.numTermsPerPostingsList[:0] + s.numLocsPerPostingsList = s.numLocsPerPostingsList[:0] + s.builderBuf.Reset() + if s.builder != nil { + err = s.builder.Reset(&s.builderBuf) + } + s.metaBuf.Reset() + s.tmp0 = s.tmp0[:0] + s.tmp1 = s.tmp1[:0] + + return err +} + +func (s *interim) grabBuf(size int) []byte { + buf := s.tmp0 + if cap(buf) < size { + buf = make([]byte, size) + s.tmp0 = buf + } + return buf[0:size] +} + +type interimStoredField struct { + vals [][]byte + typs []byte + arrayposs [][]uint64 // array positions +} + +type interimFreqNorm struct { + freq uint64 + norm float32 +} + +type interimLoc struct { + fieldID uint16 + pos uint64 + start uint64 + end uint64 + arrayposs []uint64 +} + +func (s *interim) convert() (uint64, uint64, uint64, []uint64, error) { + s.FieldsMap = map[string]uint16{} + + s.getOrDefineField("_id") // _id field is fieldID 0 + + for _, result := range s.results { + for _, field := range result.Document.CompositeFields { + s.getOrDefineField(field.Name()) + } + for _, field := range result.Document.Fields { + s.getOrDefineField(field.Name()) + } + } + + sort.Strings(s.FieldsInv[1:]) // keep _id as first field + + for fieldID, fieldName := range s.FieldsInv { + s.FieldsMap[fieldName] = uint16(fieldID + 1) + } + + if cap(s.IncludeDocValues) >= len(s.FieldsInv) { + s.IncludeDocValues = s.IncludeDocValues[:len(s.FieldsInv)] + } else { + s.IncludeDocValues = make([]bool, len(s.FieldsInv)) + } + + s.prepareDicts() + + for _, dict := range s.DictKeys { + sort.Strings(dict) + } + + s.processDocuments() + + storedIndexOffset, err := s.writeStoredFields() + if err != nil { + return 0, 0, 0, nil, err + } + + var fdvIndexOffset uint64 + var dictOffsets []uint64 + + if len(s.results) > 0 { + fdvIndexOffset, dictOffsets, err = s.writeDicts() + if err != nil { + return 0, 0, 0, nil, err + } + } else { + dictOffsets = make([]uint64, len(s.FieldsInv)) + } + + fieldsIndexOffset, err := persistFields(s.FieldsInv, s.w, dictOffsets) + if err != nil { + return 0, 0, 0, nil, err + } + + return storedIndexOffset, fieldsIndexOffset, fdvIndexOffset, dictOffsets, nil +} + +func (s *interim) getOrDefineField(fieldName string) int { + fieldIDPlus1, exists := s.FieldsMap[fieldName] + if !exists { + fieldIDPlus1 = uint16(len(s.FieldsInv) + 1) + s.FieldsMap[fieldName] = fieldIDPlus1 + s.FieldsInv = append(s.FieldsInv, fieldName) + + s.Dicts = append(s.Dicts, make(map[string]uint64)) + + n := len(s.DictKeys) + if n < cap(s.DictKeys) { + s.DictKeys = s.DictKeys[:n+1] + s.DictKeys[n] = s.DictKeys[n][:0] + } else { + s.DictKeys = append(s.DictKeys, []string(nil)) + } + } + + return int(fieldIDPlus1 - 1) +} + +// fill Dicts and DictKeys from analysis results +func (s *interim) prepareDicts() { + var pidNext int + + var totTFs int + var totLocs int + + visitField := func(fieldID uint16, tfs analysis.TokenFrequencies) { + dict := s.Dicts[fieldID] + dictKeys := s.DictKeys[fieldID] + + for term, tf := range tfs { + pidPlus1, exists := dict[term] + if !exists { + pidNext++ + pidPlus1 = 
uint64(pidNext)
+
+				dict[term] = pidPlus1
+				dictKeys = append(dictKeys, term)
+
+				s.numTermsPerPostingsList = append(s.numTermsPerPostingsList, 0)
+				s.numLocsPerPostingsList = append(s.numLocsPerPostingsList, 0)
+			}
+
+			pid := pidPlus1 - 1
+
+			s.numTermsPerPostingsList[pid] += 1
+			s.numLocsPerPostingsList[pid] += len(tf.Locations)
+
+			totLocs += len(tf.Locations)
+		}
+
+		totTFs += len(tfs)
+
+		s.DictKeys[fieldID] = dictKeys
+	}
+
+	for _, result := range s.results {
+		// walk each composite field
+		for _, field := range result.Document.CompositeFields {
+			fieldID := uint16(s.getOrDefineField(field.Name()))
+			_, tf := field.Analyze()
+			visitField(fieldID, tf)
+		}
+
+		// walk each field
+		for i, field := range result.Document.Fields {
+			fieldID := uint16(s.getOrDefineField(field.Name()))
+			tf := result.Analyzed[i]
+			visitField(fieldID, tf)
+		}
+	}
+
+	numPostingsLists := pidNext
+
+	if cap(s.Postings) >= numPostingsLists {
+		s.Postings = s.Postings[:numPostingsLists]
+	} else {
+		postings := make([]*roaring.Bitmap, numPostingsLists)
+		copy(postings, s.Postings[:cap(s.Postings)])
+		for i := 0; i < numPostingsLists; i++ {
+			if postings[i] == nil {
+				postings[i] = roaring.New()
+			}
+		}
+		s.Postings = postings
+	}
+
+	if cap(s.PostingsLocs) >= numPostingsLists {
+		s.PostingsLocs = s.PostingsLocs[:numPostingsLists]
+	} else {
+		postingsLocs := make([]*roaring.Bitmap, numPostingsLists)
+		copy(postingsLocs, s.PostingsLocs[:cap(s.PostingsLocs)])
+		for i := 0; i < numPostingsLists; i++ {
+			if postingsLocs[i] == nil {
+				postingsLocs[i] = roaring.New()
+			}
+		}
+		s.PostingsLocs = postingsLocs
+	}
+
+	if cap(s.FreqNorms) >= numPostingsLists {
+		s.FreqNorms = s.FreqNorms[:numPostingsLists]
+	} else {
+		s.FreqNorms = make([][]interimFreqNorm, numPostingsLists)
+	}
+
+	if cap(s.freqNormsBacking) >= totTFs {
+		s.freqNormsBacking = s.freqNormsBacking[:totTFs]
+	} else {
+		s.freqNormsBacking = make([]interimFreqNorm, totTFs)
+	}
+
+	freqNormsBacking := s.freqNormsBacking
+	for pid, numTerms := range s.numTermsPerPostingsList {
+		s.FreqNorms[pid] = freqNormsBacking[0:0]
+		freqNormsBacking = freqNormsBacking[numTerms:]
+	}
+
+	if cap(s.Locs) >= numPostingsLists {
+		s.Locs = s.Locs[:numPostingsLists]
+	} else {
+		s.Locs = make([][]interimLoc, numPostingsLists)
+	}
+
+	if cap(s.locsBacking) >= totLocs {
+		s.locsBacking = s.locsBacking[:totLocs]
+	} else {
+		s.locsBacking = make([]interimLoc, totLocs)
+	}
+
+	locsBacking := s.locsBacking
+	for pid, numLocs := range s.numLocsPerPostingsList {
+		s.Locs[pid] = locsBacking[0:0]
+		locsBacking = locsBacking[numLocs:]
+	}
+}
+
+func (s *interim) processDocuments() {
+	numFields := len(s.FieldsInv)
+	reuseFieldLens := make([]int, numFields)
+	reuseFieldTFs := make([]analysis.TokenFrequencies, numFields)
+
+	for docNum, result := range s.results {
+		for i := 0; i < numFields; i++ { // clear these for reuse
+			reuseFieldLens[i] = 0
+			reuseFieldTFs[i] = nil
+		}
+
+		s.processDocument(uint64(docNum), result,
+			reuseFieldLens, reuseFieldTFs)
+	}
+}
+
+func (s *interim) processDocument(docNum uint64,
+	result *index.AnalysisResult,
+	fieldLens []int, fieldTFs []analysis.TokenFrequencies) {
+	visitField := func(fieldID uint16, fieldName string,
+		ln int, tf analysis.TokenFrequencies) {
+		fieldLens[fieldID] += ln
+
+		existingFreqs := fieldTFs[fieldID]
+		if existingFreqs != nil {
+			existingFreqs.MergeAll(fieldName, tf)
+		} else {
+			fieldTFs[fieldID] = tf
+		}
+	}
+
+	// walk each composite field
+	for _, field := range result.Document.CompositeFields {
+		fieldID := uint16(s.getOrDefineField(field.Name()))
+		ln, tf := field.Analyze()
+		visitField(fieldID, field.Name(), ln, tf)
+	}
+
+	// walk each field
+	for i, field := range result.Document.Fields {
+		fieldID := uint16(s.getOrDefineField(field.Name()))
+		ln := result.Length[i]
+		tf := result.Analyzed[i]
+		visitField(fieldID, field.Name(), ln, tf)
+	}
+
+	// now that it's been rolled up into fieldTFs, walk that
+	for fieldID, tfs := range fieldTFs {
+		dict := s.Dicts[fieldID]
+		norm := float32(1.0 / math.Sqrt(float64(fieldLens[fieldID])))
+
+		for term, tf := range tfs {
+			pid := dict[term] - 1
+			bs := s.Postings[pid]
+			bs.Add(uint32(docNum))
+
+			s.FreqNorms[pid] = append(s.FreqNorms[pid],
+				interimFreqNorm{
+					freq: uint64(tf.Frequency()),
+					norm: norm,
+				})
+
+			if len(tf.Locations) > 0 {
+				locBS := s.PostingsLocs[pid]
+				locBS.Add(uint32(docNum))
+
+				locs := s.Locs[pid]
+
+				for _, loc := range tf.Locations {
+					var locf = uint16(fieldID)
+					if loc.Field != "" {
+						locf = uint16(s.getOrDefineField(loc.Field))
+					}
+					var arrayposs []uint64
+					if len(loc.ArrayPositions) > 0 {
+						arrayposs = loc.ArrayPositions
+					}
+					locs = append(locs, interimLoc{
+						fieldID:   locf,
+						pos:       uint64(loc.Position),
+						start:     uint64(loc.Start),
+						end:       uint64(loc.End),
+						arrayposs: arrayposs,
+					})
+				}
+
+				s.Locs[pid] = locs
+			}
+		}
+	}
+}
+
+func (s *interim) writeStoredFields() (
+	storedIndexOffset uint64, err error) {
+	metaEncoder := govarint.NewU64Base128Encoder(&s.metaBuf)
+
+	data, compressed := s.tmp0[:0], s.tmp1[:0]
+	defer func() { s.tmp0, s.tmp1 = data, compressed }()
+
+	// keyed by docNum
+	docStoredOffsets := make([]uint64, len(s.results))
+
+	// keyed by fieldID, for the current doc in the loop
+	docStoredFields := map[uint16]interimStoredField{}
+
+	for docNum, result := range s.results {
+		for fieldID := range docStoredFields { // reset for next doc
+			delete(docStoredFields, fieldID)
+		}
+
+		for _, field := range result.Document.Fields {
+			fieldID := uint16(s.getOrDefineField(field.Name()))
+
+			opts := field.Options()
+
+			if opts.IsStored() {
+				isf := docStoredFields[fieldID]
+				isf.vals = append(isf.vals, field.Value())
+				isf.typs = append(isf.typs, encodeFieldType(field))
+				isf.arrayposs = append(isf.arrayposs, field.ArrayPositions())
+				docStoredFields[fieldID] = isf
+			}
+
+			if opts.IncludeDocValues() {
+				s.IncludeDocValues[fieldID] = true
+			}
+		}
+
+		var curr int
+
+		s.metaBuf.Reset()
+		data = data[:0]
+		compressed = compressed[:0]
+
+		for fieldID := range s.FieldsInv {
+			isf, exists := docStoredFields[uint16(fieldID)]
+			if exists {
+				curr, data, err = persistStoredFieldValues(
+					fieldID, isf.vals, isf.typs, isf.arrayposs,
+					curr, metaEncoder, data)
+				if err != nil {
+					return 0, err
+				}
+			}
+		}
+
+		metaEncoder.Close()
+		metaBytes := s.metaBuf.Bytes()
+
+		compressed = snappy.Encode(compressed, data)
+
+		docStoredOffsets[docNum] = uint64(s.w.Count())
+
+		_, err := writeUvarints(s.w,
+			uint64(len(metaBytes)),
+			uint64(len(compressed)))
+		if err != nil {
+			return 0, err
+		}
+
+		_, err = s.w.Write(metaBytes)
+		if err != nil {
+			return 0, err
+		}
+
+		_, err = s.w.Write(compressed)
+		if err != nil {
+			return 0, err
+		}
+	}
+
+	storedIndexOffset = uint64(s.w.Count())
+
+	for _, docStoredOffset := range docStoredOffsets {
+		err = binary.Write(s.w, binary.BigEndian, docStoredOffset)
+		if err != nil {
+			return 0, err
+		}
+	}
+
+	return storedIndexOffset, nil
+}
+
+func (s *interim) writeDicts() (fdvIndexOffset uint64, dictOffsets []uint64, err error) {
+	dictOffsets = make([]uint64, len(s.FieldsInv))
+
+	fdvOffsets := make([]uint64, len(s.FieldsInv))
+
+	buf := s.grabBuf(binary.MaxVarintLen64)
+
+	tfEncoder := newChunkedIntCoder(uint64(s.chunkFactor), uint64(len(s.results)-1))
+	locEncoder := newChunkedIntCoder(uint64(s.chunkFactor), uint64(len(s.results)-1))
+	fdvEncoder := newChunkedContentCoder(uint64(s.chunkFactor), uint64(len(s.results)-1))
+
+	var docTermMap [][]byte
+
+	if s.builder == nil {
+		s.builder, err = vellum.New(&s.builderBuf, nil)
+		if err != nil {
+			return 0, nil, err
+		}
+	}
+
+	for fieldID, terms := range s.DictKeys {
+		if cap(docTermMap) < len(s.results) {
+			docTermMap = make([][]byte, len(s.results))
+		} else {
+			docTermMap = docTermMap[0:len(s.results)]
+			for docNum := range docTermMap { // reset the docTermMap
+				docTermMap[docNum] = docTermMap[docNum][:0]
+			}
+		}
+
+		dict := s.Dicts[fieldID]
+
+		for _, term := range terms { // terms are already sorted
+			pid := dict[term] - 1
+
+			postingsBS := s.Postings[pid]
+			postingsLocsBS := s.PostingsLocs[pid]
+
+			freqNorms := s.FreqNorms[pid]
+			freqNormOffset := 0
+
+			locs := s.Locs[pid]
+			locOffset := 0
+
+			postingsItr := postingsBS.Iterator()
+			for postingsItr.HasNext() {
+				docNum := uint64(postingsItr.Next())
+
+				freqNorm := freqNorms[freqNormOffset]
+
+				err = tfEncoder.Add(docNum, freqNorm.freq,
+					uint64(math.Float32bits(freqNorm.norm)))
+				if err != nil {
+					return 0, nil, err
+				}
+
+				for i := uint64(0); i < freqNorm.freq; i++ {
+					if len(locs) > 0 {
+						loc := locs[locOffset]
+
+						err = locEncoder.Add(docNum, uint64(loc.fieldID),
+							loc.pos, loc.start, loc.end,
+							uint64(len(loc.arrayposs)))
+						if err != nil {
+							return 0, nil, err
+						}
+
+						err = locEncoder.Add(docNum, loc.arrayposs...)
+						if err != nil {
+							return 0, nil, err
+						}
+					}
+
+					locOffset++
+				}
+
+				freqNormOffset++
+
+				docTermMap[docNum] = append(
+					append(docTermMap[docNum], term...),
+					termSeparator)
+			}
+
+			tfEncoder.Close()
+			locEncoder.Close()
+
+			postingsOffset, err := writePostings(
+				postingsBS, postingsLocsBS, tfEncoder, locEncoder,
+				nil, s.w, buf)
+			if err != nil {
+				return 0, nil, err
+			}
+
+			if postingsOffset > uint64(0) {
+				err = s.builder.Insert([]byte(term), postingsOffset)
+				if err != nil {
+					return 0, nil, err
+				}
+			}
+
+			tfEncoder.Reset()
+			locEncoder.Reset()
+		}
+
+		err = s.builder.Close()
+		if err != nil {
+			return 0, nil, err
+		}
+
+		// record where this dictionary starts
+		dictOffsets[fieldID] = uint64(s.w.Count())
+
+		vellumData := s.builderBuf.Bytes()
+
+		// write out the length of the vellum data
+		n := binary.PutUvarint(buf, uint64(len(vellumData)))
+		_, err = s.w.Write(buf[:n])
+		if err != nil {
+			return 0, nil, err
+		}
+
+		// write this vellum to disk
+		_, err = s.w.Write(vellumData)
+		if err != nil {
+			return 0, nil, err
+		}
+
+		// reset vellum for reuse
+		s.builderBuf.Reset()
+
+		err = s.builder.Reset(&s.builderBuf)
+		if err != nil {
+			return 0, nil, err
+		}
+
+		// write the field doc values
+		if s.IncludeDocValues[fieldID] {
+			for docNum, docTerms := range docTermMap {
+				if len(docTerms) > 0 {
+					err = fdvEncoder.Add(uint64(docNum), docTerms)
+					if err != nil {
+						return 0, nil, err
+					}
+				}
+			}
+			err = fdvEncoder.Close()
+			if err != nil {
+				return 0, nil, err
+			}
+
+			fdvOffsets[fieldID] = uint64(s.w.Count())
+
+			_, err = fdvEncoder.Write(s.w)
+			if err != nil {
+				return 0, nil, err
+			}
+
+			fdvEncoder.Reset()
+		} else {
+			fdvOffsets[fieldID] = fieldNotUninverted
+		}
+	}
+
+	fdvIndexOffset = uint64(s.w.Count())
+
+	for _, fdvOffset := range fdvOffsets {
+		n := binary.PutUvarint(buf, fdvOffset)
+		_, err := s.w.Write(buf[:n])
+		if err != nil {
+			return 0, nil, err
+		}
+	}
+
+	return fdvIndexOffset, dictOffsets, nil
+}
+
+func encodeFieldType(f document.Field) byte {
+	fieldType := byte('x')
+	switch f.(type) {
+	case *document.TextField:
+		fieldType = 't'
+	case *document.NumericField:
+		fieldType = 'n'
+	case *document.DateTimeField:
+		fieldType = 'd'
+	case *document.BooleanField:
+		fieldType = 'b'
+	case *document.GeoPointField:
+		fieldType = 'g'
+	case *document.CompositeField:
+		fieldType = 'c'
+	}
+	return fieldType
+}
diff --git a/index/scorch/segment/zap/posting.go b/index/scorch/segment/zap/posting.go
index 7ae36120..6bc6e926 100644
--- a/index/scorch/segment/zap/posting.go
+++ b/index/scorch/segment/zap/posting.go
@@ -92,6 +92,8 @@ func under32Bits(x uint64) bool {
 	return x <= mask31Bits
 }
 
+const docNum1HitFinished = math.MaxUint64
+
 // PostingsList is an in-memory represenation of a postings list
 type PostingsList struct {
 	sb *SegmentBase
@@ -102,8 +104,9 @@ type PostingsList struct {
 	postings *roaring.Bitmap
 	except   *roaring.Bitmap
 
-	// when postingsOffset == freqOffset == 0, then the postings list
-	// represents a "1-hit" encoding, and has the following norm
+	// when normBits1Hit != 0, then this postings list came from a
+	// 1-hit encoding, and only the docNum1Hit & normBits1Hit apply
+	docNum1Hit   uint64
 	normBits1Hit uint64
 }
 
@@ -117,6 +120,17 @@ func (p *PostingsList) Size() int {
 	return sizeInBytes
 }
 
+func (p *PostingsList) OrInto(receiver *roaring.Bitmap) {
+	if p.normBits1Hit != 0 {
+		receiver.Add(uint32(p.docNum1Hit))
+		return
+	}
+
+	if p.postings != nil {
+		receiver.Or(p.postings)
+	}
+}
+
 // Iterator returns an iterator for this postings list
 func (p *PostingsList) Iterator() segment.PostingsIterator {
 	return p.iterator(nil)
@@ -152,39 +166,47 @@ func (p *PostingsList) iterator(rv *PostingsIterator) *PostingsIterator {
 	}
 	rv.postings = p
 
+	if p.normBits1Hit != 0 {
+		// "1-hit" encoding
+		rv.docNum1Hit = p.docNum1Hit
+		rv.normBits1Hit = p.normBits1Hit
+
+		if p.except != nil && p.except.Contains(uint32(rv.docNum1Hit)) {
+			rv.docNum1Hit = docNum1HitFinished
+		}
+
+		return rv
+	}
+
+	// "general" encoding, check if empty
 	if p.postings == nil {
 		return rv
 	}
 
-	if p.freqOffset > 0 && p.locOffset > 0 {
-		// "general" encoding, so prepare the freq chunk details
-		var n uint64
-		var read int
-		var numFreqChunks uint64
-		numFreqChunks, read = binary.Uvarint(p.sb.mem[p.freqOffset+n : p.freqOffset+n+binary.MaxVarintLen64])
+	// prepare the freq chunk details
+	var n uint64
+	var read int
+	var numFreqChunks uint64
+	numFreqChunks, read = binary.Uvarint(p.sb.mem[p.freqOffset+n : p.freqOffset+n+binary.MaxVarintLen64])
+	n += uint64(read)
+	rv.freqChunkOffsets = make([]uint64, int(numFreqChunks))
+	for i := 0; i < int(numFreqChunks); i++ {
+		rv.freqChunkOffsets[i], read = binary.Uvarint(p.sb.mem[p.freqOffset+n : p.freqOffset+n+binary.MaxVarintLen64])
 		n += uint64(read)
-		rv.freqChunkLens = make([]uint64, int(numFreqChunks))
-		for i := 0; i < int(numFreqChunks); i++ {
-			rv.freqChunkLens[i], read = binary.Uvarint(p.sb.mem[p.freqOffset+n : p.freqOffset+n+binary.MaxVarintLen64])
-			n += uint64(read)
-		}
-		rv.freqChunkStart = p.freqOffset + n
-
-		// prepare the loc chunk details
-		n = 0
-		var numLocChunks uint64
-		numLocChunks, read = binary.Uvarint(p.sb.mem[p.locOffset+n : p.locOffset+n+binary.MaxVarintLen64])
-		n += uint64(read)
-		rv.locChunkLens = make([]uint64, int(numLocChunks))
-		for i := 0; i < int(numLocChunks); i++ {
-			rv.locChunkLens[i], read = binary.Uvarint(p.sb.mem[p.locOffset+n : p.locOffset+n+binary.MaxVarintLen64])
-			n += uint64(read)
-		}
-		rv.locChunkStart = p.locOffset + n
-	} else {
-		// "1-hit" encoding
-		rv.normBits1Hit = p.normBits1Hit
 	}
+	rv.freqChunkStart = p.freqOffset + n
+
+	// prepare the loc chunk details
+	n = 0
+	var numLocChunks uint64
+	numLocChunks, read = binary.Uvarint(p.sb.mem[p.locOffset+n : p.locOffset+n+binary.MaxVarintLen64])
+	n += uint64(read)
+	rv.locChunkOffsets = make([]uint64, int(numLocChunks))
+	for i := 0; i < int(numLocChunks); i++ {
+		rv.locChunkOffsets[i], read = binary.Uvarint(p.sb.mem[p.locOffset+n : p.locOffset+n+binary.MaxVarintLen64])
+		n += uint64(read)
+	}
+	rv.locChunkStart = p.locOffset + n
 
 	rv.locBitmap = p.locBitmap
@@ -201,18 +223,20 @@ func (p *PostingsList) iterator(rv *PostingsIterator) *PostingsIterator {
 
 // Count returns the number of items on this postings list
 func (p *PostingsList) Count() uint64 {
-	if p.postings != nil {
-		n := p.postings.GetCardinality()
-		if p.except != nil {
-			e := p.except.GetCardinality()
-			if e > n {
-				e = n
-			}
-			return n - e
-		}
-		return n
+	var n uint64
+	if p.normBits1Hit != 0 {
+		n = 1
+	} else if p.postings != nil {
+		n = p.postings.GetCardinality()
 	}
-	return 0
+	var e uint64
+	if p.except != nil {
+		e = p.except.GetCardinality()
+	}
+	if n <= e {
+		return 0
+	}
+	return n - e
 }
 
 func (rv *PostingsList) read(postingsOffset uint64, d *Dictionary) error {
@@ -242,7 +266,9 @@ func (rv *PostingsList) read(postingsOffset uint64, d *Dictionary) error {
 
 	locRoaringBytes := d.sb.mem[locBitmapOffset+uint64(read) : locBitmapOffset+uint64(read)+locBitmapLen]
 
-	rv.locBitmap = roaring.NewBitmap()
+	if rv.locBitmap == nil {
+		rv.locBitmap = roaring.NewBitmap()
+	}
 	_, err := rv.locBitmap.FromBuffer(locRoaringBytes)
 	if err != nil {
 		return fmt.Errorf("error loading roaring bitmap of locations with hits: %v", err)
@@ -254,7 +280,9 @@ func (rv *PostingsList) read(postingsOffset uint64, d *Dictionary) error {
 
 	roaringBytes := d.sb.mem[postingsOffset+n : postingsOffset+n+postingsLen]
 
-	rv.postings = roaring.NewBitmap()
+	if rv.postings == nil {
+		rv.postings = roaring.NewBitmap()
+	}
 	_, err = rv.postings.FromBuffer(roaringBytes)
 	if err != nil {
 		return fmt.Errorf("error loading roaring bitmap: %v", err)
@@ -263,19 +291,10 @@ func (rv *PostingsList) read(postingsOffset uint64, d *Dictionary) error {
 	return nil
 }
 
-var emptyRoaring = roaring.NewBitmap()
-
 func (rv *PostingsList) init1Hit(fstVal uint64) error {
 	docNum, normBits := FSTValDecode1Hit(fstVal)
 
-	rv.locBitmap = emptyRoaring
-
-	rv.postings = roaring.NewBitmap()
-	rv.postings.Add(uint32(docNum))
-
-	// TODO: we can likely do better than allocating a roaring bitmap
-	// with just 1 entry, but for now reuse existing machinery
-
+	rv.docNum1Hit = docNum
 	rv.normBits1Hit = normBits
 
 	return nil
@@ -297,17 +316,18 @@ type PostingsIterator struct {
 	locDecoder *govarint.Base128Decoder
 	locReader  *bytes.Reader
 
-	freqChunkLens  []uint64
-	freqChunkStart uint64
+	freqChunkOffsets []uint64
+	freqChunkStart   uint64
 
-	locChunkLens  []uint64
-	locChunkStart uint64
+	locChunkOffsets []uint64
+	locChunkStart   uint64
 
 	locBitmap *roaring.Bitmap
 
 	next     Posting    // reused across Next() calls
 	nextLocs []Location // reused across Next() calls
 
+	docNum1Hit   uint64
 	normBits1Hit uint64
 
 	buf []byte
@@ -317,8 +337,8 @@ func (i *PostingsIterator) Size() int {
 	sizeInBytes := reflectStaticSizePostingsIterator + size.SizeOfPtr +
 		len(i.currChunkFreqNorm) +
 		len(i.currChunkLoc) +
-		len(i.freqChunkLens)*size.SizeOfUint64 +
-		len(i.locChunkLens)*size.SizeOfUint64 +
+		len(i.freqChunkOffsets)*size.SizeOfUint64 +
+		len(i.locChunkOffsets)*size.SizeOfUint64 +
 		i.next.Size()
 
 	if i.locBitmap != nil {
@@ -333,16 +353,14 @@ func (i *PostingsIterator) Size() int {
 }
 
 func (i *PostingsIterator) loadChunk(chunk int) error {
-	if chunk >= len(i.freqChunkLens) || chunk >= len(i.locChunkLens) {
-		return fmt.Errorf("tried to load chunk that doesn't exist %d/(%d %d)", chunk, len(i.freqChunkLens), len(i.locChunkLens))
+	if chunk >= len(i.freqChunkOffsets) || chunk >= len(i.locChunkOffsets) {
+		return fmt.Errorf("tried to load chunk that doesn't exist %d/(%d %d)", chunk, len(i.freqChunkOffsets), len(i.locChunkOffsets))
 	}
 
-	// load freq chunk bytes
-	start := i.freqChunkStart
-	for j := 0; j < chunk; j++ {
-		start += i.freqChunkLens[j]
-	}
-	end := start + i.freqChunkLens[chunk]
+	end, start := i.freqChunkStart, i.freqChunkStart
+	s, e := readChunkBoundary(chunk, i.freqChunkOffsets)
+	start += s
+	end += e
 	i.currChunkFreqNorm = i.postings.sb.mem[start:end]
 	if i.freqNormReader == nil {
 		i.freqNormReader = bytes.NewReader(i.currChunkFreqNorm)
@@ -351,12 +369,10 @@ func (i *PostingsIterator) loadChunk(chunk int) error {
 		i.freqNormReader.Reset(i.currChunkFreqNorm)
 	}
 
-	// load loc chunk bytes
-	start = i.locChunkStart
-	for j := 0; j < chunk; j++ {
-		start += i.locChunkLens[j]
-	}
-	end = start + i.locChunkLens[chunk]
+	end, start = i.locChunkStart, i.locChunkStart
+	s, e = readChunkBoundary(chunk, i.locChunkOffsets)
+	start += s
+	end += e
 	i.currChunkLoc = i.postings.sb.mem[start:end]
 	if i.locReader == nil {
 		i.locReader = bytes.NewReader(i.currChunkLoc)
@@ -424,6 +440,8 @@ func (i *PostingsIterator) readLocation(l *Location) error {
 		l.end = end
 		if numArrayPos > 0 {
 			l.ap = make([]uint64, int(numArrayPos))
+		} else {
+			l.ap = l.ap[:0]
 		}
 	}
 
@@ -460,7 +478,7 @@ func (i *PostingsIterator) Next() (segment.Posting, error) {
 	}
 	rv.norm = math.Float32frombits(uint32(normBits))
 
-	if i.locBitmap.Contains(uint32(docNum)) {
+	if i.locBitmap != nil && i.locBitmap.Contains(uint32(docNum)) {
 		// read off 'freq' locations, into reused slices
 		if cap(i.nextLocs) >= int(rv.freq) {
 			i.nextLocs = i.nextLocs[0:rv.freq]
@@ -513,7 +531,7 @@ func (i *PostingsIterator) nextBytes() (
 	endFreqNorm := len(i.currChunkFreqNorm) - i.freqNormReader.Len()
 	bytesFreqNorm = i.currChunkFreqNorm[startFreqNorm:endFreqNorm]
 
-	if i.locBitmap.Contains(uint32(docNum)) {
+	if i.locBitmap != nil && i.locBitmap.Contains(uint32(docNum)) {
 		startLoc := len(i.currChunkLoc) - i.locReader.Len()
 
 		for j := uint64(0); j < freq; j++ {
@@ -533,6 +551,15 @@
 // nextDocNum returns the next docNum on the postings list, and also
 // sets up the currChunk / loc related fields of the iterator.
 func (i *PostingsIterator) nextDocNum() (uint64, bool, error) {
+	if i.normBits1Hit != 0 {
+		if i.docNum1Hit == docNum1HitFinished {
+			return 0, false, nil
+		}
+		docNum := i.docNum1Hit
+		i.docNum1Hit = docNum1HitFinished // consume our 1-hit docNum
+		return docNum, true, nil
+	}
+
 	if i.actual == nil || !i.actual.HasNext() {
 		return 0, false, nil
 	}
@@ -540,10 +567,6 @@ func (i *PostingsIterator) nextDocNum() (uint64, bool, error) {
 	n := i.actual.Next()
 	allN := i.all.Next()
 
-	if i.normBits1Hit != 0 {
-		return uint64(n), true, nil
-	}
-
 	nChunk := n / i.postings.sb.chunkFactor
 	allNChunk := allN / i.postings.sb.chunkFactor
diff --git a/index/scorch/segment/zap/segment.go b/index/scorch/segment/zap/segment.go
index 972b7578..e1d2a14f 100644
--- a/index/scorch/segment/zap/segment.go
+++ b/index/scorch/segment/zap/segment.go
@@ -341,15 +341,13 @@ func (s *SegmentBase) DocNumbers(ids []string) (*roaring.Bitmap, error) {
 			return nil, err
 		}
 
-		var postings *PostingsList
+		var postingsList *PostingsList
 		for _, id := range ids {
-			postings, err = idDict.postingsList([]byte(id), nil, postings)
+			postingsList, err = idDict.postingsList([]byte(id), nil, postingsList)
 			if err != nil {
 				return nil, err
 			}
-			if postings.postings != nil {
-				rv.Or(postings.postings)
-			}
+			postingsList.OrInto(rv)
 		}
 	}
 
diff --git a/index/scorch/segment/zap/segment_test.go b/index/scorch/segment/zap/segment_test.go
index 9ce354ce..50d5dbd7 100644
--- a/index/scorch/segment/zap/segment_test.go
+++ b/index/scorch/segment/zap/segment_test.go
@@ -28,8 +28,8 @@ import (
 func TestOpen(t *testing.T) {
 	_ = os.RemoveAll("/tmp/scorch.zap")
 
-	memSegment := buildMemSegment()
-	err := PersistSegment(memSegment, "/tmp/scorch.zap", 1024)
+	testSeg, _ := buildTestSegment()
+	err := PersistSegmentBase(testSeg, "/tmp/scorch.zap")
 	if err != nil {
 		t.Fatalf("error persisting segment: %v", err)
 	}
@@ -328,8 +328,8 @@ func TestOpen(t *testing.T) {
 func TestOpenMulti(t *testing.T) {
 	_ = os.RemoveAll("/tmp/scorch.zap")
 
-	memSegment := buildMemSegmentMulti()
-	err := PersistSegment(memSegment, "/tmp/scorch.zap", 1024)
+	testSeg, _ := buildTestSegmentMulti()
+	err := PersistSegmentBase(testSeg, "/tmp/scorch.zap")
 	if err != nil {
 		t.Fatalf("error persisting segment: %v", err)
 	}
@@ -428,8 +428,8 @@ func TestOpenMulti(t *testing.T) {
 func TestOpenMultiWithTwoChunks(t *testing.T) {
 	_ = os.RemoveAll("/tmp/scorch.zap")
 
-	memSegment := buildMemSegmentMulti()
-	err := PersistSegment(memSegment, "/tmp/scorch.zap", 1)
+	testSeg, _ := buildTestSegmentMultiWithChunkFactor(1)
+	err := PersistSegmentBase(testSeg, "/tmp/scorch.zap")
 	if err != nil {
 		t.Fatalf("error persisting segment: %v", err)
 	}
@@ -523,8 +523,8 @@ func TestOpenMultiWithTwoChunks(t *testing.T) {
 func TestSegmentVisitableDocValueFieldsList(t *testing.T) {
 	_ = os.RemoveAll("/tmp/scorch.zap")
 
-	memSegment := buildMemSegmentMulti()
-	err := PersistSegment(memSegment, "/tmp/scorch.zap", 1)
+	testSeg, _ := buildTestSegmentMultiWithChunkFactor(1)
+	err := PersistSegmentBase(testSeg, "/tmp/scorch.zap")
 	if err != nil {
 		t.Fatalf("error persisting segment: %v", err)
 	}
@@ -551,8 +551,8 @@ func TestSegmentVisitableDocValueFieldsList(t *testing.T) {
 	}
 
 	_ = os.RemoveAll("/tmp/scorch.zap")
-	memSegment, expectedFields := buildMemSegmentWithDefaultFieldMapping()
-	err = PersistSegment(memSegment, "/tmp/scorch.zap", 1)
+	testSeg, expectedFields, _ := buildTestSegmentWithDefaultFieldMapping(1)
+	err = PersistSegmentBase(testSeg, "/tmp/scorch.zap")
 	if err != nil {
 		t.Fatalf("error persisting segment: %v", err)
 	}
diff --git a/index_impl.go b/index_impl.go
index 68777f07..4d03b78a 100644
--- a/index_impl.go
+++ b/index_impl.go
@@ -534,7 +534,8 @@ func (i *indexImpl) SearchInContext(ctx context.Context, req *SearchRequest) (sr
 			doc, err := indexReader.Document(hit.ID)
 			if err == nil && doc != nil {
 				if len(req.Fields) > 0 {
-					for _, f := range req.Fields {
+					fieldsToLoad := deDuplicate(req.Fields)
+					for _, f := range fieldsToLoad {
 						for _, docF := range doc.Fields {
 							if f == "*" || docF.Name() == f {
 								var value interface{}
@@ -830,3 +831,16 @@ func (f *indexImplFieldDict) Close() error {
 	}
 	return f.indexReader.Close()
 }
+
+// helper function to remove duplicate entries from slice of strings
+func deDuplicate(fields []string) []string {
+	entries := make(map[string]struct{})
+	ret := []string{}
+	for _, entry := range fields {
+		if _, exists := entries[entry]; !exists {
+			entries[entry] = struct{}{}
+			ret = append(ret, entry)
+		}
+	}
+	return ret
+}
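
Note on the loadChunk rewrite in posting.go above: the per-chunk byte lengths (freqChunkLens/locChunkLens) become cumulative offsets (freqChunkOffsets/locChunkOffsets), so a chunk's byte range can be found without summing every earlier length. The readChunkBoundary helper itself is not part of this patch; the sketch below only illustrates the behaviour its call sites appear to assume, namely that the slice holds cumulative end offsets and the previous entry marks the current chunk's start.

// chunkBoundarySketch is an illustrative stand-in for readChunkBoundary,
// assuming offsets[k] is the cumulative end of chunk k relative to the
// freq/loc chunk data start. Not the patch's actual implementation.
func chunkBoundarySketch(chunk int, offsets []uint64) (start, end uint64) {
	if chunk > 0 {
		start = offsets[chunk-1] // previous chunk's cumulative end
	}
	return start, offsets[chunk]
}

Under that assumption, with offsets {10, 25, 40}, chunk 1 spans bytes [10, 25) relative to freqChunkStart/locChunkStart, matching how loadChunk adds the returned pair to the chunk start.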
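Note on the 1-hit postings changes above: a postings list for a term that matches exactly one document now carries only docNum1Hit and normBits1Hit, decoded from a single FST value by FSTValDecode1Hit, instead of allocating a one-entry roaring bitmap. The snippet below is only a rough illustration of packing a doc number and norm bits into one uint64 using 31 bits each (compare mask31Bits in posting.go); the exact bit layout zap uses is not shown in this patch and is assumed here.

const mask31 = uint64(1)<<31 - 1

// pack1Hit / unpack1Hit are illustrative only; the real decode helper seen
// in init1Hit above is FSTValDecode1Hit, whose layout may differ.
func pack1Hit(docNum, normBits uint64) uint64 {
	return (normBits&mask31)<<31 | (docNum & mask31)
}

func unpack1Hit(v uint64) (docNum, normBits uint64) {
	return v & mask31, (v >> 31) & mask31
}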