scorch zap DictIterator term count fixed and more merge unit tests
The zap DictionaryIterator Next() was incorrectly returning the postingsList offset as the term count. As part of this, refactored out a PostingsList.read() helper method. Also added more merge unit test scenarios, including merging a segment for a few rounds to see if there are differences before/after merging.
This commit is contained in:
parent
a3b125508b
commit
684ee3c0e7
|
@ -15,7 +15,6 @@
|
|||
package zap
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
|
||||
"github.com/RoaringBitmap/roaring"
|
||||
|
@ -51,43 +50,10 @@ func (d *Dictionary) postingsList(term []byte, except *roaring.Bitmap) (*Posting
|
|||
return nil, fmt.Errorf("vellum err: %v", err)
|
||||
}
|
||||
if exists {
|
||||
rv.postingsOffset = postingsOffset
|
||||
// read the location of the freq/norm details
|
||||
var n uint64
|
||||
var read int
|
||||
|
||||
rv.freqOffset, read = binary.Uvarint(d.sb.mem[postingsOffset+n : postingsOffset+binary.MaxVarintLen64])
|
||||
n += uint64(read)
|
||||
rv.locOffset, read = binary.Uvarint(d.sb.mem[postingsOffset+n : postingsOffset+n+binary.MaxVarintLen64])
|
||||
n += uint64(read)
|
||||
|
||||
var locBitmapOffset uint64
|
||||
locBitmapOffset, read = binary.Uvarint(d.sb.mem[postingsOffset+n : postingsOffset+n+binary.MaxVarintLen64])
|
||||
n += uint64(read)
|
||||
|
||||
// go ahead and load loc bitmap
|
||||
var locBitmapLen uint64
|
||||
locBitmapLen, read = binary.Uvarint(d.sb.mem[locBitmapOffset : locBitmapOffset+binary.MaxVarintLen64])
|
||||
locRoaringBytes := d.sb.mem[locBitmapOffset+uint64(read) : locBitmapOffset+uint64(read)+locBitmapLen]
|
||||
rv.locBitmap = roaring.NewBitmap()
|
||||
_, err := rv.locBitmap.FromBuffer(locRoaringBytes)
|
||||
err = rv.read(postingsOffset, d)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error loading roaring bitmap of locations with hits: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var postingsLen uint64
|
||||
postingsLen, read = binary.Uvarint(d.sb.mem[postingsOffset+n : postingsOffset+n+binary.MaxVarintLen64])
|
||||
n += uint64(read)
|
||||
|
||||
roaringBytes := d.sb.mem[postingsOffset+n : postingsOffset+n+postingsLen]
|
||||
|
||||
bitmap := roaring.NewBitmap()
|
||||
_, err = bitmap.FromBuffer(roaringBytes)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error loading roaring bitmap: %v", err)
|
||||
}
|
||||
|
||||
rv.postings = bitmap
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -160,6 +126,7 @@ type DictionaryIterator struct {
|
|||
d *Dictionary
|
||||
itr vellum.Iterator
|
||||
err error
|
||||
tmp PostingsList
|
||||
}
|
||||
|
||||
// Next returns the next entry in the dictionary
|
||||
|
@ -169,10 +136,14 @@ func (i *DictionaryIterator) Next() (*index.DictEntry, error) {
|
|||
} else if i.err != nil {
|
||||
return nil, i.err
|
||||
}
|
||||
term, count := i.itr.Current()
|
||||
term, postingsOffset := i.itr.Current()
|
||||
i.err = i.tmp.read(postingsOffset, i.d)
|
||||
if i.err != nil {
|
||||
return nil, i.err
|
||||
}
|
||||
rv := &index.DictEntry{
|
||||
Term: string(term),
|
||||
Count: count,
|
||||
Count: i.tmp.Count(),
|
||||
}
|
||||
i.err = i.itr.Next()
|
||||
return rv, nil
|
||||
|
|
|
@ -15,7 +15,11 @@
|
|||
package zap
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"reflect"
|
||||
"sort"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/RoaringBitmap/roaring"
|
||||
|
@ -72,9 +76,251 @@ func TestMerge(t *testing.T) {
|
|||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
segm, err := Open("/tmp/scorch3.zap")
|
||||
if err != nil {
|
||||
t.Fatalf("error opening merged segment: %v", err)
|
||||
}
|
||||
seg3 := segm.(*Segment)
|
||||
defer func() {
|
||||
cerr := seg3.Close()
|
||||
if cerr != nil {
|
||||
t.Fatalf("error closing segment: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
if seg3.Path() != "/tmp/scorch3.zap" {
|
||||
t.Fatalf("wrong path")
|
||||
}
|
||||
if seg3.Count() != 4 {
|
||||
t.Fatalf("wrong count")
|
||||
}
|
||||
if len(seg3.Fields()) != 5 {
|
||||
t.Fatalf("wrong # fields: %#v\n", seg3.Fields())
|
||||
}
|
||||
|
||||
testMergeWithSelf(t, seg3, 4)
|
||||
}
|
||||
|
||||
func testMergeWithSelf(t *testing.T, segCur *Segment, expectedCount uint64) {
|
||||
// trying merging the segment with itself for a few rounds
|
||||
var diffs []string
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
fname := fmt.Sprintf("scorch-self-%d.zap", i)
|
||||
|
||||
_ = os.RemoveAll("/tmp/" + fname)
|
||||
|
||||
segsToMerge := make([]*Segment, 1)
|
||||
segsToMerge[0] = segCur
|
||||
|
||||
_, err := Merge(segsToMerge, []*roaring.Bitmap{nil, nil}, "/tmp/"+fname, 1024)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
segm, err := Open("/tmp/" + fname)
|
||||
if err != nil {
|
||||
t.Fatalf("error opening merged segment: %v", err)
|
||||
}
|
||||
segNew := segm.(*Segment)
|
||||
defer func(s *Segment) {
|
||||
cerr := s.Close()
|
||||
if cerr != nil {
|
||||
t.Fatalf("error closing segment: %v", err)
|
||||
}
|
||||
}(segNew)
|
||||
|
||||
if segNew.Count() != expectedCount {
|
||||
t.Fatalf("wrong count")
|
||||
}
|
||||
if len(segNew.Fields()) != 5 {
|
||||
t.Fatalf("wrong # fields: %#v\n", segNew.Fields())
|
||||
}
|
||||
|
||||
diff := compareSegments(segCur, segNew)
|
||||
if diff != "" {
|
||||
diffs = append(diffs, fname+" is different than previous:\n"+diff)
|
||||
}
|
||||
|
||||
segCur = segNew
|
||||
}
|
||||
|
||||
if len(diffs) > 0 {
|
||||
t.Errorf("mismatches after repeated self-merging: %v", strings.Join(diffs, "\n"))
|
||||
}
|
||||
}
|
||||
|
||||
func compareSegments(a, b *Segment) string {
|
||||
var rv []string
|
||||
|
||||
if a.Count() != b.Count() {
|
||||
return "counts"
|
||||
}
|
||||
|
||||
afields := append([]string(nil), a.Fields()...)
|
||||
bfields := append([]string(nil), b.Fields()...)
|
||||
sort.Strings(afields)
|
||||
sort.Strings(bfields)
|
||||
if !reflect.DeepEqual(afields, bfields) {
|
||||
return "fields"
|
||||
}
|
||||
|
||||
for _, fieldName := range afields {
|
||||
adict, err := a.Dictionary(fieldName)
|
||||
if err != nil {
|
||||
return fmt.Sprintf("adict err: %v", err)
|
||||
}
|
||||
bdict, err := b.Dictionary(fieldName)
|
||||
if err != nil {
|
||||
return fmt.Sprintf("bdict err: %v", err)
|
||||
}
|
||||
|
||||
if adict.(*Dictionary).fst.Len() != bdict.(*Dictionary).fst.Len() {
|
||||
rv = append(rv, fmt.Sprintf("field %s, dict fst Len()'s different: %v %v",
|
||||
fieldName, adict.(*Dictionary).fst.Len(), bdict.(*Dictionary).fst.Len()))
|
||||
}
|
||||
|
||||
aitr := adict.Iterator()
|
||||
bitr := bdict.Iterator()
|
||||
for {
|
||||
anext, aerr := aitr.Next()
|
||||
bnext, berr := bitr.Next()
|
||||
if aerr != berr {
|
||||
rv = append(rv, fmt.Sprintf("field %s, dict iterator Next() errors different: %v %v",
|
||||
fieldName, aerr, berr))
|
||||
break
|
||||
}
|
||||
if !reflect.DeepEqual(anext, bnext) {
|
||||
rv = append(rv, fmt.Sprintf("field %s, dict iterator Next() results different: %#v %#v",
|
||||
fieldName, anext, bnext))
|
||||
// keep going to try to see more diff details at the postingsList level
|
||||
}
|
||||
if aerr != nil || anext == nil ||
|
||||
berr != nil || bnext == nil {
|
||||
break
|
||||
}
|
||||
|
||||
for _, next := range []*index.DictEntry{anext, bnext} {
|
||||
if next == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
aplist, aerr := adict.(*Dictionary).postingsList([]byte(next.Term), nil)
|
||||
bplist, berr := bdict.(*Dictionary).postingsList([]byte(next.Term), nil)
|
||||
if aerr != berr {
|
||||
rv = append(rv, fmt.Sprintf("field %s, term: %s, postingsList() errors different: %v %v",
|
||||
fieldName, next.Term, aerr, berr))
|
||||
}
|
||||
|
||||
if (aplist != nil) != (bplist != nil) {
|
||||
rv = append(rv, fmt.Sprintf("field %s, term: %s, postingsList() results different: %v %v",
|
||||
fieldName, next.Term, aplist, bplist))
|
||||
break
|
||||
}
|
||||
|
||||
if aerr != nil || aplist == nil ||
|
||||
berr != nil || bplist == nil {
|
||||
break
|
||||
}
|
||||
|
||||
if aplist.Count() != bplist.Count() {
|
||||
rv = append(rv, fmt.Sprintf("field %s, term: %s, postingsList().Count()'s different: %v %v",
|
||||
fieldName, next.Term, aplist.Count(), bplist.Count()))
|
||||
}
|
||||
|
||||
apitr := aplist.Iterator()
|
||||
bpitr := bplist.Iterator()
|
||||
if (apitr != nil) != (bpitr != nil) {
|
||||
rv = append(rv, fmt.Sprintf("field %s, term: %s, postingsList.Iterator() results different: %v %v",
|
||||
fieldName, next.Term, apitr, bpitr))
|
||||
break
|
||||
}
|
||||
|
||||
for {
|
||||
apitrn, aerr := apitr.Next()
|
||||
bpitrn, aerr := bpitr.Next()
|
||||
if aerr != berr {
|
||||
rv = append(rv, fmt.Sprintf("field %s, term: %s, postingsListIterator Next() errors different: %v %v",
|
||||
fieldName, next.Term, aerr, berr))
|
||||
}
|
||||
|
||||
if (apitrn != nil) != (bpitrn != nil) {
|
||||
rv = append(rv, fmt.Sprintf("field %s, term: %s, postingsListIterator Next() results different: %v %v",
|
||||
fieldName, next.Term, apitrn, bpitrn))
|
||||
break
|
||||
}
|
||||
|
||||
if aerr != nil || apitrn == nil ||
|
||||
berr != nil || bpitrn == nil {
|
||||
break
|
||||
}
|
||||
|
||||
if apitrn.Number() != bpitrn.Number() {
|
||||
rv = append(rv, fmt.Sprintf("field %s, term: %s, postingsListIterator Next() Number()'s different: %v %v",
|
||||
fieldName, next.Term, apitrn.Number(), bpitrn.Number()))
|
||||
}
|
||||
|
||||
if apitrn.Frequency() != bpitrn.Frequency() {
|
||||
rv = append(rv, fmt.Sprintf("field %s, term: %s, postingsListIterator Next() Frequency()'s different: %v %v",
|
||||
fieldName, next.Term, apitrn.Frequency(), bpitrn.Frequency()))
|
||||
}
|
||||
|
||||
if apitrn.Norm() != bpitrn.Norm() {
|
||||
rv = append(rv, fmt.Sprintf("field %s, term: %s, postingsListIterator Next() Norm()'s different: %v %v",
|
||||
fieldName, next.Term, apitrn.Norm(), bpitrn.Norm()))
|
||||
}
|
||||
|
||||
if len(apitrn.Locations()) != len(bpitrn.Locations()) {
|
||||
rv = append(rv, fmt.Sprintf("field %s, term: %s, postingsListIterator Next() Locations() len's different: %v %v",
|
||||
fieldName, next.Term, len(apitrn.Locations()), len(bpitrn.Locations())))
|
||||
}
|
||||
|
||||
for loci, aloc := range apitrn.Locations() {
|
||||
bloc := bpitrn.Locations()[loci]
|
||||
|
||||
if (aloc != nil) != (bloc != nil) {
|
||||
rv = append(rv, fmt.Sprintf("field %s, term: %s, postingsListIterator Next() loc different: %v %v",
|
||||
fieldName, next.Term, aloc, bloc))
|
||||
break
|
||||
}
|
||||
|
||||
if aloc.Field() != bloc.Field() ||
|
||||
aloc.Start() != bloc.Start() ||
|
||||
aloc.End() != bloc.End() ||
|
||||
aloc.Pos() != bloc.Pos() ||
|
||||
!reflect.DeepEqual(aloc.ArrayPositions(), bloc.ArrayPositions()) {
|
||||
rv = append(rv, fmt.Sprintf("field %s, term: %s, postingsListIterator Next() loc details different: %v %v",
|
||||
fieldName, next.Term, aloc, bloc))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return strings.Join(rv, "\n")
|
||||
}
|
||||
|
||||
func TestMergeAndDrop(t *testing.T) {
|
||||
docsToDrop := make([]*roaring.Bitmap, 2)
|
||||
docsToDrop[0] = roaring.NewBitmap()
|
||||
docsToDrop[0].AddInt(1)
|
||||
docsToDrop[1] = roaring.NewBitmap()
|
||||
docsToDrop[1].AddInt(1)
|
||||
testMergeAndDrop(t, docsToDrop)
|
||||
}
|
||||
|
||||
func TestMergeAndDropAllFromOneSegment(t *testing.T) {
|
||||
docsToDrop := make([]*roaring.Bitmap, 2)
|
||||
docsToDrop[0] = roaring.NewBitmap()
|
||||
docsToDrop[0].AddInt(0)
|
||||
docsToDrop[0].AddInt(1)
|
||||
docsToDrop[1] = roaring.NewBitmap()
|
||||
testMergeAndDrop(t, docsToDrop)
|
||||
}
|
||||
|
||||
func testMergeAndDrop(t *testing.T, docsToDrop []*roaring.Bitmap) {
|
||||
_ = os.RemoveAll("/tmp/scorch.zap")
|
||||
_ = os.RemoveAll("/tmp/scorch2.zap")
|
||||
_ = os.RemoveAll("/tmp/scorch3.zap")
|
||||
|
@ -117,16 +363,30 @@ func TestMergeAndDrop(t *testing.T) {
|
|||
segsToMerge[0] = segment.(*Segment)
|
||||
segsToMerge[1] = segment2.(*Segment)
|
||||
|
||||
docsToDrop := make([]*roaring.Bitmap, 2)
|
||||
docsToDrop[0] = roaring.NewBitmap()
|
||||
docsToDrop[0].AddInt(1)
|
||||
docsToDrop[1] = roaring.NewBitmap()
|
||||
docsToDrop[1].AddInt(1)
|
||||
|
||||
_, err = Merge(segsToMerge, docsToDrop, "/tmp/scorch3.zap", 1024)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
segm, err := Open("/tmp/scorch3.zap")
|
||||
if err != nil {
|
||||
t.Fatalf("error opening merged segment: %v", err)
|
||||
}
|
||||
defer func() {
|
||||
cerr := segm.Close()
|
||||
if cerr != nil {
|
||||
t.Fatalf("error closing segment: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
if segm.Count() != 2 {
|
||||
t.Fatalf("wrong count, got: %d", segm.Count())
|
||||
}
|
||||
if len(segm.Fields()) != 5 {
|
||||
t.Fatalf("wrong # fields: %#v\n", segm.Fields())
|
||||
}
|
||||
|
||||
testMergeWithSelf(t, segm.(*Segment), 2)
|
||||
}
|
||||
|
||||
func buildMemSegmentMulti2() *mem.Segment {
|
||||
|
|
|
@ -98,6 +98,49 @@ func (p *PostingsList) Count() uint64 {
|
|||
return 0
|
||||
}
|
||||
|
||||
func (rv *PostingsList) read(postingsOffset uint64, d *Dictionary) error {
|
||||
rv.postingsOffset = postingsOffset
|
||||
|
||||
// read the location of the freq/norm details
|
||||
var n uint64
|
||||
var read int
|
||||
|
||||
rv.freqOffset, read = binary.Uvarint(d.sb.mem[postingsOffset+n : postingsOffset+binary.MaxVarintLen64])
|
||||
n += uint64(read)
|
||||
|
||||
rv.locOffset, read = binary.Uvarint(d.sb.mem[postingsOffset+n : postingsOffset+n+binary.MaxVarintLen64])
|
||||
n += uint64(read)
|
||||
|
||||
var locBitmapOffset uint64
|
||||
locBitmapOffset, read = binary.Uvarint(d.sb.mem[postingsOffset+n : postingsOffset+n+binary.MaxVarintLen64])
|
||||
n += uint64(read)
|
||||
|
||||
var locBitmapLen uint64
|
||||
locBitmapLen, read = binary.Uvarint(d.sb.mem[locBitmapOffset : locBitmapOffset+binary.MaxVarintLen64])
|
||||
|
||||
locRoaringBytes := d.sb.mem[locBitmapOffset+uint64(read) : locBitmapOffset+uint64(read)+locBitmapLen]
|
||||
|
||||
rv.locBitmap = roaring.NewBitmap()
|
||||
_, err := rv.locBitmap.FromBuffer(locRoaringBytes)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error loading roaring bitmap of locations with hits: %v", err)
|
||||
}
|
||||
|
||||
var postingsLen uint64
|
||||
postingsLen, read = binary.Uvarint(d.sb.mem[postingsOffset+n : postingsOffset+n+binary.MaxVarintLen64])
|
||||
n += uint64(read)
|
||||
|
||||
roaringBytes := d.sb.mem[postingsOffset+n : postingsOffset+n+postingsLen]
|
||||
|
||||
rv.postings = roaring.NewBitmap()
|
||||
_, err = rv.postings.FromBuffer(roaringBytes)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error loading roaring bitmap: %v", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// PostingsIterator provides a way to iterate through the postings list
|
||||
type PostingsIterator struct {
|
||||
postings *PostingsList
|
||||
|
|
Loading…
Reference in New Issue