diff --git a/index/scorch/merge.go b/index/scorch/merge.go
index 41abe065..fb4e80d2 100644
--- a/index/scorch/merge.go
+++ b/index/scorch/merge.go
@@ -146,6 +146,7 @@ func (s *Scorch) planMergeAtSnapshot(ourSnapshot *IndexSnapshot) error {
 		// give it to the introducer
 		select {
 		case <-s.closeCh:
+			_ = segment.Close()
 			return nil
 		case s.merges <- sm:
 		}
diff --git a/index/scorch/persister.go b/index/scorch/persister.go
index b19c1205..4ba0ca8e 100644
--- a/index/scorch/persister.go
+++ b/index/scorch/persister.go
@@ -197,17 +197,9 @@ func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot) (err error) {
 	if err != nil {
 		return err
 	}
-	// defer fsync of the rootbolt
+	// defer rollback on error
 	defer func() {
-		if err == nil {
-			err = s.rootBolt.Sync()
-		}
-	}()
-	// defer commit/rollback transaction
-	defer func() {
-		if err == nil {
-			err = tx.Commit()
-		} else {
+		if err != nil {
 			_ = tx.Rollback()
 		}
 	}()
@@ -241,18 +233,18 @@ func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot) (err error) {
 	// first ensure that each segment in this snapshot has been persisted
 	for _, segmentSnapshot := range snapshot.segment {
 		snapshotSegmentKey := segment.EncodeUvarintAscending(nil, segmentSnapshot.id)
-		snapshotSegmentBucket, err2 := snapshotBucket.CreateBucketIfNotExists(snapshotSegmentKey)
-		if err2 != nil {
-			return err2
+		snapshotSegmentBucket, err := snapshotBucket.CreateBucketIfNotExists(snapshotSegmentKey)
+		if err != nil {
+			return err
 		}
 		switch seg := segmentSnapshot.segment.(type) {
 		case *zap.SegmentBase:
 			// need to persist this to disk
 			filename := zapFileName(segmentSnapshot.id)
 			path := s.path + string(os.PathSeparator) + filename
-			err2 := zap.PersistSegmentBase(seg, path)
-			if err2 != nil {
-				return fmt.Errorf("error persisting segment: %v", err2)
+			err = zap.PersistSegmentBase(seg, path)
+			if err != nil {
+				return fmt.Errorf("error persisting segment: %v", err)
 			}
 			newSegmentPaths[segmentSnapshot.id] = path
 			err = snapshotSegmentBucket.Put(boltPathKey, []byte(filename))
@@ -295,14 +287,18 @@ func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot) (err error) {
 	if len(newSegmentPaths) > 0 {
 		// now try to open all the new snapshots
 		newSegments := make(map[uint64]segment.Segment)
+		defer func() {
+			for _, s := range newSegments {
+				if s != nil {
+					// cleanup segments that were opened but not
+					// swapped into the new root
+					_ = s.Close()
+				}
+			}
+		}()
 		for segmentID, path := range newSegmentPaths {
 			newSegments[segmentID], err = zap.Open(path)
 			if err != nil {
-				for _, s := range newSegments {
-					if s != nil {
-						_ = s.Close() // cleanup segments that were successfully opened
-					}
-				}
 				return fmt.Errorf("error opening new segment at %s, %v", path, err)
 			}
 		}
@@ -327,6 +323,7 @@ func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot) (err error) {
 				cachedDocs: segmentSnapshot.cachedDocs,
 			}
 			newIndexSnapshot.segment[i] = newSegmentSnapshot
+			delete(newSegments, segmentSnapshot.id)
 			// update items persisted incase of a new segment snapshot
 			atomic.AddUint64(&s.stats.numItemsPersisted, newSegmentSnapshot.Count())
 		} else {
@@ -346,7 +343,19 @@ func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot) (err error) {
 			_ = rootPrev.DecRef()
 		}
 	}
-	// allow files to become eligible for removal
+
+	err = tx.Commit()
+	if err != nil {
+		return err
+	}
+
+	err = s.rootBolt.Sync()
+	if err != nil {
+		return err
+	}
+
+	// allow files to become eligible for removal after commit, such
+	// as file segments from snapshots that came from the merger
 	s.rootLock.Lock()
 	for _, filename := range filenames {
 		delete(s.ineligibleForRemoval, filename)
diff --git a/index/scorch/segment/zap/dict.go b/index/scorch/segment/zap/dict.go
index 137c35d7..e5d71268 100644
--- a/index/scorch/segment/zap/dict.go
+++ b/index/scorch/segment/zap/dict.go
@@ -38,29 +38,41 @@ func (d *Dictionary) PostingsList(term string, except *roaring.Bitmap) (segment.
 }
 
 func (d *Dictionary) postingsList(term []byte, except *roaring.Bitmap, rv *PostingsList) (*PostingsList, error) {
+	if d.fst == nil {
+		return d.postingsListInit(rv, except), nil
+	}
+
+	postingsOffset, exists, err := d.fst.Get(term)
+	if err != nil {
+		return nil, fmt.Errorf("vellum err: %v", err)
+	}
+	if !exists {
+		return d.postingsListInit(rv, except), nil
+	}
+
+	return d.postingsListFromOffset(postingsOffset, except, rv)
+}
+
+func (d *Dictionary) postingsListFromOffset(postingsOffset uint64, except *roaring.Bitmap, rv *PostingsList) (*PostingsList, error) {
+	rv = d.postingsListInit(rv, except)
+
+	err := rv.read(postingsOffset, d)
+	if err != nil {
+		return nil, err
+	}
+
+	return rv, nil
+}
+
+func (d *Dictionary) postingsListInit(rv *PostingsList, except *roaring.Bitmap) *PostingsList {
 	if rv == nil {
 		rv = &PostingsList{}
 	} else {
 		*rv = PostingsList{} // clear the struct
 	}
 	rv.sb = d.sb
-	rv.term = term
 	rv.except = except
-
-	if d.fst != nil {
-		postingsOffset, exists, err := d.fst.Get(term)
-		if err != nil {
-			return nil, fmt.Errorf("vellum err: %v", err)
-		}
-		if exists {
-			err = rv.read(postingsOffset, d)
-			if err != nil {
-				return nil, err
-			}
-		}
-	}
-
-	return rv, nil
+	return rv
 }
 
 // Iterator returns an iterator for this dictionary
diff --git a/index/scorch/segment/zap/enumerator.go b/index/scorch/segment/zap/enumerator.go
new file mode 100644
index 00000000..3c708dd5
--- /dev/null
+++ b/index/scorch/segment/zap/enumerator.go
@@ -0,0 +1,124 @@
+// Copyright (c) 2018 Couchbase, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//		http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package zap
+
+import (
+	"bytes"
+
+	"github.com/couchbase/vellum"
+)
+
+// enumerator provides an ordered traversal of multiple vellum
+// iterators. Like JOIN of iterators, the enumerator produces a
+// sequence of (key, iteratorIndex, value) tuples, sorted by key ASC,
+// then iteratorIndex ASC, where the same key might be seen or
+// repeated across multiple child iterators.
+type enumerator struct {
+	itrs   []vellum.Iterator
+	currKs [][]byte
+	currVs []uint64
+
+	lowK    []byte
+	lowIdxs []int
+	lowCurr int
+}
+
+// newEnumerator returns a new enumerator over the vellum Iterators
+func newEnumerator(itrs []vellum.Iterator) (*enumerator, error) {
+	rv := &enumerator{
+		itrs:    itrs,
+		currKs:  make([][]byte, len(itrs)),
+		currVs:  make([]uint64, len(itrs)),
+		lowIdxs: make([]int, 0, len(itrs)),
+	}
+	for i, itr := range rv.itrs {
+		rv.currKs[i], rv.currVs[i] = itr.Current()
+	}
+	rv.updateMatches()
+	if rv.lowK == nil {
+		return rv, vellum.ErrIteratorDone
+	}
+	return rv, nil
+}
+
+// updateMatches maintains the low key matches based on the currKs
+func (m *enumerator) updateMatches() {
+	m.lowK = nil
+	m.lowIdxs = m.lowIdxs[:0]
+	m.lowCurr = 0
+
+	for i, key := range m.currKs {
+		if key == nil {
+			continue
+		}
+
+		cmp := bytes.Compare(key, m.lowK)
+		if cmp < 0 || m.lowK == nil {
+			// reached a new low
+			m.lowK = key
+			m.lowIdxs = m.lowIdxs[:0]
+			m.lowIdxs = append(m.lowIdxs, i)
+		} else if cmp == 0 {
+			m.lowIdxs = append(m.lowIdxs, i)
+		}
+	}
+}
+
+// Current returns the enumerator's current key, iterator-index, and
+// value. If the enumerator is not pointing at a valid value (because
+// Next returned an error previously), Current will return nil,0,0.
+func (m *enumerator) Current() ([]byte, int, uint64) {
+	var i int
+	var v uint64
+	if m.lowCurr < len(m.lowIdxs) {
+		i = m.lowIdxs[m.lowCurr]
+		v = m.currVs[i]
+	}
+	return m.lowK, i, v
+}
+
+// Next advances the enumerator to the next key/iterator/value result,
+// else vellum.ErrIteratorDone is returned.
+func (m *enumerator) Next() error {
+	m.lowCurr += 1
+	if m.lowCurr >= len(m.lowIdxs) {
+		// move all the current low iterators forwards
+		for _, vi := range m.lowIdxs {
+			err := m.itrs[vi].Next()
+			if err != nil && err != vellum.ErrIteratorDone {
+				return err
+			}
+			m.currKs[vi], m.currVs[vi] = m.itrs[vi].Current()
+		}
+		m.updateMatches()
+	}
+	if m.lowK == nil {
+		return vellum.ErrIteratorDone
+	}
+	return nil
+}
+
+// Close all the underlying Iterators. The first error, if any, will
+// be returned.
+func (m *enumerator) Close() error {
+	var rv error
+	for _, itr := range m.itrs {
+		err := itr.Close()
+		if rv == nil {
+			rv = err
+		}
+	}
+	return rv
+}
diff --git a/index/scorch/segment/zap/enumerator_test.go b/index/scorch/segment/zap/enumerator_test.go
new file mode 100644
index 00000000..b2778892
--- /dev/null
+++ b/index/scorch/segment/zap/enumerator_test.go
@@ -0,0 +1,233 @@
+// Copyright (c) 2018 Couchbase, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//		http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+// implied. See the License for the specific language governing
+// permissions and limitations under the License.
+
+package zap
+
+import (
+	"fmt"
+	"testing"
+
+	"github.com/couchbase/vellum"
+)
+
+type enumTestEntry struct {
+	key string
+	val uint64
+}
+
+type enumTestWant struct {
+	key string
+	idx int
+	val uint64
+}
+
+func TestEnumerator(t *testing.T) {
+	tests := []struct {
+		desc string
+		in   [][]enumTestEntry
+		want []enumTestWant
+	}{
+		{
+			desc: "two non-empty enumerators with no duplicate keys",
+			in: [][]enumTestEntry{
+				[]enumTestEntry{
+					{"a", 1},
+					{"c", 3},
+					{"e", 5},
+				},
+				[]enumTestEntry{
+					{"b", 2},
+					{"d", 4},
+					{"f", 6},
+				},
+			},
+			want: []enumTestWant{
+				{"a", 0, 1},
+				{"b", 1, 2},
+				{"c", 0, 3},
+				{"d", 1, 4},
+				{"e", 0, 5},
+				{"f", 1, 6},
+			},
+		},
+		{
+			desc: "two non-empty enumerators with duplicate keys",
+			in: [][]enumTestEntry{
+				[]enumTestEntry{
+					{"a", 1},
+					{"c", 3},
+					{"e", 5},
+				},
+				[]enumTestEntry{
+					{"a", 2},
+					{"c", 4},
+					{"e", 6},
+				},
+			},
+			want: []enumTestWant{
+				{"a", 0, 1},
+				{"a", 1, 2},
+				{"c", 0, 3},
+				{"c", 1, 4},
+				{"e", 0, 5},
+				{"e", 1, 6},
+			},
+		},
+		{
+			desc: "first iterator is empty",
+			in: [][]enumTestEntry{
+				[]enumTestEntry{},
+				[]enumTestEntry{
+					{"a", 2},
+					{"c", 4},
+					{"e", 6},
+				},
+			},
+			want: []enumTestWant{
+				{"a", 1, 2},
+				{"c", 1, 4},
+				{"e", 1, 6},
+			},
+		},
+		{
+			desc: "last iterator is empty",
+			in: [][]enumTestEntry{
+				[]enumTestEntry{
+					{"a", 1},
+					{"c", 3},
+					{"e", 5},
+				},
+				[]enumTestEntry{},
+			},
+			want: []enumTestWant{
+				{"a", 0, 1},
+				{"c", 0, 3},
+				{"e", 0, 5},
+			},
+		},
+		{
+			desc: "two different length enumerators with duplicate keys",
+			in: [][]enumTestEntry{
+				[]enumTestEntry{
+					{"a", 1},
+					{"c", 3},
+					{"e", 5},
+				},
+				[]enumTestEntry{
+					{"a", 2},
+					{"b", 4},
+					{"d", 1000},
+					{"e", 6},
+				},
+			},
+			want: []enumTestWant{
+				{"a", 0, 1},
+				{"a", 1, 2},
+				{"b", 1, 4},
+				{"c", 0, 3},
+				{"d", 1, 1000},
+				{"e", 0, 5},
+				{"e", 1, 6},
+			},
+		},
+	}
+
+	for _, test := range tests {
+		var itrs []vellum.Iterator
+		for _, entries := range test.in {
+			itrs = append(itrs, &testIterator{entries: entries})
+		}
+
+		enumerator, err := newEnumerator(itrs)
+		if err != nil {
+			t.Fatalf("%s - expected no err on newNumerator, got: %v", test.desc, err)
+		}
+
+		wanti := 0
+		for wanti < len(test.want) {
+			if err != nil {
+				t.Fatalf("%s - wanted no err, got: %v", test.desc, err)
+			}
+
+			currK, currIdx, currV := enumerator.Current()
+
+			want := test.want[wanti]
+			if want.key != string(currK) {
+				t.Fatalf("%s - wrong key, wanted: %#v, got: %q, %d, %d", test.desc,
+					want, currK, currIdx, currV)
+			}
+			if want.idx != currIdx {
+				t.Fatalf("%s - wrong idx, wanted: %#v, got: %q, %d, %d", test.desc,
+					want, currK, currIdx, currV)
+			}
+			if want.val != currV {
+				t.Fatalf("%s - wrong val, wanted: %#v, got: %q, %d, %d", test.desc,
+					want, currK, currIdx, currV)
+			}
+
+			wanti += 1
+
+			err = enumerator.Next()
+		}
+
+		if err != vellum.ErrIteratorDone {
+			t.Fatalf("%s - expected ErrIteratorDone, got: %v", test.desc, err)
+		}
+
+		err = enumerator.Close()
+		if err != nil {
+			t.Fatalf("%s - expected nil err on close, got: %v", test.desc, err)
+		}
+
+		for _, itr := range itrs {
+			if itr.(*testIterator).curr != 654321 {
+				t.Fatalf("%s - expected child iter to be closed", test.desc)
+			}
+		}
+	}
+}
+
+type testIterator struct {
+	entries []enumTestEntry
+	curr    int
+}
+
+func (m *testIterator) Current() ([]byte, uint64) {
+	if m.curr >= len(m.entries) {
+		return nil, 0
+	}
+	return []byte(m.entries[m.curr].key), m.entries[m.curr].val
+}
+
+func (m *testIterator) Next() error {
+	m.curr++
+	if m.curr >= len(m.entries) {
+		return vellum.ErrIteratorDone
+	}
+	return nil
+}
+
+func (m *testIterator) Seek(key []byte) error {
+	return fmt.Errorf("not implemented for enumerator unit tests")
+}
+
+func (m *testIterator) Reset(f *vellum.FST,
+	startKeyInclusive, endKeyExclusive []byte, aut vellum.Automaton) error {
+	return fmt.Errorf("not implemented for enumerator unit tests")
+}
+
+func (m *testIterator) Close() error {
+	m.curr = 654321
+	return nil
+}
diff --git a/index/scorch/segment/zap/merge.go b/index/scorch/segment/zap/merge.go
index b1eed28b..525b7f93 100644
--- a/index/scorch/segment/zap/merge.go
+++ b/index/scorch/segment/zap/merge.go
@@ -154,8 +154,8 @@ func computeNewDocCount(segments []*SegmentBase, drops []*roaring.Bitmap) uint64
 	return newDocCount
 }
 
-func persistMergedRest(segments []*SegmentBase, drops []*roaring.Bitmap,
-	fieldsInv []string, fieldsMap map[string]uint16, newDocNums [][]uint64,
+func persistMergedRest(segments []*SegmentBase, dropsIn []*roaring.Bitmap,
+	fieldsInv []string, fieldsMap map[string]uint16, newDocNumsIn [][]uint64,
 	newSegDocCount uint64, chunkFactor uint32,
 	w *CountHashWriter) ([]uint64, uint64, error) {
 
@@ -164,10 +164,10 @@ func persistMergedRest(segments []*SegmentBase, drops []*roaring.Bitmap,
 	var bufLoc []uint64
 
 	var postings *PostingsList
+	var postItr *PostingsIterator
 
 	rv := make([]uint64, len(fieldsInv))
 	fieldDvLocs := make([]uint64, len(fieldsInv))
-	fieldDvLocsOffset := uint64(fieldNotUninverted)
 
 	// docTermMap is keyed by docNum, where the array impl provides
 	// better memory usage behavior than a sparse-friendlier hashmap
@@ -187,36 +187,31 @@ func persistMergedRest(segments []*SegmentBase, drops []*roaring.Bitmap,
 			return nil, 0, err
 		}
 
-		// collect FST iterators from all segments for this field
+		// collect FST iterators from all active segments for this field
+		var newDocNums [][]uint64
+		var drops []*roaring.Bitmap
 		var dicts []*Dictionary
 		var itrs []vellum.Iterator
-		for _, segment := range segments {
+
+		for segmentI, segment := range segments {
 			dict, err2 := segment.dictionary(fieldName)
 			if err2 != nil {
 				return nil, 0, err2
 			}
-			dicts = append(dicts, dict)
-
 			if dict != nil && dict.fst != nil {
 				itr, err2 := dict.fst.Iterator(nil, nil)
 				if err2 != nil && err2 != vellum.ErrIteratorDone {
 					return nil, 0, err2
 				}
 				if itr != nil {
+					newDocNums = append(newDocNums, newDocNumsIn[segmentI])
+					drops = append(drops, dropsIn[segmentI])
+					dicts = append(dicts, dict)
 					itrs = append(itrs, itr)
 				}
 			}
 		}
 
-		// create merging iterator
-		mergeItr, err := vellum.NewMergeIterator(itrs, func(postingOffsets []uint64) uint64 {
-			// we don't actually use the merged value
-			return 0
-		})
-
-		tfEncoder := newChunkedIntCoder(uint64(chunkFactor), newSegDocCount-1)
-		locEncoder := newChunkedIntCoder(uint64(chunkFactor), newSegDocCount-1)
-
 		if uint64(cap(docTermMap)) < newSegDocCount {
 			docTermMap = make([][]byte, newSegDocCount)
 		} else {
@@ -226,71 +221,17 @@ func persistMergedRest(segments []*SegmentBase, drops []*roaring.Bitmap,
 			}
 		}
 
-		for err == nil {
-			term, _ := mergeItr.Current()
+		var prevTerm []byte
 
-			newRoaring := roaring.NewBitmap()
-			newRoaringLocs := roaring.NewBitmap()
+		newRoaring := roaring.NewBitmap()
+		newRoaringLocs := roaring.NewBitmap()
 
-			tfEncoder.Reset()
-			locEncoder.Reset()
+		tfEncoder := newChunkedIntCoder(uint64(chunkFactor), newSegDocCount-1)
+		locEncoder := newChunkedIntCoder(uint64(chunkFactor), newSegDocCount-1)
 
-			// now go back and get posting list for this term
-			// but pass in the deleted docs for that segment
-			for dictI, dict := range dicts {
-				if dict == nil {
-					continue
-				}
-				var err2 error
-				postings, err2 = dict.postingsList(term, drops[dictI], postings)
-				if err2 != nil {
-					return nil, 0, err2
-				}
-
-				postItr := postings.Iterator()
-				next, err2 := postItr.Next()
-				for next != nil && err2 == nil {
-					hitNewDocNum := newDocNums[dictI][next.Number()]
-					if hitNewDocNum == docDropped {
-						return nil, 0, fmt.Errorf("see hit with dropped doc num")
-					}
-					newRoaring.Add(uint32(hitNewDocNum))
-					// encode norm bits
-					norm := next.Norm()
-					normBits := math.Float32bits(float32(norm))
-					err = tfEncoder.Add(hitNewDocNum, next.Frequency(), uint64(normBits))
-					if err != nil {
-						return nil, 0, err
-					}
-					locs := next.Locations()
-					if len(locs) > 0 {
-						newRoaringLocs.Add(uint32(hitNewDocNum))
-						for _, loc := range locs {
-							if cap(bufLoc) < 5+len(loc.ArrayPositions()) {
-								bufLoc = make([]uint64, 0, 5+len(loc.ArrayPositions()))
-							}
-							args := bufLoc[0:5]
-							args[0] = uint64(fieldsMap[loc.Field()])
-							args[1] = loc.Pos()
-							args[2] = loc.Start()
-							args[3] = loc.End()
-							args[4] = uint64(len(loc.ArrayPositions()))
-							args = append(args, loc.ArrayPositions()...)
-							err = locEncoder.Add(hitNewDocNum, args...)
-							if err != nil {
-								return nil, 0, err
-							}
-						}
-					}
-
-					docTermMap[hitNewDocNum] =
-						append(append(docTermMap[hitNewDocNum], term...), termSeparator)
-
-					next, err2 = postItr.Next()
-				}
-				if err2 != nil {
-					return nil, 0, err2
-				}
+		finishTerm := func(term []byte) error {
+			if term == nil {
+				return nil
 			}
 
 			tfEncoder.Close()
@@ -299,59 +240,140 @@ func persistMergedRest(segments []*SegmentBase, drops []*roaring.Bitmap,
 			if newRoaring.GetCardinality() > 0 {
 				// this field/term actually has hits in the new segment, lets write it down
 				freqOffset := uint64(w.Count())
-				_, err = tfEncoder.Write(w)
+				_, err := tfEncoder.Write(w)
 				if err != nil {
-					return nil, 0, err
+					return err
 				}
 				locOffset := uint64(w.Count())
 				_, err = locEncoder.Write(w)
 				if err != nil {
-					return nil, 0, err
+					return err
 				}
 				postingLocOffset := uint64(w.Count())
 				_, err = writeRoaringWithLen(newRoaringLocs, w, &bufReuse, bufMaxVarintLen64)
 				if err != nil {
-					return nil, 0, err
+					return err
 				}
 				postingOffset := uint64(w.Count())
+
 				// write out the start of the term info
-				buf := bufMaxVarintLen64
-				n := binary.PutUvarint(buf, freqOffset)
-				_, err = w.Write(buf[:n])
+				n := binary.PutUvarint(bufMaxVarintLen64, freqOffset)
+				_, err = w.Write(bufMaxVarintLen64[:n])
 				if err != nil {
-					return nil, 0, err
+					return err
 				}
-
 				// write out the start of the loc info
-				n = binary.PutUvarint(buf, locOffset)
-				_, err = w.Write(buf[:n])
+				n = binary.PutUvarint(bufMaxVarintLen64, locOffset)
+				_, err = w.Write(bufMaxVarintLen64[:n])
 				if err != nil {
-					return nil, 0, err
+					return err
 				}
-
-				// write out the start of the loc posting list
-				n = binary.PutUvarint(buf, postingLocOffset)
-				_, err = w.Write(buf[:n])
+				// write out the start of the posting locs
+				n = binary.PutUvarint(bufMaxVarintLen64, postingLocOffset)
+				_, err = w.Write(bufMaxVarintLen64[:n])
 				if err != nil {
-					return nil, 0, err
+					return err
 				}
 				_, err = writeRoaringWithLen(newRoaring, w, &bufReuse, bufMaxVarintLen64)
 				if err != nil {
-					return nil, 0, err
+					return err
 				}
 				err = newVellum.Insert(term, postingOffset)
 				if err != nil {
-					return nil, 0, err
+					return err
 				}
 			}
 
-			err = mergeItr.Next()
+			newRoaring = roaring.NewBitmap()
+			newRoaringLocs = roaring.NewBitmap()
+
+			tfEncoder.Reset()
+			locEncoder.Reset()
+
+			return nil
+		}
+
+		enumerator, err := newEnumerator(itrs)
+
+		for err == nil {
+			term, itrI, postingsOffset := enumerator.Current()
+
+			if !bytes.Equal(prevTerm, term) {
+				// if the term changed, write out the info collected
+				// for the previous term
+				err2 := finishTerm(prevTerm)
+				if err2 != nil {
+					return nil, 0, err2
+				}
+			}
+
+			var err2 error
+			postings, err2 = dicts[itrI].postingsListFromOffset(
+				postingsOffset, drops[itrI], postings)
+			if err2 != nil {
+				return nil, 0, err2
+			}
+
+			postItr = postings.iterator(postItr)
+			next, err2 := postItr.Next()
+			for next != nil && err2 == nil {
+				hitNewDocNum := newDocNums[itrI][next.Number()]
+				if hitNewDocNum == docDropped {
+					return nil, 0, fmt.Errorf("see hit with dropped doc num")
+				}
+				newRoaring.Add(uint32(hitNewDocNum))
+				// encode norm bits
+				norm := next.Norm()
+				normBits := math.Float32bits(float32(norm))
+				err = tfEncoder.Add(hitNewDocNum, next.Frequency(), uint64(normBits))
+				if err != nil {
+					return nil, 0, err
+				}
+				locs := next.Locations()
+				if len(locs) > 0 {
+					newRoaringLocs.Add(uint32(hitNewDocNum))
+					for _, loc := range locs {
+						if cap(bufLoc) < 5+len(loc.ArrayPositions()) {
+							bufLoc = make([]uint64, 0, 5+len(loc.ArrayPositions()))
+						}
+						args := bufLoc[0:5]
+						args[0] = uint64(fieldsMap[loc.Field()])
+						args[1] = loc.Pos()
+						args[2] = loc.Start()
+						args[3] = loc.End()
+						args[4] = uint64(len(loc.ArrayPositions()))
+						args = append(args, loc.ArrayPositions()...)
+						err = locEncoder.Add(hitNewDocNum, args...)
+						if err != nil {
+							return nil, 0, err
+						}
+					}
+				}
+
+				docTermMap[hitNewDocNum] =
+					append(append(docTermMap[hitNewDocNum], term...), termSeparator)
+
+				next, err2 = postItr.Next()
+			}
+			if err2 != nil {
+				return nil, 0, err2
+			}
+
+			prevTerm = prevTerm[:0] // copy to prevTerm in case Next() reuses term mem
+			prevTerm = append(prevTerm, term...)
+
+			err = enumerator.Next()
 		}
 
 		if err != nil && err != vellum.ErrIteratorDone {
 			return nil, 0, err
 		}
 
+		err = finishTerm(prevTerm)
+		if err != nil {
+			return nil, 0, err
+		}
+
 		dictOffset := uint64(w.Count())
 
 		err = newVellum.Close()
@@ -400,7 +422,7 @@ func persistMergedRest(segments []*SegmentBase, drops []*roaring.Bitmap,
 		}
 	}
 
-	fieldDvLocsOffset = uint64(w.Count())
+	fieldDvLocsOffset := uint64(w.Count())
 
 	buf := bufMaxVarintLen64
 	for _, offset := range fieldDvLocs {
diff --git a/index/scorch/segment/zap/posting.go b/index/scorch/segment/zap/posting.go
index 67e08d1a..d504885d 100644
--- a/index/scorch/segment/zap/posting.go
+++ b/index/scorch/segment/zap/posting.go
@@ -28,21 +28,27 @@ import (
 // PostingsList is an in-memory represenation of a postings list
 type PostingsList struct {
 	sb             *SegmentBase
-	term           []byte
 	postingsOffset uint64
 	freqOffset     uint64
 	locOffset      uint64
 	locBitmap      *roaring.Bitmap
 	postings       *roaring.Bitmap
 	except         *roaring.Bitmap
-	postingKey     []byte
 }
 
 // Iterator returns an iterator for this postings list
 func (p *PostingsList) Iterator() segment.PostingsIterator {
-	rv := &PostingsIterator{
-		postings: p,
+	return p.iterator(nil)
+}
+
+func (p *PostingsList) iterator(rv *PostingsIterator) *PostingsIterator {
+	if rv == nil {
+		rv = &PostingsIterator{}
+	} else {
+		*rv = PostingsIterator{} // clear the struct
 	}
+	rv.postings = p
+
 	if p.postings != nil {
 		// prepare the freq chunk details
 		var n uint64