From dee6a2b1c64802c6a84a830f4a149bf25fbf91c4 Mon Sep 17 00:00:00 2001 From: Steve Yen Date: Thu, 8 Feb 2018 12:33:19 -0800 Subject: [PATCH 1/9] scorch persistSnapshot() consistently uses err to commit vs abort Some codepaths in persistSnapshot() were saving errors into an err2 local variable, which might lead incorrectly to commit during an error situation rather than abort. --- index/scorch/persister.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/index/scorch/persister.go b/index/scorch/persister.go index 61a266ad..07f38b81 100644 --- a/index/scorch/persister.go +++ b/index/scorch/persister.go @@ -196,17 +196,17 @@ func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot) error { for _, segmentSnapshot := range snapshot.segment { snapshotSegmentKey := segment.EncodeUvarintAscending(nil, segmentSnapshot.id) snapshotSegmentBucket, err2 := snapshotBucket.CreateBucketIfNotExists(snapshotSegmentKey) - if err2 != nil { - return err2 + if err != nil { + return err } switch seg := segmentSnapshot.segment.(type) { case *zap.SegmentBase: // need to persist this to disk filename := zapFileName(segmentSnapshot.id) path := s.path + string(os.PathSeparator) + filename - err2 := zap.PersistSegmentBase(seg, path) - if err2 != nil { - return fmt.Errorf("error persisting segment: %v", err2) + err = zap.PersistSegmentBase(seg, path) + if err != nil { + return fmt.Errorf("error persisting segment: %v", err) } newSegmentPaths[segmentSnapshot.id] = path err = snapshotSegmentBucket.Put(boltPathKey, []byte(filename)) From 83272a9629509d40e8db4fe382e7b95e1fd3dcff Mon Sep 17 00:00:00 2001 From: Steve Yen Date: Thu, 8 Feb 2018 12:47:07 -0800 Subject: [PATCH 2/9] scorch persistSnapshot() err handling & propagation --- index/scorch/persister.go | 30 +++++++++++++++++------------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/index/scorch/persister.go b/index/scorch/persister.go index 07f38b81..dab753d7 100644 --- a/index/scorch/persister.go +++ b/index/scorch/persister.go @@ -145,23 +145,15 @@ OUTER: } } -func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot) error { +func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot) (err error) { // start a write transaction tx, err := s.rootBolt.Begin(true) if err != nil { return err } - // defer fsync of the rootbolt + // defer rollback on error defer func() { - if err == nil { - err = s.rootBolt.Sync() - } - }() - // defer commit/rollback transaction - defer func() { - if err == nil { - err = tx.Commit() - } else { + if err != nil { _ = tx.Rollback() } }() @@ -195,7 +187,7 @@ func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot) error { // first ensure that each segment in this snapshot has been persisted for _, segmentSnapshot := range snapshot.segment { snapshotSegmentKey := segment.EncodeUvarintAscending(nil, segmentSnapshot.id) - snapshotSegmentBucket, err2 := snapshotBucket.CreateBucketIfNotExists(snapshotSegmentKey) + snapshotSegmentBucket, err := snapshotBucket.CreateBucketIfNotExists(snapshotSegmentKey) if err != nil { return err } @@ -300,7 +292,19 @@ func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot) error { _ = rootPrev.DecRef() } } - // allow files to become eligible for removal + + err = tx.Commit() + if err != nil { + return err + } + + err = s.rootBolt.Sync() + if err != nil { + return err + } + + // allow files to become eligible for removal after commit, such + // as file segments from snapshots that came from the merger s.rootLock.Lock() for _, filename := range filenames { delete(s.ineligibleForRemoval, filename) From 6f5f90cd41720665b04e36805605638fef3120f9 Mon Sep 17 00:00:00 2001 From: Steve Yen Date: Wed, 7 Feb 2018 16:54:58 -0800 Subject: [PATCH 3/9] scorch zap segment cleanup handling for some edge cases Two cases in this commit... If we're shutting down, the merger might not have handed off its latest merged segment to the introducer yet, so the merger still owns the segment and needs to Close() that segment itself. In persistSnapshot(), there migth be cases where the persister might not be able to swap in its newly persisted segments -- so, the persistSnapshot() needs to Close() those segments itself. --- index/scorch/merge.go | 1 + index/scorch/persister.go | 15 ++++++++++----- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/index/scorch/merge.go b/index/scorch/merge.go index 41abe065..fb4e80d2 100644 --- a/index/scorch/merge.go +++ b/index/scorch/merge.go @@ -146,6 +146,7 @@ func (s *Scorch) planMergeAtSnapshot(ourSnapshot *IndexSnapshot) error { // give it to the introducer select { case <-s.closeCh: + _ = segment.Close() return nil case s.merges <- sm: } diff --git a/index/scorch/persister.go b/index/scorch/persister.go index dab753d7..83909a88 100644 --- a/index/scorch/persister.go +++ b/index/scorch/persister.go @@ -241,14 +241,18 @@ func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot) (err error) { if len(newSegmentPaths) > 0 { // now try to open all the new snapshots newSegments := make(map[uint64]segment.Segment) + defer func() { + for _, s := range newSegments { + if s != nil { + // cleanup segments that were opened but not + // swapped into the new root + _ = s.Close() + } + } + }() for segmentID, path := range newSegmentPaths { newSegments[segmentID], err = zap.Open(path) if err != nil { - for _, s := range newSegments { - if s != nil { - _ = s.Close() // cleanup segments that were successfully opened - } - } return fmt.Errorf("error opening new segment at %s, %v", path, err) } } @@ -273,6 +277,7 @@ func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot) (err error) { cachedDocs: segmentSnapshot.cachedDocs, } newIndexSnapshot.segment[i] = newSegmentSnapshot + delete(newSegments, segmentSnapshot.id) // update items persisted incase of a new segment snapshot atomic.AddUint64(&s.stats.numItemsPersisted, newSegmentSnapshot.Count()) } else { From f177f07613dcaa6c993c6bcdb4963f6fa58f0a67 Mon Sep 17 00:00:00 2001 From: Steve Yen Date: Thu, 8 Feb 2018 17:11:35 -0800 Subject: [PATCH 4/9] scorch zap segment merging reuses prealloc'ed PostingsIterator During zap segment merging, a new zap PostingsIterator was allocated for every field X segment X term. This change optimizes by reusing a single PostingsIterator instance per persistMergedRest() invocation. And, also unused fields are removed from the PostingsIterator. --- index/scorch/segment/zap/dict.go | 1 - index/scorch/segment/zap/merge.go | 3 ++- index/scorch/segment/zap/posting.go | 14 ++++++++++---- 3 files changed, 12 insertions(+), 6 deletions(-) diff --git a/index/scorch/segment/zap/dict.go b/index/scorch/segment/zap/dict.go index 137c35d7..55796ffa 100644 --- a/index/scorch/segment/zap/dict.go +++ b/index/scorch/segment/zap/dict.go @@ -44,7 +44,6 @@ func (d *Dictionary) postingsList(term []byte, except *roaring.Bitmap, rv *Posti *rv = PostingsList{} // clear the struct } rv.sb = d.sb - rv.term = term rv.except = except if d.fst != nil { diff --git a/index/scorch/segment/zap/merge.go b/index/scorch/segment/zap/merge.go index b1eed28b..751fcb58 100644 --- a/index/scorch/segment/zap/merge.go +++ b/index/scorch/segment/zap/merge.go @@ -164,6 +164,7 @@ func persistMergedRest(segments []*SegmentBase, drops []*roaring.Bitmap, var bufLoc []uint64 var postings *PostingsList + var postItr *PostingsIterator rv := make([]uint64, len(fieldsInv)) fieldDvLocs := make([]uint64, len(fieldsInv)) @@ -247,7 +248,7 @@ func persistMergedRest(segments []*SegmentBase, drops []*roaring.Bitmap, return nil, 0, err2 } - postItr := postings.Iterator() + postItr = postings.iterator(postItr) next, err2 := postItr.Next() for next != nil && err2 == nil { hitNewDocNum := newDocNums[dictI][next.Number()] diff --git a/index/scorch/segment/zap/posting.go b/index/scorch/segment/zap/posting.go index 67e08d1a..d504885d 100644 --- a/index/scorch/segment/zap/posting.go +++ b/index/scorch/segment/zap/posting.go @@ -28,21 +28,27 @@ import ( // PostingsList is an in-memory represenation of a postings list type PostingsList struct { sb *SegmentBase - term []byte postingsOffset uint64 freqOffset uint64 locOffset uint64 locBitmap *roaring.Bitmap postings *roaring.Bitmap except *roaring.Bitmap - postingKey []byte } // Iterator returns an iterator for this postings list func (p *PostingsList) Iterator() segment.PostingsIterator { - rv := &PostingsIterator{ - postings: p, + return p.iterator(nil) +} + +func (p *PostingsList) iterator(rv *PostingsIterator) *PostingsIterator { + if rv == nil { + rv = &PostingsIterator{} + } else { + *rv = PostingsIterator{} // clear the struct } + rv.postings = p + if p.postings != nil { // prepare the freq chunk details var n uint64 From e37c563c560a7cc35449fb29b7b4d42b6d105b37 Mon Sep 17 00:00:00 2001 From: Steve Yen Date: Thu, 8 Feb 2018 18:01:23 -0800 Subject: [PATCH 5/9] scorch zap merge move fieldDvLocsOffset var declaration Move the var declaration to nearer where its used. --- index/scorch/segment/zap/merge.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/index/scorch/segment/zap/merge.go b/index/scorch/segment/zap/merge.go index 751fcb58..c9e275c5 100644 --- a/index/scorch/segment/zap/merge.go +++ b/index/scorch/segment/zap/merge.go @@ -168,7 +168,6 @@ func persistMergedRest(segments []*SegmentBase, drops []*roaring.Bitmap, rv := make([]uint64, len(fieldsInv)) fieldDvLocs := make([]uint64, len(fieldsInv)) - fieldDvLocsOffset := uint64(fieldNotUninverted) // docTermMap is keyed by docNum, where the array impl provides // better memory usage behavior than a sparse-friendlier hashmap @@ -401,7 +400,7 @@ func persistMergedRest(segments []*SegmentBase, drops []*roaring.Bitmap, } } - fieldDvLocsOffset = uint64(w.Count()) + fieldDvLocsOffset := uint64(w.Count()) buf := bufMaxVarintLen64 for _, offset := range fieldDvLocs { From 95a4f37e5c3d66feae18c6f1d3aa200e5764b95e Mon Sep 17 00:00:00 2001 From: Steve Yen Date: Mon, 12 Feb 2018 14:37:40 -0800 Subject: [PATCH 6/9] scorch zap enumerator impl that joins multiple vellum iterators Unlike vellum's MergeIterator, the enumerator introduced in this commit doesn't merge when there are matching keys across iterators. Instead, the enumerator implementation provides a traversal of all the tuples of (key, iteratorIndex, val) from the underlying vellum iterators, ordered by key ASC, iteratorIndex ASC. --- index/scorch/segment/zap/enumerator.go | 124 +++++++++++ index/scorch/segment/zap/enumerator_test.go | 233 ++++++++++++++++++++ 2 files changed, 357 insertions(+) create mode 100644 index/scorch/segment/zap/enumerator.go create mode 100644 index/scorch/segment/zap/enumerator_test.go diff --git a/index/scorch/segment/zap/enumerator.go b/index/scorch/segment/zap/enumerator.go new file mode 100644 index 00000000..3c708dd5 --- /dev/null +++ b/index/scorch/segment/zap/enumerator.go @@ -0,0 +1,124 @@ +// Copyright (c) 2018 Couchbase, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package zap + +import ( + "bytes" + + "github.com/couchbase/vellum" +) + +// enumerator provides an ordered traversal of multiple vellum +// iterators. Like JOIN of iterators, the enumerator produces a +// sequence of (key, iteratorIndex, value) tuples, sorted by key ASC, +// then iteratorIndex ASC, where the same key might be seen or +// repeated across multiple child iterators. +type enumerator struct { + itrs []vellum.Iterator + currKs [][]byte + currVs []uint64 + + lowK []byte + lowIdxs []int + lowCurr int +} + +// newEnumerator returns a new enumerator over the vellum Iterators +func newEnumerator(itrs []vellum.Iterator) (*enumerator, error) { + rv := &enumerator{ + itrs: itrs, + currKs: make([][]byte, len(itrs)), + currVs: make([]uint64, len(itrs)), + lowIdxs: make([]int, 0, len(itrs)), + } + for i, itr := range rv.itrs { + rv.currKs[i], rv.currVs[i] = itr.Current() + } + rv.updateMatches() + if rv.lowK == nil { + return rv, vellum.ErrIteratorDone + } + return rv, nil +} + +// updateMatches maintains the low key matches based on the currKs +func (m *enumerator) updateMatches() { + m.lowK = nil + m.lowIdxs = m.lowIdxs[:0] + m.lowCurr = 0 + + for i, key := range m.currKs { + if key == nil { + continue + } + + cmp := bytes.Compare(key, m.lowK) + if cmp < 0 || m.lowK == nil { + // reached a new low + m.lowK = key + m.lowIdxs = m.lowIdxs[:0] + m.lowIdxs = append(m.lowIdxs, i) + } else if cmp == 0 { + m.lowIdxs = append(m.lowIdxs, i) + } + } +} + +// Current returns the enumerator's current key, iterator-index, and +// value. If the enumerator is not pointing at a valid value (because +// Next returned an error previously), Current will return nil,0,0. +func (m *enumerator) Current() ([]byte, int, uint64) { + var i int + var v uint64 + if m.lowCurr < len(m.lowIdxs) { + i = m.lowIdxs[m.lowCurr] + v = m.currVs[i] + } + return m.lowK, i, v +} + +// Next advances the enumerator to the next key/iterator/value result, +// else vellum.ErrIteratorDone is returned. +func (m *enumerator) Next() error { + m.lowCurr += 1 + if m.lowCurr >= len(m.lowIdxs) { + // move all the current low iterators forwards + for _, vi := range m.lowIdxs { + err := m.itrs[vi].Next() + if err != nil && err != vellum.ErrIteratorDone { + return err + } + m.currKs[vi], m.currVs[vi] = m.itrs[vi].Current() + } + m.updateMatches() + } + if m.lowK == nil { + return vellum.ErrIteratorDone + } + return nil +} + +// Close all the underlying Iterators. The first error, if any, will +// be returned. +func (m *enumerator) Close() error { + var rv error + for _, itr := range m.itrs { + err := itr.Close() + if rv == nil { + rv = err + } + } + return rv +} diff --git a/index/scorch/segment/zap/enumerator_test.go b/index/scorch/segment/zap/enumerator_test.go new file mode 100644 index 00000000..b2778892 --- /dev/null +++ b/index/scorch/segment/zap/enumerator_test.go @@ -0,0 +1,233 @@ +// Copyright (c) 2018 Couchbase, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. + +package zap + +import ( + "fmt" + "testing" + + "github.com/couchbase/vellum" +) + +type enumTestEntry struct { + key string + val uint64 +} + +type enumTestWant struct { + key string + idx int + val uint64 +} + +func TestEnumerator(t *testing.T) { + tests := []struct { + desc string + in [][]enumTestEntry + want []enumTestWant + }{ + { + desc: "two non-empty enumerators with no duplicate keys", + in: [][]enumTestEntry{ + []enumTestEntry{ + {"a", 1}, + {"c", 3}, + {"e", 5}, + }, + []enumTestEntry{ + {"b", 2}, + {"d", 4}, + {"f", 6}, + }, + }, + want: []enumTestWant{ + {"a", 0, 1}, + {"b", 1, 2}, + {"c", 0, 3}, + {"d", 1, 4}, + {"e", 0, 5}, + {"f", 1, 6}, + }, + }, + { + desc: "two non-empty enumerators with duplicate keys", + in: [][]enumTestEntry{ + []enumTestEntry{ + {"a", 1}, + {"c", 3}, + {"e", 5}, + }, + []enumTestEntry{ + {"a", 2}, + {"c", 4}, + {"e", 6}, + }, + }, + want: []enumTestWant{ + {"a", 0, 1}, + {"a", 1, 2}, + {"c", 0, 3}, + {"c", 1, 4}, + {"e", 0, 5}, + {"e", 1, 6}, + }, + }, + { + desc: "first iterator is empty", + in: [][]enumTestEntry{ + []enumTestEntry{}, + []enumTestEntry{ + {"a", 2}, + {"c", 4}, + {"e", 6}, + }, + }, + want: []enumTestWant{ + {"a", 1, 2}, + {"c", 1, 4}, + {"e", 1, 6}, + }, + }, + { + desc: "last iterator is empty", + in: [][]enumTestEntry{ + []enumTestEntry{ + {"a", 1}, + {"c", 3}, + {"e", 5}, + }, + []enumTestEntry{}, + }, + want: []enumTestWant{ + {"a", 0, 1}, + {"c", 0, 3}, + {"e", 0, 5}, + }, + }, + { + desc: "two different length enumerators with duplicate keys", + in: [][]enumTestEntry{ + []enumTestEntry{ + {"a", 1}, + {"c", 3}, + {"e", 5}, + }, + []enumTestEntry{ + {"a", 2}, + {"b", 4}, + {"d", 1000}, + {"e", 6}, + }, + }, + want: []enumTestWant{ + {"a", 0, 1}, + {"a", 1, 2}, + {"b", 1, 4}, + {"c", 0, 3}, + {"d", 1, 1000}, + {"e", 0, 5}, + {"e", 1, 6}, + }, + }, + } + + for _, test := range tests { + var itrs []vellum.Iterator + for _, entries := range test.in { + itrs = append(itrs, &testIterator{entries: entries}) + } + + enumerator, err := newEnumerator(itrs) + if err != nil { + t.Fatalf("%s - expected no err on newNumerator, got: %v", test.desc, err) + } + + wanti := 0 + for wanti < len(test.want) { + if err != nil { + t.Fatalf("%s - wanted no err, got: %v", test.desc, err) + } + + currK, currIdx, currV := enumerator.Current() + + want := test.want[wanti] + if want.key != string(currK) { + t.Fatalf("%s - wrong key, wanted: %#v, got: %q, %d, %d", test.desc, + want, currK, currIdx, currV) + } + if want.idx != currIdx { + t.Fatalf("%s - wrong idx, wanted: %#v, got: %q, %d, %d", test.desc, + want, currK, currIdx, currV) + } + if want.val != currV { + t.Fatalf("%s - wrong val, wanted: %#v, got: %q, %d, %d", test.desc, + want, currK, currIdx, currV) + } + + wanti += 1 + + err = enumerator.Next() + } + + if err != vellum.ErrIteratorDone { + t.Fatalf("%s - expected ErrIteratorDone, got: %v", test.desc, err) + } + + err = enumerator.Close() + if err != nil { + t.Fatalf("%s - expected nil err on close, got: %v", test.desc, err) + } + + for _, itr := range itrs { + if itr.(*testIterator).curr != 654321 { + t.Fatalf("%s - expected child iter to be closed", test.desc) + } + } + } +} + +type testIterator struct { + entries []enumTestEntry + curr int +} + +func (m *testIterator) Current() ([]byte, uint64) { + if m.curr >= len(m.entries) { + return nil, 0 + } + return []byte(m.entries[m.curr].key), m.entries[m.curr].val +} + +func (m *testIterator) Next() error { + m.curr++ + if m.curr >= len(m.entries) { + return vellum.ErrIteratorDone + } + return nil +} + +func (m *testIterator) Seek(key []byte) error { + return fmt.Errorf("not implemented for enumerator unit tests") +} + +func (m *testIterator) Reset(f *vellum.FST, + startKeyInclusive, endKeyExclusive []byte, aut vellum.Automaton) error { + return fmt.Errorf("not implemented for enumerator unit tests") +} + +func (m *testIterator) Close() error { + m.curr = 654321 + return nil +} From 2158e06c40dd40f14685c066531ec91352a57686 Mon Sep 17 00:00:00 2001 From: Steve Yen Date: Mon, 12 Feb 2018 17:29:50 -0800 Subject: [PATCH 7/9] scorch zap merge collects dicts & itrs in lock-step The theory with this change is that the dicts and itrs should be positionally in "lock-step" with paired entries. And, since later code also uses the same array indexing to access the drops and newDocNums, those also need to be positionally in pair-wise lock-step, too. --- index/scorch/segment/zap/merge.go | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/index/scorch/segment/zap/merge.go b/index/scorch/segment/zap/merge.go index c9e275c5..0457fc82 100644 --- a/index/scorch/segment/zap/merge.go +++ b/index/scorch/segment/zap/merge.go @@ -154,8 +154,8 @@ func computeNewDocCount(segments []*SegmentBase, drops []*roaring.Bitmap) uint64 return newDocCount } -func persistMergedRest(segments []*SegmentBase, drops []*roaring.Bitmap, - fieldsInv []string, fieldsMap map[string]uint16, newDocNums [][]uint64, +func persistMergedRest(segments []*SegmentBase, dropsIn []*roaring.Bitmap, + fieldsInv []string, fieldsMap map[string]uint16, newDocNumsIn [][]uint64, newSegDocCount uint64, chunkFactor uint32, w *CountHashWriter) ([]uint64, uint64, error) { @@ -187,15 +187,17 @@ func persistMergedRest(segments []*SegmentBase, drops []*roaring.Bitmap, return nil, 0, err } - // collect FST iterators from all segments for this field + // collect FST iterators from all active segments for this field + var newDocNums [][]uint64 + var drops []*roaring.Bitmap var dicts []*Dictionary var itrs []vellum.Iterator - for _, segment := range segments { + + for segmentI, segment := range segments { dict, err2 := segment.dictionary(fieldName) if err2 != nil { return nil, 0, err2 } - dicts = append(dicts, dict) if dict != nil && dict.fst != nil { itr, err2 := dict.fst.Iterator(nil, nil) @@ -203,6 +205,9 @@ func persistMergedRest(segments []*SegmentBase, drops []*roaring.Bitmap, return nil, 0, err2 } if itr != nil { + newDocNums = append(newDocNums, newDocNumsIn[segmentI]) + drops = append(drops, dropsIn[segmentI]) + dicts = append(dicts, dict) itrs = append(itrs, itr) } } From a073424e5ac51ee96053f09ccabac32521625aab Mon Sep 17 00:00:00 2001 From: Steve Yen Date: Mon, 12 Feb 2018 17:47:28 -0800 Subject: [PATCH 8/9] scorch zap dict.postingsListFromOffset() method A helper method that can create a PostingsList if the caller already knows the postingsOffset. --- index/scorch/segment/zap/dict.go | 43 +++++++++++++++++++++----------- 1 file changed, 28 insertions(+), 15 deletions(-) diff --git a/index/scorch/segment/zap/dict.go b/index/scorch/segment/zap/dict.go index 55796ffa..e5d71268 100644 --- a/index/scorch/segment/zap/dict.go +++ b/index/scorch/segment/zap/dict.go @@ -38,6 +38,33 @@ func (d *Dictionary) PostingsList(term string, except *roaring.Bitmap) (segment. } func (d *Dictionary) postingsList(term []byte, except *roaring.Bitmap, rv *PostingsList) (*PostingsList, error) { + if d.fst == nil { + return d.postingsListInit(rv, except), nil + } + + postingsOffset, exists, err := d.fst.Get(term) + if err != nil { + return nil, fmt.Errorf("vellum err: %v", err) + } + if !exists { + return d.postingsListInit(rv, except), nil + } + + return d.postingsListFromOffset(postingsOffset, except, rv) +} + +func (d *Dictionary) postingsListFromOffset(postingsOffset uint64, except *roaring.Bitmap, rv *PostingsList) (*PostingsList, error) { + rv = d.postingsListInit(rv, except) + + err := rv.read(postingsOffset, d) + if err != nil { + return nil, err + } + + return rv, nil +} + +func (d *Dictionary) postingsListInit(rv *PostingsList, except *roaring.Bitmap) *PostingsList { if rv == nil { rv = &PostingsList{} } else { @@ -45,21 +72,7 @@ func (d *Dictionary) postingsList(term []byte, except *roaring.Bitmap, rv *Posti } rv.sb = d.sb rv.except = except - - if d.fst != nil { - postingsOffset, exists, err := d.fst.Get(term) - if err != nil { - return nil, fmt.Errorf("vellum err: %v", err) - } - if exists { - err = rv.read(postingsOffset, d) - if err != nil { - return nil, err - } - } - } - - return rv, nil + return rv } // Iterator returns an iterator for this dictionary From fe544f33522ea1f89df03f4722da37d4e4ad9a40 Mon Sep 17 00:00:00 2001 From: Steve Yen Date: Mon, 12 Feb 2018 17:48:49 -0800 Subject: [PATCH 9/9] scorch zap merge uses enumerator for vellum.Iterator's --- index/scorch/segment/zap/merge.go | 201 ++++++++++++++++-------------- 1 file changed, 109 insertions(+), 92 deletions(-) diff --git a/index/scorch/segment/zap/merge.go b/index/scorch/segment/zap/merge.go index 0457fc82..525b7f93 100644 --- a/index/scorch/segment/zap/merge.go +++ b/index/scorch/segment/zap/merge.go @@ -198,7 +198,6 @@ func persistMergedRest(segments []*SegmentBase, dropsIn []*roaring.Bitmap, if err2 != nil { return nil, 0, err2 } - if dict != nil && dict.fst != nil { itr, err2 := dict.fst.Iterator(nil, nil) if err2 != nil && err2 != vellum.ErrIteratorDone { @@ -213,15 +212,6 @@ func persistMergedRest(segments []*SegmentBase, dropsIn []*roaring.Bitmap, } } - // create merging iterator - mergeItr, err := vellum.NewMergeIterator(itrs, func(postingOffsets []uint64) uint64 { - // we don't actually use the merged value - return 0 - }) - - tfEncoder := newChunkedIntCoder(uint64(chunkFactor), newSegDocCount-1) - locEncoder := newChunkedIntCoder(uint64(chunkFactor), newSegDocCount-1) - if uint64(cap(docTermMap)) < newSegDocCount { docTermMap = make([][]byte, newSegDocCount) } else { @@ -231,71 +221,17 @@ func persistMergedRest(segments []*SegmentBase, dropsIn []*roaring.Bitmap, } } - for err == nil { - term, _ := mergeItr.Current() + var prevTerm []byte - newRoaring := roaring.NewBitmap() - newRoaringLocs := roaring.NewBitmap() + newRoaring := roaring.NewBitmap() + newRoaringLocs := roaring.NewBitmap() - tfEncoder.Reset() - locEncoder.Reset() + tfEncoder := newChunkedIntCoder(uint64(chunkFactor), newSegDocCount-1) + locEncoder := newChunkedIntCoder(uint64(chunkFactor), newSegDocCount-1) - // now go back and get posting list for this term - // but pass in the deleted docs for that segment - for dictI, dict := range dicts { - if dict == nil { - continue - } - var err2 error - postings, err2 = dict.postingsList(term, drops[dictI], postings) - if err2 != nil { - return nil, 0, err2 - } - - postItr = postings.iterator(postItr) - next, err2 := postItr.Next() - for next != nil && err2 == nil { - hitNewDocNum := newDocNums[dictI][next.Number()] - if hitNewDocNum == docDropped { - return nil, 0, fmt.Errorf("see hit with dropped doc num") - } - newRoaring.Add(uint32(hitNewDocNum)) - // encode norm bits - norm := next.Norm() - normBits := math.Float32bits(float32(norm)) - err = tfEncoder.Add(hitNewDocNum, next.Frequency(), uint64(normBits)) - if err != nil { - return nil, 0, err - } - locs := next.Locations() - if len(locs) > 0 { - newRoaringLocs.Add(uint32(hitNewDocNum)) - for _, loc := range locs { - if cap(bufLoc) < 5+len(loc.ArrayPositions()) { - bufLoc = make([]uint64, 0, 5+len(loc.ArrayPositions())) - } - args := bufLoc[0:5] - args[0] = uint64(fieldsMap[loc.Field()]) - args[1] = loc.Pos() - args[2] = loc.Start() - args[3] = loc.End() - args[4] = uint64(len(loc.ArrayPositions())) - args = append(args, loc.ArrayPositions()...) - err = locEncoder.Add(hitNewDocNum, args...) - if err != nil { - return nil, 0, err - } - } - } - - docTermMap[hitNewDocNum] = - append(append(docTermMap[hitNewDocNum], term...), termSeparator) - - next, err2 = postItr.Next() - } - if err2 != nil { - return nil, 0, err2 - } + finishTerm := func(term []byte) error { + if term == nil { + return nil } tfEncoder.Close() @@ -304,59 +240,140 @@ func persistMergedRest(segments []*SegmentBase, dropsIn []*roaring.Bitmap, if newRoaring.GetCardinality() > 0 { // this field/term actually has hits in the new segment, lets write it down freqOffset := uint64(w.Count()) - _, err = tfEncoder.Write(w) + _, err := tfEncoder.Write(w) if err != nil { - return nil, 0, err + return err } locOffset := uint64(w.Count()) _, err = locEncoder.Write(w) if err != nil { - return nil, 0, err + return err } postingLocOffset := uint64(w.Count()) _, err = writeRoaringWithLen(newRoaringLocs, w, &bufReuse, bufMaxVarintLen64) if err != nil { - return nil, 0, err + return err } postingOffset := uint64(w.Count()) + // write out the start of the term info - buf := bufMaxVarintLen64 - n := binary.PutUvarint(buf, freqOffset) - _, err = w.Write(buf[:n]) + n := binary.PutUvarint(bufMaxVarintLen64, freqOffset) + _, err = w.Write(bufMaxVarintLen64[:n]) if err != nil { - return nil, 0, err + return err } - // write out the start of the loc info - n = binary.PutUvarint(buf, locOffset) - _, err = w.Write(buf[:n]) + n = binary.PutUvarint(bufMaxVarintLen64, locOffset) + _, err = w.Write(bufMaxVarintLen64[:n]) if err != nil { - return nil, 0, err + return err } - - // write out the start of the loc posting list - n = binary.PutUvarint(buf, postingLocOffset) - _, err = w.Write(buf[:n]) + // write out the start of the posting locs + n = binary.PutUvarint(bufMaxVarintLen64, postingLocOffset) + _, err = w.Write(bufMaxVarintLen64[:n]) if err != nil { - return nil, 0, err + return err } _, err = writeRoaringWithLen(newRoaring, w, &bufReuse, bufMaxVarintLen64) if err != nil { - return nil, 0, err + return err } err = newVellum.Insert(term, postingOffset) if err != nil { - return nil, 0, err + return err } } - err = mergeItr.Next() + newRoaring = roaring.NewBitmap() + newRoaringLocs = roaring.NewBitmap() + + tfEncoder.Reset() + locEncoder.Reset() + + return nil + } + + enumerator, err := newEnumerator(itrs) + + for err == nil { + term, itrI, postingsOffset := enumerator.Current() + + if !bytes.Equal(prevTerm, term) { + // if the term changed, write out the info collected + // for the previous term + err2 := finishTerm(prevTerm) + if err2 != nil { + return nil, 0, err2 + } + } + + var err2 error + postings, err2 = dicts[itrI].postingsListFromOffset( + postingsOffset, drops[itrI], postings) + if err2 != nil { + return nil, 0, err2 + } + + postItr = postings.iterator(postItr) + next, err2 := postItr.Next() + for next != nil && err2 == nil { + hitNewDocNum := newDocNums[itrI][next.Number()] + if hitNewDocNum == docDropped { + return nil, 0, fmt.Errorf("see hit with dropped doc num") + } + newRoaring.Add(uint32(hitNewDocNum)) + // encode norm bits + norm := next.Norm() + normBits := math.Float32bits(float32(norm)) + err = tfEncoder.Add(hitNewDocNum, next.Frequency(), uint64(normBits)) + if err != nil { + return nil, 0, err + } + locs := next.Locations() + if len(locs) > 0 { + newRoaringLocs.Add(uint32(hitNewDocNum)) + for _, loc := range locs { + if cap(bufLoc) < 5+len(loc.ArrayPositions()) { + bufLoc = make([]uint64, 0, 5+len(loc.ArrayPositions())) + } + args := bufLoc[0:5] + args[0] = uint64(fieldsMap[loc.Field()]) + args[1] = loc.Pos() + args[2] = loc.Start() + args[3] = loc.End() + args[4] = uint64(len(loc.ArrayPositions())) + args = append(args, loc.ArrayPositions()...) + err = locEncoder.Add(hitNewDocNum, args...) + if err != nil { + return nil, 0, err + } + } + } + + docTermMap[hitNewDocNum] = + append(append(docTermMap[hitNewDocNum], term...), termSeparator) + + next, err2 = postItr.Next() + } + if err2 != nil { + return nil, 0, err2 + } + + prevTerm = prevTerm[:0] // copy to prevTerm in case Next() reuses term mem + prevTerm = append(prevTerm, term...) + + err = enumerator.Next() } if err != nil && err != vellum.ErrIteratorDone { return nil, 0, err } + err = finishTerm(prevTerm) + if err != nil { + return nil, 0, err + } + dictOffset := uint64(w.Count()) err = newVellum.Close()