Merge branch 'master' into persister_pause
This commit is contained in:
commit
35611f4287
|
@ -146,6 +146,7 @@ func (s *Scorch) planMergeAtSnapshot(ourSnapshot *IndexSnapshot) error {
|
||||||
// give it to the introducer
|
// give it to the introducer
|
||||||
select {
|
select {
|
||||||
case <-s.closeCh:
|
case <-s.closeCh:
|
||||||
|
_ = segment.Close()
|
||||||
return nil
|
return nil
|
||||||
case s.merges <- sm:
|
case s.merges <- sm:
|
||||||
}
|
}
|
||||||
|
|
|
@ -197,17 +197,9 @@ func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot) (err error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
// defer fsync of the rootbolt
|
// defer rollback on error
|
||||||
defer func() {
|
defer func() {
|
||||||
if err == nil {
|
if err != nil {
|
||||||
err = s.rootBolt.Sync()
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
// defer commit/rollback transaction
|
|
||||||
defer func() {
|
|
||||||
if err == nil {
|
|
||||||
err = tx.Commit()
|
|
||||||
} else {
|
|
||||||
_ = tx.Rollback()
|
_ = tx.Rollback()
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
@ -241,18 +233,18 @@ func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot) (err error) {
|
||||||
// first ensure that each segment in this snapshot has been persisted
|
// first ensure that each segment in this snapshot has been persisted
|
||||||
for _, segmentSnapshot := range snapshot.segment {
|
for _, segmentSnapshot := range snapshot.segment {
|
||||||
snapshotSegmentKey := segment.EncodeUvarintAscending(nil, segmentSnapshot.id)
|
snapshotSegmentKey := segment.EncodeUvarintAscending(nil, segmentSnapshot.id)
|
||||||
snapshotSegmentBucket, err2 := snapshotBucket.CreateBucketIfNotExists(snapshotSegmentKey)
|
snapshotSegmentBucket, err := snapshotBucket.CreateBucketIfNotExists(snapshotSegmentKey)
|
||||||
if err2 != nil {
|
if err != nil {
|
||||||
return err2
|
return err
|
||||||
}
|
}
|
||||||
switch seg := segmentSnapshot.segment.(type) {
|
switch seg := segmentSnapshot.segment.(type) {
|
||||||
case *zap.SegmentBase:
|
case *zap.SegmentBase:
|
||||||
// need to persist this to disk
|
// need to persist this to disk
|
||||||
filename := zapFileName(segmentSnapshot.id)
|
filename := zapFileName(segmentSnapshot.id)
|
||||||
path := s.path + string(os.PathSeparator) + filename
|
path := s.path + string(os.PathSeparator) + filename
|
||||||
err2 := zap.PersistSegmentBase(seg, path)
|
err = zap.PersistSegmentBase(seg, path)
|
||||||
if err2 != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error persisting segment: %v", err2)
|
return fmt.Errorf("error persisting segment: %v", err)
|
||||||
}
|
}
|
||||||
newSegmentPaths[segmentSnapshot.id] = path
|
newSegmentPaths[segmentSnapshot.id] = path
|
||||||
err = snapshotSegmentBucket.Put(boltPathKey, []byte(filename))
|
err = snapshotSegmentBucket.Put(boltPathKey, []byte(filename))
|
||||||
|
@ -295,14 +287,18 @@ func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot) (err error) {
|
||||||
if len(newSegmentPaths) > 0 {
|
if len(newSegmentPaths) > 0 {
|
||||||
// now try to open all the new snapshots
|
// now try to open all the new snapshots
|
||||||
newSegments := make(map[uint64]segment.Segment)
|
newSegments := make(map[uint64]segment.Segment)
|
||||||
|
defer func() {
|
||||||
|
for _, s := range newSegments {
|
||||||
|
if s != nil {
|
||||||
|
// cleanup segments that were opened but not
|
||||||
|
// swapped into the new root
|
||||||
|
_ = s.Close()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
for segmentID, path := range newSegmentPaths {
|
for segmentID, path := range newSegmentPaths {
|
||||||
newSegments[segmentID], err = zap.Open(path)
|
newSegments[segmentID], err = zap.Open(path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
for _, s := range newSegments {
|
|
||||||
if s != nil {
|
|
||||||
_ = s.Close() // cleanup segments that were successfully opened
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return fmt.Errorf("error opening new segment at %s, %v", path, err)
|
return fmt.Errorf("error opening new segment at %s, %v", path, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -327,6 +323,7 @@ func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot) (err error) {
|
||||||
cachedDocs: segmentSnapshot.cachedDocs,
|
cachedDocs: segmentSnapshot.cachedDocs,
|
||||||
}
|
}
|
||||||
newIndexSnapshot.segment[i] = newSegmentSnapshot
|
newIndexSnapshot.segment[i] = newSegmentSnapshot
|
||||||
|
delete(newSegments, segmentSnapshot.id)
|
||||||
// update items persisted incase of a new segment snapshot
|
// update items persisted incase of a new segment snapshot
|
||||||
atomic.AddUint64(&s.stats.numItemsPersisted, newSegmentSnapshot.Count())
|
atomic.AddUint64(&s.stats.numItemsPersisted, newSegmentSnapshot.Count())
|
||||||
} else {
|
} else {
|
||||||
|
@ -346,7 +343,19 @@ func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot) (err error) {
|
||||||
_ = rootPrev.DecRef()
|
_ = rootPrev.DecRef()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// allow files to become eligible for removal
|
|
||||||
|
err = tx.Commit()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = s.rootBolt.Sync()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// allow files to become eligible for removal after commit, such
|
||||||
|
// as file segments from snapshots that came from the merger
|
||||||
s.rootLock.Lock()
|
s.rootLock.Lock()
|
||||||
for _, filename := range filenames {
|
for _, filename := range filenames {
|
||||||
delete(s.ineligibleForRemoval, filename)
|
delete(s.ineligibleForRemoval, filename)
|
||||||
|
|
|
@ -38,29 +38,41 @@ func (d *Dictionary) PostingsList(term string, except *roaring.Bitmap) (segment.
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *Dictionary) postingsList(term []byte, except *roaring.Bitmap, rv *PostingsList) (*PostingsList, error) {
|
func (d *Dictionary) postingsList(term []byte, except *roaring.Bitmap, rv *PostingsList) (*PostingsList, error) {
|
||||||
|
if d.fst == nil {
|
||||||
|
return d.postingsListInit(rv, except), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
postingsOffset, exists, err := d.fst.Get(term)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("vellum err: %v", err)
|
||||||
|
}
|
||||||
|
if !exists {
|
||||||
|
return d.postingsListInit(rv, except), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return d.postingsListFromOffset(postingsOffset, except, rv)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *Dictionary) postingsListFromOffset(postingsOffset uint64, except *roaring.Bitmap, rv *PostingsList) (*PostingsList, error) {
|
||||||
|
rv = d.postingsListInit(rv, except)
|
||||||
|
|
||||||
|
err := rv.read(postingsOffset, d)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return rv, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *Dictionary) postingsListInit(rv *PostingsList, except *roaring.Bitmap) *PostingsList {
|
||||||
if rv == nil {
|
if rv == nil {
|
||||||
rv = &PostingsList{}
|
rv = &PostingsList{}
|
||||||
} else {
|
} else {
|
||||||
*rv = PostingsList{} // clear the struct
|
*rv = PostingsList{} // clear the struct
|
||||||
}
|
}
|
||||||
rv.sb = d.sb
|
rv.sb = d.sb
|
||||||
rv.term = term
|
|
||||||
rv.except = except
|
rv.except = except
|
||||||
|
return rv
|
||||||
if d.fst != nil {
|
|
||||||
postingsOffset, exists, err := d.fst.Get(term)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("vellum err: %v", err)
|
|
||||||
}
|
|
||||||
if exists {
|
|
||||||
err = rv.read(postingsOffset, d)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return rv, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Iterator returns an iterator for this dictionary
|
// Iterator returns an iterator for this dictionary
|
||||||
|
|
|
@ -0,0 +1,124 @@
|
||||||
|
// Copyright (c) 2018 Couchbase, Inc.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package zap
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
|
||||||
|
"github.com/couchbase/vellum"
|
||||||
|
)
|
||||||
|
|
||||||
|
// enumerator provides an ordered traversal of multiple vellum
|
||||||
|
// iterators. Like JOIN of iterators, the enumerator produces a
|
||||||
|
// sequence of (key, iteratorIndex, value) tuples, sorted by key ASC,
|
||||||
|
// then iteratorIndex ASC, where the same key might be seen or
|
||||||
|
// repeated across multiple child iterators.
|
||||||
|
type enumerator struct {
|
||||||
|
itrs []vellum.Iterator
|
||||||
|
currKs [][]byte
|
||||||
|
currVs []uint64
|
||||||
|
|
||||||
|
lowK []byte
|
||||||
|
lowIdxs []int
|
||||||
|
lowCurr int
|
||||||
|
}
|
||||||
|
|
||||||
|
// newEnumerator returns a new enumerator over the vellum Iterators
|
||||||
|
func newEnumerator(itrs []vellum.Iterator) (*enumerator, error) {
|
||||||
|
rv := &enumerator{
|
||||||
|
itrs: itrs,
|
||||||
|
currKs: make([][]byte, len(itrs)),
|
||||||
|
currVs: make([]uint64, len(itrs)),
|
||||||
|
lowIdxs: make([]int, 0, len(itrs)),
|
||||||
|
}
|
||||||
|
for i, itr := range rv.itrs {
|
||||||
|
rv.currKs[i], rv.currVs[i] = itr.Current()
|
||||||
|
}
|
||||||
|
rv.updateMatches()
|
||||||
|
if rv.lowK == nil {
|
||||||
|
return rv, vellum.ErrIteratorDone
|
||||||
|
}
|
||||||
|
return rv, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// updateMatches maintains the low key matches based on the currKs
|
||||||
|
func (m *enumerator) updateMatches() {
|
||||||
|
m.lowK = nil
|
||||||
|
m.lowIdxs = m.lowIdxs[:0]
|
||||||
|
m.lowCurr = 0
|
||||||
|
|
||||||
|
for i, key := range m.currKs {
|
||||||
|
if key == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
cmp := bytes.Compare(key, m.lowK)
|
||||||
|
if cmp < 0 || m.lowK == nil {
|
||||||
|
// reached a new low
|
||||||
|
m.lowK = key
|
||||||
|
m.lowIdxs = m.lowIdxs[:0]
|
||||||
|
m.lowIdxs = append(m.lowIdxs, i)
|
||||||
|
} else if cmp == 0 {
|
||||||
|
m.lowIdxs = append(m.lowIdxs, i)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Current returns the enumerator's current key, iterator-index, and
|
||||||
|
// value. If the enumerator is not pointing at a valid value (because
|
||||||
|
// Next returned an error previously), Current will return nil,0,0.
|
||||||
|
func (m *enumerator) Current() ([]byte, int, uint64) {
|
||||||
|
var i int
|
||||||
|
var v uint64
|
||||||
|
if m.lowCurr < len(m.lowIdxs) {
|
||||||
|
i = m.lowIdxs[m.lowCurr]
|
||||||
|
v = m.currVs[i]
|
||||||
|
}
|
||||||
|
return m.lowK, i, v
|
||||||
|
}
|
||||||
|
|
||||||
|
// Next advances the enumerator to the next key/iterator/value result,
|
||||||
|
// else vellum.ErrIteratorDone is returned.
|
||||||
|
func (m *enumerator) Next() error {
|
||||||
|
m.lowCurr += 1
|
||||||
|
if m.lowCurr >= len(m.lowIdxs) {
|
||||||
|
// move all the current low iterators forwards
|
||||||
|
for _, vi := range m.lowIdxs {
|
||||||
|
err := m.itrs[vi].Next()
|
||||||
|
if err != nil && err != vellum.ErrIteratorDone {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
m.currKs[vi], m.currVs[vi] = m.itrs[vi].Current()
|
||||||
|
}
|
||||||
|
m.updateMatches()
|
||||||
|
}
|
||||||
|
if m.lowK == nil {
|
||||||
|
return vellum.ErrIteratorDone
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close all the underlying Iterators. The first error, if any, will
|
||||||
|
// be returned.
|
||||||
|
func (m *enumerator) Close() error {
|
||||||
|
var rv error
|
||||||
|
for _, itr := range m.itrs {
|
||||||
|
err := itr.Close()
|
||||||
|
if rv == nil {
|
||||||
|
rv = err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return rv
|
||||||
|
}
|
|
@ -0,0 +1,233 @@
|
||||||
|
// Copyright (c) 2018 Couchbase, Inc.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||||
|
// implied. See the License for the specific language governing
|
||||||
|
// permissions and limitations under the License.
|
||||||
|
|
||||||
|
package zap
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/couchbase/vellum"
|
||||||
|
)
|
||||||
|
|
||||||
|
type enumTestEntry struct {
|
||||||
|
key string
|
||||||
|
val uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
type enumTestWant struct {
|
||||||
|
key string
|
||||||
|
idx int
|
||||||
|
val uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestEnumerator(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
desc string
|
||||||
|
in [][]enumTestEntry
|
||||||
|
want []enumTestWant
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
desc: "two non-empty enumerators with no duplicate keys",
|
||||||
|
in: [][]enumTestEntry{
|
||||||
|
[]enumTestEntry{
|
||||||
|
{"a", 1},
|
||||||
|
{"c", 3},
|
||||||
|
{"e", 5},
|
||||||
|
},
|
||||||
|
[]enumTestEntry{
|
||||||
|
{"b", 2},
|
||||||
|
{"d", 4},
|
||||||
|
{"f", 6},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
want: []enumTestWant{
|
||||||
|
{"a", 0, 1},
|
||||||
|
{"b", 1, 2},
|
||||||
|
{"c", 0, 3},
|
||||||
|
{"d", 1, 4},
|
||||||
|
{"e", 0, 5},
|
||||||
|
{"f", 1, 6},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "two non-empty enumerators with duplicate keys",
|
||||||
|
in: [][]enumTestEntry{
|
||||||
|
[]enumTestEntry{
|
||||||
|
{"a", 1},
|
||||||
|
{"c", 3},
|
||||||
|
{"e", 5},
|
||||||
|
},
|
||||||
|
[]enumTestEntry{
|
||||||
|
{"a", 2},
|
||||||
|
{"c", 4},
|
||||||
|
{"e", 6},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
want: []enumTestWant{
|
||||||
|
{"a", 0, 1},
|
||||||
|
{"a", 1, 2},
|
||||||
|
{"c", 0, 3},
|
||||||
|
{"c", 1, 4},
|
||||||
|
{"e", 0, 5},
|
||||||
|
{"e", 1, 6},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "first iterator is empty",
|
||||||
|
in: [][]enumTestEntry{
|
||||||
|
[]enumTestEntry{},
|
||||||
|
[]enumTestEntry{
|
||||||
|
{"a", 2},
|
||||||
|
{"c", 4},
|
||||||
|
{"e", 6},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
want: []enumTestWant{
|
||||||
|
{"a", 1, 2},
|
||||||
|
{"c", 1, 4},
|
||||||
|
{"e", 1, 6},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "last iterator is empty",
|
||||||
|
in: [][]enumTestEntry{
|
||||||
|
[]enumTestEntry{
|
||||||
|
{"a", 1},
|
||||||
|
{"c", 3},
|
||||||
|
{"e", 5},
|
||||||
|
},
|
||||||
|
[]enumTestEntry{},
|
||||||
|
},
|
||||||
|
want: []enumTestWant{
|
||||||
|
{"a", 0, 1},
|
||||||
|
{"c", 0, 3},
|
||||||
|
{"e", 0, 5},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "two different length enumerators with duplicate keys",
|
||||||
|
in: [][]enumTestEntry{
|
||||||
|
[]enumTestEntry{
|
||||||
|
{"a", 1},
|
||||||
|
{"c", 3},
|
||||||
|
{"e", 5},
|
||||||
|
},
|
||||||
|
[]enumTestEntry{
|
||||||
|
{"a", 2},
|
||||||
|
{"b", 4},
|
||||||
|
{"d", 1000},
|
||||||
|
{"e", 6},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
want: []enumTestWant{
|
||||||
|
{"a", 0, 1},
|
||||||
|
{"a", 1, 2},
|
||||||
|
{"b", 1, 4},
|
||||||
|
{"c", 0, 3},
|
||||||
|
{"d", 1, 1000},
|
||||||
|
{"e", 0, 5},
|
||||||
|
{"e", 1, 6},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, test := range tests {
|
||||||
|
var itrs []vellum.Iterator
|
||||||
|
for _, entries := range test.in {
|
||||||
|
itrs = append(itrs, &testIterator{entries: entries})
|
||||||
|
}
|
||||||
|
|
||||||
|
enumerator, err := newEnumerator(itrs)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("%s - expected no err on newNumerator, got: %v", test.desc, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
wanti := 0
|
||||||
|
for wanti < len(test.want) {
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("%s - wanted no err, got: %v", test.desc, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
currK, currIdx, currV := enumerator.Current()
|
||||||
|
|
||||||
|
want := test.want[wanti]
|
||||||
|
if want.key != string(currK) {
|
||||||
|
t.Fatalf("%s - wrong key, wanted: %#v, got: %q, %d, %d", test.desc,
|
||||||
|
want, currK, currIdx, currV)
|
||||||
|
}
|
||||||
|
if want.idx != currIdx {
|
||||||
|
t.Fatalf("%s - wrong idx, wanted: %#v, got: %q, %d, %d", test.desc,
|
||||||
|
want, currK, currIdx, currV)
|
||||||
|
}
|
||||||
|
if want.val != currV {
|
||||||
|
t.Fatalf("%s - wrong val, wanted: %#v, got: %q, %d, %d", test.desc,
|
||||||
|
want, currK, currIdx, currV)
|
||||||
|
}
|
||||||
|
|
||||||
|
wanti += 1
|
||||||
|
|
||||||
|
err = enumerator.Next()
|
||||||
|
}
|
||||||
|
|
||||||
|
if err != vellum.ErrIteratorDone {
|
||||||
|
t.Fatalf("%s - expected ErrIteratorDone, got: %v", test.desc, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = enumerator.Close()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("%s - expected nil err on close, got: %v", test.desc, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, itr := range itrs {
|
||||||
|
if itr.(*testIterator).curr != 654321 {
|
||||||
|
t.Fatalf("%s - expected child iter to be closed", test.desc)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type testIterator struct {
|
||||||
|
entries []enumTestEntry
|
||||||
|
curr int
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *testIterator) Current() ([]byte, uint64) {
|
||||||
|
if m.curr >= len(m.entries) {
|
||||||
|
return nil, 0
|
||||||
|
}
|
||||||
|
return []byte(m.entries[m.curr].key), m.entries[m.curr].val
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *testIterator) Next() error {
|
||||||
|
m.curr++
|
||||||
|
if m.curr >= len(m.entries) {
|
||||||
|
return vellum.ErrIteratorDone
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *testIterator) Seek(key []byte) error {
|
||||||
|
return fmt.Errorf("not implemented for enumerator unit tests")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *testIterator) Reset(f *vellum.FST,
|
||||||
|
startKeyInclusive, endKeyExclusive []byte, aut vellum.Automaton) error {
|
||||||
|
return fmt.Errorf("not implemented for enumerator unit tests")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *testIterator) Close() error {
|
||||||
|
m.curr = 654321
|
||||||
|
return nil
|
||||||
|
}
|
|
@ -154,8 +154,8 @@ func computeNewDocCount(segments []*SegmentBase, drops []*roaring.Bitmap) uint64
|
||||||
return newDocCount
|
return newDocCount
|
||||||
}
|
}
|
||||||
|
|
||||||
func persistMergedRest(segments []*SegmentBase, drops []*roaring.Bitmap,
|
func persistMergedRest(segments []*SegmentBase, dropsIn []*roaring.Bitmap,
|
||||||
fieldsInv []string, fieldsMap map[string]uint16, newDocNums [][]uint64,
|
fieldsInv []string, fieldsMap map[string]uint16, newDocNumsIn [][]uint64,
|
||||||
newSegDocCount uint64, chunkFactor uint32,
|
newSegDocCount uint64, chunkFactor uint32,
|
||||||
w *CountHashWriter) ([]uint64, uint64, error) {
|
w *CountHashWriter) ([]uint64, uint64, error) {
|
||||||
|
|
||||||
|
@ -164,10 +164,10 @@ func persistMergedRest(segments []*SegmentBase, drops []*roaring.Bitmap,
|
||||||
var bufLoc []uint64
|
var bufLoc []uint64
|
||||||
|
|
||||||
var postings *PostingsList
|
var postings *PostingsList
|
||||||
|
var postItr *PostingsIterator
|
||||||
|
|
||||||
rv := make([]uint64, len(fieldsInv))
|
rv := make([]uint64, len(fieldsInv))
|
||||||
fieldDvLocs := make([]uint64, len(fieldsInv))
|
fieldDvLocs := make([]uint64, len(fieldsInv))
|
||||||
fieldDvLocsOffset := uint64(fieldNotUninverted)
|
|
||||||
|
|
||||||
// docTermMap is keyed by docNum, where the array impl provides
|
// docTermMap is keyed by docNum, where the array impl provides
|
||||||
// better memory usage behavior than a sparse-friendlier hashmap
|
// better memory usage behavior than a sparse-friendlier hashmap
|
||||||
|
@ -187,36 +187,31 @@ func persistMergedRest(segments []*SegmentBase, drops []*roaring.Bitmap,
|
||||||
return nil, 0, err
|
return nil, 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// collect FST iterators from all segments for this field
|
// collect FST iterators from all active segments for this field
|
||||||
|
var newDocNums [][]uint64
|
||||||
|
var drops []*roaring.Bitmap
|
||||||
var dicts []*Dictionary
|
var dicts []*Dictionary
|
||||||
var itrs []vellum.Iterator
|
var itrs []vellum.Iterator
|
||||||
for _, segment := range segments {
|
|
||||||
|
for segmentI, segment := range segments {
|
||||||
dict, err2 := segment.dictionary(fieldName)
|
dict, err2 := segment.dictionary(fieldName)
|
||||||
if err2 != nil {
|
if err2 != nil {
|
||||||
return nil, 0, err2
|
return nil, 0, err2
|
||||||
}
|
}
|
||||||
dicts = append(dicts, dict)
|
|
||||||
|
|
||||||
if dict != nil && dict.fst != nil {
|
if dict != nil && dict.fst != nil {
|
||||||
itr, err2 := dict.fst.Iterator(nil, nil)
|
itr, err2 := dict.fst.Iterator(nil, nil)
|
||||||
if err2 != nil && err2 != vellum.ErrIteratorDone {
|
if err2 != nil && err2 != vellum.ErrIteratorDone {
|
||||||
return nil, 0, err2
|
return nil, 0, err2
|
||||||
}
|
}
|
||||||
if itr != nil {
|
if itr != nil {
|
||||||
|
newDocNums = append(newDocNums, newDocNumsIn[segmentI])
|
||||||
|
drops = append(drops, dropsIn[segmentI])
|
||||||
|
dicts = append(dicts, dict)
|
||||||
itrs = append(itrs, itr)
|
itrs = append(itrs, itr)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// create merging iterator
|
|
||||||
mergeItr, err := vellum.NewMergeIterator(itrs, func(postingOffsets []uint64) uint64 {
|
|
||||||
// we don't actually use the merged value
|
|
||||||
return 0
|
|
||||||
})
|
|
||||||
|
|
||||||
tfEncoder := newChunkedIntCoder(uint64(chunkFactor), newSegDocCount-1)
|
|
||||||
locEncoder := newChunkedIntCoder(uint64(chunkFactor), newSegDocCount-1)
|
|
||||||
|
|
||||||
if uint64(cap(docTermMap)) < newSegDocCount {
|
if uint64(cap(docTermMap)) < newSegDocCount {
|
||||||
docTermMap = make([][]byte, newSegDocCount)
|
docTermMap = make([][]byte, newSegDocCount)
|
||||||
} else {
|
} else {
|
||||||
|
@ -226,71 +221,17 @@ func persistMergedRest(segments []*SegmentBase, drops []*roaring.Bitmap,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for err == nil {
|
var prevTerm []byte
|
||||||
term, _ := mergeItr.Current()
|
|
||||||
|
|
||||||
newRoaring := roaring.NewBitmap()
|
newRoaring := roaring.NewBitmap()
|
||||||
newRoaringLocs := roaring.NewBitmap()
|
newRoaringLocs := roaring.NewBitmap()
|
||||||
|
|
||||||
tfEncoder.Reset()
|
tfEncoder := newChunkedIntCoder(uint64(chunkFactor), newSegDocCount-1)
|
||||||
locEncoder.Reset()
|
locEncoder := newChunkedIntCoder(uint64(chunkFactor), newSegDocCount-1)
|
||||||
|
|
||||||
// now go back and get posting list for this term
|
finishTerm := func(term []byte) error {
|
||||||
// but pass in the deleted docs for that segment
|
if term == nil {
|
||||||
for dictI, dict := range dicts {
|
return nil
|
||||||
if dict == nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
var err2 error
|
|
||||||
postings, err2 = dict.postingsList(term, drops[dictI], postings)
|
|
||||||
if err2 != nil {
|
|
||||||
return nil, 0, err2
|
|
||||||
}
|
|
||||||
|
|
||||||
postItr := postings.Iterator()
|
|
||||||
next, err2 := postItr.Next()
|
|
||||||
for next != nil && err2 == nil {
|
|
||||||
hitNewDocNum := newDocNums[dictI][next.Number()]
|
|
||||||
if hitNewDocNum == docDropped {
|
|
||||||
return nil, 0, fmt.Errorf("see hit with dropped doc num")
|
|
||||||
}
|
|
||||||
newRoaring.Add(uint32(hitNewDocNum))
|
|
||||||
// encode norm bits
|
|
||||||
norm := next.Norm()
|
|
||||||
normBits := math.Float32bits(float32(norm))
|
|
||||||
err = tfEncoder.Add(hitNewDocNum, next.Frequency(), uint64(normBits))
|
|
||||||
if err != nil {
|
|
||||||
return nil, 0, err
|
|
||||||
}
|
|
||||||
locs := next.Locations()
|
|
||||||
if len(locs) > 0 {
|
|
||||||
newRoaringLocs.Add(uint32(hitNewDocNum))
|
|
||||||
for _, loc := range locs {
|
|
||||||
if cap(bufLoc) < 5+len(loc.ArrayPositions()) {
|
|
||||||
bufLoc = make([]uint64, 0, 5+len(loc.ArrayPositions()))
|
|
||||||
}
|
|
||||||
args := bufLoc[0:5]
|
|
||||||
args[0] = uint64(fieldsMap[loc.Field()])
|
|
||||||
args[1] = loc.Pos()
|
|
||||||
args[2] = loc.Start()
|
|
||||||
args[3] = loc.End()
|
|
||||||
args[4] = uint64(len(loc.ArrayPositions()))
|
|
||||||
args = append(args, loc.ArrayPositions()...)
|
|
||||||
err = locEncoder.Add(hitNewDocNum, args...)
|
|
||||||
if err != nil {
|
|
||||||
return nil, 0, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
docTermMap[hitNewDocNum] =
|
|
||||||
append(append(docTermMap[hitNewDocNum], term...), termSeparator)
|
|
||||||
|
|
||||||
next, err2 = postItr.Next()
|
|
||||||
}
|
|
||||||
if err2 != nil {
|
|
||||||
return nil, 0, err2
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
tfEncoder.Close()
|
tfEncoder.Close()
|
||||||
|
@ -299,59 +240,140 @@ func persistMergedRest(segments []*SegmentBase, drops []*roaring.Bitmap,
|
||||||
if newRoaring.GetCardinality() > 0 {
|
if newRoaring.GetCardinality() > 0 {
|
||||||
// this field/term actually has hits in the new segment, lets write it down
|
// this field/term actually has hits in the new segment, lets write it down
|
||||||
freqOffset := uint64(w.Count())
|
freqOffset := uint64(w.Count())
|
||||||
_, err = tfEncoder.Write(w)
|
_, err := tfEncoder.Write(w)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, 0, err
|
return err
|
||||||
}
|
}
|
||||||
locOffset := uint64(w.Count())
|
locOffset := uint64(w.Count())
|
||||||
_, err = locEncoder.Write(w)
|
_, err = locEncoder.Write(w)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, 0, err
|
return err
|
||||||
}
|
}
|
||||||
postingLocOffset := uint64(w.Count())
|
postingLocOffset := uint64(w.Count())
|
||||||
_, err = writeRoaringWithLen(newRoaringLocs, w, &bufReuse, bufMaxVarintLen64)
|
_, err = writeRoaringWithLen(newRoaringLocs, w, &bufReuse, bufMaxVarintLen64)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, 0, err
|
return err
|
||||||
}
|
}
|
||||||
postingOffset := uint64(w.Count())
|
postingOffset := uint64(w.Count())
|
||||||
|
|
||||||
// write out the start of the term info
|
// write out the start of the term info
|
||||||
buf := bufMaxVarintLen64
|
n := binary.PutUvarint(bufMaxVarintLen64, freqOffset)
|
||||||
n := binary.PutUvarint(buf, freqOffset)
|
_, err = w.Write(bufMaxVarintLen64[:n])
|
||||||
_, err = w.Write(buf[:n])
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, 0, err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// write out the start of the loc info
|
// write out the start of the loc info
|
||||||
n = binary.PutUvarint(buf, locOffset)
|
n = binary.PutUvarint(bufMaxVarintLen64, locOffset)
|
||||||
_, err = w.Write(buf[:n])
|
_, err = w.Write(bufMaxVarintLen64[:n])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, 0, err
|
return err
|
||||||
}
|
}
|
||||||
|
// write out the start of the posting locs
|
||||||
// write out the start of the loc posting list
|
n = binary.PutUvarint(bufMaxVarintLen64, postingLocOffset)
|
||||||
n = binary.PutUvarint(buf, postingLocOffset)
|
_, err = w.Write(bufMaxVarintLen64[:n])
|
||||||
_, err = w.Write(buf[:n])
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, 0, err
|
return err
|
||||||
}
|
}
|
||||||
_, err = writeRoaringWithLen(newRoaring, w, &bufReuse, bufMaxVarintLen64)
|
_, err = writeRoaringWithLen(newRoaring, w, &bufReuse, bufMaxVarintLen64)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, 0, err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
err = newVellum.Insert(term, postingOffset)
|
err = newVellum.Insert(term, postingOffset)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, 0, err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = mergeItr.Next()
|
newRoaring = roaring.NewBitmap()
|
||||||
|
newRoaringLocs = roaring.NewBitmap()
|
||||||
|
|
||||||
|
tfEncoder.Reset()
|
||||||
|
locEncoder.Reset()
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
enumerator, err := newEnumerator(itrs)
|
||||||
|
|
||||||
|
for err == nil {
|
||||||
|
term, itrI, postingsOffset := enumerator.Current()
|
||||||
|
|
||||||
|
if !bytes.Equal(prevTerm, term) {
|
||||||
|
// if the term changed, write out the info collected
|
||||||
|
// for the previous term
|
||||||
|
err2 := finishTerm(prevTerm)
|
||||||
|
if err2 != nil {
|
||||||
|
return nil, 0, err2
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var err2 error
|
||||||
|
postings, err2 = dicts[itrI].postingsListFromOffset(
|
||||||
|
postingsOffset, drops[itrI], postings)
|
||||||
|
if err2 != nil {
|
||||||
|
return nil, 0, err2
|
||||||
|
}
|
||||||
|
|
||||||
|
postItr = postings.iterator(postItr)
|
||||||
|
next, err2 := postItr.Next()
|
||||||
|
for next != nil && err2 == nil {
|
||||||
|
hitNewDocNum := newDocNums[itrI][next.Number()]
|
||||||
|
if hitNewDocNum == docDropped {
|
||||||
|
return nil, 0, fmt.Errorf("see hit with dropped doc num")
|
||||||
|
}
|
||||||
|
newRoaring.Add(uint32(hitNewDocNum))
|
||||||
|
// encode norm bits
|
||||||
|
norm := next.Norm()
|
||||||
|
normBits := math.Float32bits(float32(norm))
|
||||||
|
err = tfEncoder.Add(hitNewDocNum, next.Frequency(), uint64(normBits))
|
||||||
|
if err != nil {
|
||||||
|
return nil, 0, err
|
||||||
|
}
|
||||||
|
locs := next.Locations()
|
||||||
|
if len(locs) > 0 {
|
||||||
|
newRoaringLocs.Add(uint32(hitNewDocNum))
|
||||||
|
for _, loc := range locs {
|
||||||
|
if cap(bufLoc) < 5+len(loc.ArrayPositions()) {
|
||||||
|
bufLoc = make([]uint64, 0, 5+len(loc.ArrayPositions()))
|
||||||
|
}
|
||||||
|
args := bufLoc[0:5]
|
||||||
|
args[0] = uint64(fieldsMap[loc.Field()])
|
||||||
|
args[1] = loc.Pos()
|
||||||
|
args[2] = loc.Start()
|
||||||
|
args[3] = loc.End()
|
||||||
|
args[4] = uint64(len(loc.ArrayPositions()))
|
||||||
|
args = append(args, loc.ArrayPositions()...)
|
||||||
|
err = locEncoder.Add(hitNewDocNum, args...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, 0, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
docTermMap[hitNewDocNum] =
|
||||||
|
append(append(docTermMap[hitNewDocNum], term...), termSeparator)
|
||||||
|
|
||||||
|
next, err2 = postItr.Next()
|
||||||
|
}
|
||||||
|
if err2 != nil {
|
||||||
|
return nil, 0, err2
|
||||||
|
}
|
||||||
|
|
||||||
|
prevTerm = prevTerm[:0] // copy to prevTerm in case Next() reuses term mem
|
||||||
|
prevTerm = append(prevTerm, term...)
|
||||||
|
|
||||||
|
err = enumerator.Next()
|
||||||
}
|
}
|
||||||
if err != nil && err != vellum.ErrIteratorDone {
|
if err != nil && err != vellum.ErrIteratorDone {
|
||||||
return nil, 0, err
|
return nil, 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
err = finishTerm(prevTerm)
|
||||||
|
if err != nil {
|
||||||
|
return nil, 0, err
|
||||||
|
}
|
||||||
|
|
||||||
dictOffset := uint64(w.Count())
|
dictOffset := uint64(w.Count())
|
||||||
|
|
||||||
err = newVellum.Close()
|
err = newVellum.Close()
|
||||||
|
@ -400,7 +422,7 @@ func persistMergedRest(segments []*SegmentBase, drops []*roaring.Bitmap,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fieldDvLocsOffset = uint64(w.Count())
|
fieldDvLocsOffset := uint64(w.Count())
|
||||||
|
|
||||||
buf := bufMaxVarintLen64
|
buf := bufMaxVarintLen64
|
||||||
for _, offset := range fieldDvLocs {
|
for _, offset := range fieldDvLocs {
|
||||||
|
|
|
@ -28,21 +28,27 @@ import (
|
||||||
// PostingsList is an in-memory represenation of a postings list
|
// PostingsList is an in-memory represenation of a postings list
|
||||||
type PostingsList struct {
|
type PostingsList struct {
|
||||||
sb *SegmentBase
|
sb *SegmentBase
|
||||||
term []byte
|
|
||||||
postingsOffset uint64
|
postingsOffset uint64
|
||||||
freqOffset uint64
|
freqOffset uint64
|
||||||
locOffset uint64
|
locOffset uint64
|
||||||
locBitmap *roaring.Bitmap
|
locBitmap *roaring.Bitmap
|
||||||
postings *roaring.Bitmap
|
postings *roaring.Bitmap
|
||||||
except *roaring.Bitmap
|
except *roaring.Bitmap
|
||||||
postingKey []byte
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Iterator returns an iterator for this postings list
|
// Iterator returns an iterator for this postings list
|
||||||
func (p *PostingsList) Iterator() segment.PostingsIterator {
|
func (p *PostingsList) Iterator() segment.PostingsIterator {
|
||||||
rv := &PostingsIterator{
|
return p.iterator(nil)
|
||||||
postings: p,
|
}
|
||||||
|
|
||||||
|
func (p *PostingsList) iterator(rv *PostingsIterator) *PostingsIterator {
|
||||||
|
if rv == nil {
|
||||||
|
rv = &PostingsIterator{}
|
||||||
|
} else {
|
||||||
|
*rv = PostingsIterator{} // clear the struct
|
||||||
}
|
}
|
||||||
|
rv.postings = p
|
||||||
|
|
||||||
if p.postings != nil {
|
if p.postings != nil {
|
||||||
// prepare the freq chunk details
|
// prepare the freq chunk details
|
||||||
var n uint64
|
var n uint64
|
||||||
|
|
Loading…
Reference in New Issue