diff --git a/index/scorch/segment/zap/dict.go b/index/scorch/segment/zap/dict.go index 55796ffa..e5d71268 100644 --- a/index/scorch/segment/zap/dict.go +++ b/index/scorch/segment/zap/dict.go @@ -38,6 +38,33 @@ func (d *Dictionary) PostingsList(term string, except *roaring.Bitmap) (segment. } func (d *Dictionary) postingsList(term []byte, except *roaring.Bitmap, rv *PostingsList) (*PostingsList, error) { + if d.fst == nil { + return d.postingsListInit(rv, except), nil + } + + postingsOffset, exists, err := d.fst.Get(term) + if err != nil { + return nil, fmt.Errorf("vellum err: %v", err) + } + if !exists { + return d.postingsListInit(rv, except), nil + } + + return d.postingsListFromOffset(postingsOffset, except, rv) +} + +func (d *Dictionary) postingsListFromOffset(postingsOffset uint64, except *roaring.Bitmap, rv *PostingsList) (*PostingsList, error) { + rv = d.postingsListInit(rv, except) + + err := rv.read(postingsOffset, d) + if err != nil { + return nil, err + } + + return rv, nil +} + +func (d *Dictionary) postingsListInit(rv *PostingsList, except *roaring.Bitmap) *PostingsList { if rv == nil { rv = &PostingsList{} } else { @@ -45,21 +72,7 @@ func (d *Dictionary) postingsList(term []byte, except *roaring.Bitmap, rv *Posti } rv.sb = d.sb rv.except = except - - if d.fst != nil { - postingsOffset, exists, err := d.fst.Get(term) - if err != nil { - return nil, fmt.Errorf("vellum err: %v", err) - } - if exists { - err = rv.read(postingsOffset, d) - if err != nil { - return nil, err - } - } - } - - return rv, nil + return rv } // Iterator returns an iterator for this dictionary diff --git a/index/scorch/segment/zap/enumerator.go b/index/scorch/segment/zap/enumerator.go new file mode 100644 index 00000000..3c708dd5 --- /dev/null +++ b/index/scorch/segment/zap/enumerator.go @@ -0,0 +1,124 @@ +// Copyright (c) 2018 Couchbase, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package zap + +import ( + "bytes" + + "github.com/couchbase/vellum" +) + +// enumerator provides an ordered traversal of multiple vellum +// iterators. Like JOIN of iterators, the enumerator produces a +// sequence of (key, iteratorIndex, value) tuples, sorted by key ASC, +// then iteratorIndex ASC, where the same key might be seen or +// repeated across multiple child iterators. +type enumerator struct { + itrs []vellum.Iterator + currKs [][]byte + currVs []uint64 + + lowK []byte + lowIdxs []int + lowCurr int +} + +// newEnumerator returns a new enumerator over the vellum Iterators +func newEnumerator(itrs []vellum.Iterator) (*enumerator, error) { + rv := &enumerator{ + itrs: itrs, + currKs: make([][]byte, len(itrs)), + currVs: make([]uint64, len(itrs)), + lowIdxs: make([]int, 0, len(itrs)), + } + for i, itr := range rv.itrs { + rv.currKs[i], rv.currVs[i] = itr.Current() + } + rv.updateMatches() + if rv.lowK == nil { + return rv, vellum.ErrIteratorDone + } + return rv, nil +} + +// updateMatches maintains the low key matches based on the currKs +func (m *enumerator) updateMatches() { + m.lowK = nil + m.lowIdxs = m.lowIdxs[:0] + m.lowCurr = 0 + + for i, key := range m.currKs { + if key == nil { + continue + } + + cmp := bytes.Compare(key, m.lowK) + if cmp < 0 || m.lowK == nil { + // reached a new low + m.lowK = key + m.lowIdxs = m.lowIdxs[:0] + m.lowIdxs = append(m.lowIdxs, i) + } else if cmp == 0 { + m.lowIdxs = append(m.lowIdxs, i) + } + } +} + +// Current returns the enumerator's current key, iterator-index, and +// value. If the enumerator is not pointing at a valid value (because +// Next returned an error previously), Current will return nil,0,0. +func (m *enumerator) Current() ([]byte, int, uint64) { + var i int + var v uint64 + if m.lowCurr < len(m.lowIdxs) { + i = m.lowIdxs[m.lowCurr] + v = m.currVs[i] + } + return m.lowK, i, v +} + +// Next advances the enumerator to the next key/iterator/value result, +// else vellum.ErrIteratorDone is returned. +func (m *enumerator) Next() error { + m.lowCurr += 1 + if m.lowCurr >= len(m.lowIdxs) { + // move all the current low iterators forwards + for _, vi := range m.lowIdxs { + err := m.itrs[vi].Next() + if err != nil && err != vellum.ErrIteratorDone { + return err + } + m.currKs[vi], m.currVs[vi] = m.itrs[vi].Current() + } + m.updateMatches() + } + if m.lowK == nil { + return vellum.ErrIteratorDone + } + return nil +} + +// Close all the underlying Iterators. The first error, if any, will +// be returned. +func (m *enumerator) Close() error { + var rv error + for _, itr := range m.itrs { + err := itr.Close() + if rv == nil { + rv = err + } + } + return rv +} diff --git a/index/scorch/segment/zap/enumerator_test.go b/index/scorch/segment/zap/enumerator_test.go new file mode 100644 index 00000000..b2778892 --- /dev/null +++ b/index/scorch/segment/zap/enumerator_test.go @@ -0,0 +1,233 @@ +// Copyright (c) 2018 Couchbase, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. + +package zap + +import ( + "fmt" + "testing" + + "github.com/couchbase/vellum" +) + +type enumTestEntry struct { + key string + val uint64 +} + +type enumTestWant struct { + key string + idx int + val uint64 +} + +func TestEnumerator(t *testing.T) { + tests := []struct { + desc string + in [][]enumTestEntry + want []enumTestWant + }{ + { + desc: "two non-empty enumerators with no duplicate keys", + in: [][]enumTestEntry{ + []enumTestEntry{ + {"a", 1}, + {"c", 3}, + {"e", 5}, + }, + []enumTestEntry{ + {"b", 2}, + {"d", 4}, + {"f", 6}, + }, + }, + want: []enumTestWant{ + {"a", 0, 1}, + {"b", 1, 2}, + {"c", 0, 3}, + {"d", 1, 4}, + {"e", 0, 5}, + {"f", 1, 6}, + }, + }, + { + desc: "two non-empty enumerators with duplicate keys", + in: [][]enumTestEntry{ + []enumTestEntry{ + {"a", 1}, + {"c", 3}, + {"e", 5}, + }, + []enumTestEntry{ + {"a", 2}, + {"c", 4}, + {"e", 6}, + }, + }, + want: []enumTestWant{ + {"a", 0, 1}, + {"a", 1, 2}, + {"c", 0, 3}, + {"c", 1, 4}, + {"e", 0, 5}, + {"e", 1, 6}, + }, + }, + { + desc: "first iterator is empty", + in: [][]enumTestEntry{ + []enumTestEntry{}, + []enumTestEntry{ + {"a", 2}, + {"c", 4}, + {"e", 6}, + }, + }, + want: []enumTestWant{ + {"a", 1, 2}, + {"c", 1, 4}, + {"e", 1, 6}, + }, + }, + { + desc: "last iterator is empty", + in: [][]enumTestEntry{ + []enumTestEntry{ + {"a", 1}, + {"c", 3}, + {"e", 5}, + }, + []enumTestEntry{}, + }, + want: []enumTestWant{ + {"a", 0, 1}, + {"c", 0, 3}, + {"e", 0, 5}, + }, + }, + { + desc: "two different length enumerators with duplicate keys", + in: [][]enumTestEntry{ + []enumTestEntry{ + {"a", 1}, + {"c", 3}, + {"e", 5}, + }, + []enumTestEntry{ + {"a", 2}, + {"b", 4}, + {"d", 1000}, + {"e", 6}, + }, + }, + want: []enumTestWant{ + {"a", 0, 1}, + {"a", 1, 2}, + {"b", 1, 4}, + {"c", 0, 3}, + {"d", 1, 1000}, + {"e", 0, 5}, + {"e", 1, 6}, + }, + }, + } + + for _, test := range tests { + var itrs []vellum.Iterator + for _, entries := range test.in { + itrs = append(itrs, &testIterator{entries: entries}) + } + + enumerator, err := newEnumerator(itrs) + if err != nil { + t.Fatalf("%s - expected no err on newNumerator, got: %v", test.desc, err) + } + + wanti := 0 + for wanti < len(test.want) { + if err != nil { + t.Fatalf("%s - wanted no err, got: %v", test.desc, err) + } + + currK, currIdx, currV := enumerator.Current() + + want := test.want[wanti] + if want.key != string(currK) { + t.Fatalf("%s - wrong key, wanted: %#v, got: %q, %d, %d", test.desc, + want, currK, currIdx, currV) + } + if want.idx != currIdx { + t.Fatalf("%s - wrong idx, wanted: %#v, got: %q, %d, %d", test.desc, + want, currK, currIdx, currV) + } + if want.val != currV { + t.Fatalf("%s - wrong val, wanted: %#v, got: %q, %d, %d", test.desc, + want, currK, currIdx, currV) + } + + wanti += 1 + + err = enumerator.Next() + } + + if err != vellum.ErrIteratorDone { + t.Fatalf("%s - expected ErrIteratorDone, got: %v", test.desc, err) + } + + err = enumerator.Close() + if err != nil { + t.Fatalf("%s - expected nil err on close, got: %v", test.desc, err) + } + + for _, itr := range itrs { + if itr.(*testIterator).curr != 654321 { + t.Fatalf("%s - expected child iter to be closed", test.desc) + } + } + } +} + +type testIterator struct { + entries []enumTestEntry + curr int +} + +func (m *testIterator) Current() ([]byte, uint64) { + if m.curr >= len(m.entries) { + return nil, 0 + } + return []byte(m.entries[m.curr].key), m.entries[m.curr].val +} + +func (m *testIterator) Next() error { + m.curr++ + if m.curr >= len(m.entries) { + return vellum.ErrIteratorDone + } + return nil +} + +func (m *testIterator) Seek(key []byte) error { + return fmt.Errorf("not implemented for enumerator unit tests") +} + +func (m *testIterator) Reset(f *vellum.FST, + startKeyInclusive, endKeyExclusive []byte, aut vellum.Automaton) error { + return fmt.Errorf("not implemented for enumerator unit tests") +} + +func (m *testIterator) Close() error { + m.curr = 654321 + return nil +} diff --git a/index/scorch/segment/zap/merge.go b/index/scorch/segment/zap/merge.go index 751fcb58..525b7f93 100644 --- a/index/scorch/segment/zap/merge.go +++ b/index/scorch/segment/zap/merge.go @@ -154,8 +154,8 @@ func computeNewDocCount(segments []*SegmentBase, drops []*roaring.Bitmap) uint64 return newDocCount } -func persistMergedRest(segments []*SegmentBase, drops []*roaring.Bitmap, - fieldsInv []string, fieldsMap map[string]uint16, newDocNums [][]uint64, +func persistMergedRest(segments []*SegmentBase, dropsIn []*roaring.Bitmap, + fieldsInv []string, fieldsMap map[string]uint16, newDocNumsIn [][]uint64, newSegDocCount uint64, chunkFactor uint32, w *CountHashWriter) ([]uint64, uint64, error) { @@ -168,7 +168,6 @@ func persistMergedRest(segments []*SegmentBase, drops []*roaring.Bitmap, rv := make([]uint64, len(fieldsInv)) fieldDvLocs := make([]uint64, len(fieldsInv)) - fieldDvLocsOffset := uint64(fieldNotUninverted) // docTermMap is keyed by docNum, where the array impl provides // better memory usage behavior than a sparse-friendlier hashmap @@ -188,36 +187,31 @@ func persistMergedRest(segments []*SegmentBase, drops []*roaring.Bitmap, return nil, 0, err } - // collect FST iterators from all segments for this field + // collect FST iterators from all active segments for this field + var newDocNums [][]uint64 + var drops []*roaring.Bitmap var dicts []*Dictionary var itrs []vellum.Iterator - for _, segment := range segments { + + for segmentI, segment := range segments { dict, err2 := segment.dictionary(fieldName) if err2 != nil { return nil, 0, err2 } - dicts = append(dicts, dict) - if dict != nil && dict.fst != nil { itr, err2 := dict.fst.Iterator(nil, nil) if err2 != nil && err2 != vellum.ErrIteratorDone { return nil, 0, err2 } if itr != nil { + newDocNums = append(newDocNums, newDocNumsIn[segmentI]) + drops = append(drops, dropsIn[segmentI]) + dicts = append(dicts, dict) itrs = append(itrs, itr) } } } - // create merging iterator - mergeItr, err := vellum.NewMergeIterator(itrs, func(postingOffsets []uint64) uint64 { - // we don't actually use the merged value - return 0 - }) - - tfEncoder := newChunkedIntCoder(uint64(chunkFactor), newSegDocCount-1) - locEncoder := newChunkedIntCoder(uint64(chunkFactor), newSegDocCount-1) - if uint64(cap(docTermMap)) < newSegDocCount { docTermMap = make([][]byte, newSegDocCount) } else { @@ -227,71 +221,17 @@ func persistMergedRest(segments []*SegmentBase, drops []*roaring.Bitmap, } } - for err == nil { - term, _ := mergeItr.Current() + var prevTerm []byte - newRoaring := roaring.NewBitmap() - newRoaringLocs := roaring.NewBitmap() + newRoaring := roaring.NewBitmap() + newRoaringLocs := roaring.NewBitmap() - tfEncoder.Reset() - locEncoder.Reset() + tfEncoder := newChunkedIntCoder(uint64(chunkFactor), newSegDocCount-1) + locEncoder := newChunkedIntCoder(uint64(chunkFactor), newSegDocCount-1) - // now go back and get posting list for this term - // but pass in the deleted docs for that segment - for dictI, dict := range dicts { - if dict == nil { - continue - } - var err2 error - postings, err2 = dict.postingsList(term, drops[dictI], postings) - if err2 != nil { - return nil, 0, err2 - } - - postItr = postings.iterator(postItr) - next, err2 := postItr.Next() - for next != nil && err2 == nil { - hitNewDocNum := newDocNums[dictI][next.Number()] - if hitNewDocNum == docDropped { - return nil, 0, fmt.Errorf("see hit with dropped doc num") - } - newRoaring.Add(uint32(hitNewDocNum)) - // encode norm bits - norm := next.Norm() - normBits := math.Float32bits(float32(norm)) - err = tfEncoder.Add(hitNewDocNum, next.Frequency(), uint64(normBits)) - if err != nil { - return nil, 0, err - } - locs := next.Locations() - if len(locs) > 0 { - newRoaringLocs.Add(uint32(hitNewDocNum)) - for _, loc := range locs { - if cap(bufLoc) < 5+len(loc.ArrayPositions()) { - bufLoc = make([]uint64, 0, 5+len(loc.ArrayPositions())) - } - args := bufLoc[0:5] - args[0] = uint64(fieldsMap[loc.Field()]) - args[1] = loc.Pos() - args[2] = loc.Start() - args[3] = loc.End() - args[4] = uint64(len(loc.ArrayPositions())) - args = append(args, loc.ArrayPositions()...) - err = locEncoder.Add(hitNewDocNum, args...) - if err != nil { - return nil, 0, err - } - } - } - - docTermMap[hitNewDocNum] = - append(append(docTermMap[hitNewDocNum], term...), termSeparator) - - next, err2 = postItr.Next() - } - if err2 != nil { - return nil, 0, err2 - } + finishTerm := func(term []byte) error { + if term == nil { + return nil } tfEncoder.Close() @@ -300,59 +240,140 @@ func persistMergedRest(segments []*SegmentBase, drops []*roaring.Bitmap, if newRoaring.GetCardinality() > 0 { // this field/term actually has hits in the new segment, lets write it down freqOffset := uint64(w.Count()) - _, err = tfEncoder.Write(w) + _, err := tfEncoder.Write(w) if err != nil { - return nil, 0, err + return err } locOffset := uint64(w.Count()) _, err = locEncoder.Write(w) if err != nil { - return nil, 0, err + return err } postingLocOffset := uint64(w.Count()) _, err = writeRoaringWithLen(newRoaringLocs, w, &bufReuse, bufMaxVarintLen64) if err != nil { - return nil, 0, err + return err } postingOffset := uint64(w.Count()) + // write out the start of the term info - buf := bufMaxVarintLen64 - n := binary.PutUvarint(buf, freqOffset) - _, err = w.Write(buf[:n]) + n := binary.PutUvarint(bufMaxVarintLen64, freqOffset) + _, err = w.Write(bufMaxVarintLen64[:n]) if err != nil { - return nil, 0, err + return err } - // write out the start of the loc info - n = binary.PutUvarint(buf, locOffset) - _, err = w.Write(buf[:n]) + n = binary.PutUvarint(bufMaxVarintLen64, locOffset) + _, err = w.Write(bufMaxVarintLen64[:n]) if err != nil { - return nil, 0, err + return err } - - // write out the start of the loc posting list - n = binary.PutUvarint(buf, postingLocOffset) - _, err = w.Write(buf[:n]) + // write out the start of the posting locs + n = binary.PutUvarint(bufMaxVarintLen64, postingLocOffset) + _, err = w.Write(bufMaxVarintLen64[:n]) if err != nil { - return nil, 0, err + return err } _, err = writeRoaringWithLen(newRoaring, w, &bufReuse, bufMaxVarintLen64) if err != nil { - return nil, 0, err + return err } err = newVellum.Insert(term, postingOffset) if err != nil { - return nil, 0, err + return err } } - err = mergeItr.Next() + newRoaring = roaring.NewBitmap() + newRoaringLocs = roaring.NewBitmap() + + tfEncoder.Reset() + locEncoder.Reset() + + return nil + } + + enumerator, err := newEnumerator(itrs) + + for err == nil { + term, itrI, postingsOffset := enumerator.Current() + + if !bytes.Equal(prevTerm, term) { + // if the term changed, write out the info collected + // for the previous term + err2 := finishTerm(prevTerm) + if err2 != nil { + return nil, 0, err2 + } + } + + var err2 error + postings, err2 = dicts[itrI].postingsListFromOffset( + postingsOffset, drops[itrI], postings) + if err2 != nil { + return nil, 0, err2 + } + + postItr = postings.iterator(postItr) + next, err2 := postItr.Next() + for next != nil && err2 == nil { + hitNewDocNum := newDocNums[itrI][next.Number()] + if hitNewDocNum == docDropped { + return nil, 0, fmt.Errorf("see hit with dropped doc num") + } + newRoaring.Add(uint32(hitNewDocNum)) + // encode norm bits + norm := next.Norm() + normBits := math.Float32bits(float32(norm)) + err = tfEncoder.Add(hitNewDocNum, next.Frequency(), uint64(normBits)) + if err != nil { + return nil, 0, err + } + locs := next.Locations() + if len(locs) > 0 { + newRoaringLocs.Add(uint32(hitNewDocNum)) + for _, loc := range locs { + if cap(bufLoc) < 5+len(loc.ArrayPositions()) { + bufLoc = make([]uint64, 0, 5+len(loc.ArrayPositions())) + } + args := bufLoc[0:5] + args[0] = uint64(fieldsMap[loc.Field()]) + args[1] = loc.Pos() + args[2] = loc.Start() + args[3] = loc.End() + args[4] = uint64(len(loc.ArrayPositions())) + args = append(args, loc.ArrayPositions()...) + err = locEncoder.Add(hitNewDocNum, args...) + if err != nil { + return nil, 0, err + } + } + } + + docTermMap[hitNewDocNum] = + append(append(docTermMap[hitNewDocNum], term...), termSeparator) + + next, err2 = postItr.Next() + } + if err2 != nil { + return nil, 0, err2 + } + + prevTerm = prevTerm[:0] // copy to prevTerm in case Next() reuses term mem + prevTerm = append(prevTerm, term...) + + err = enumerator.Next() } if err != nil && err != vellum.ErrIteratorDone { return nil, 0, err } + err = finishTerm(prevTerm) + if err != nil { + return nil, 0, err + } + dictOffset := uint64(w.Count()) err = newVellum.Close() @@ -401,7 +422,7 @@ func persistMergedRest(segments []*SegmentBase, drops []*roaring.Bitmap, } } - fieldDvLocsOffset = uint64(w.Count()) + fieldDvLocsOffset := uint64(w.Count()) buf := bufMaxVarintLen64 for _, offset := range fieldDvLocs {