
Merge pull request #747 from steveyen/master

scorch zap DictIterator term count fixed and more merge unit tests
Steve Yen 2018-02-01 10:13:18 -08:00 committed by GitHub
commit 175f80403a
3 changed files with 525 additions and 55 deletions


@@ -15,7 +15,6 @@
package zap
import (
"encoding/binary"
"fmt"
"github.com/RoaringBitmap/roaring"
@@ -51,43 +50,10 @@ func (d *Dictionary) postingsList(term []byte, except *roaring.Bitmap) (*Posting
return nil, fmt.Errorf("vellum err: %v", err)
}
if exists {
rv.postingsOffset = postingsOffset
// read the location of the freq/norm details
var n uint64
var read int
rv.freqOffset, read = binary.Uvarint(d.sb.mem[postingsOffset+n : postingsOffset+binary.MaxVarintLen64])
n += uint64(read)
rv.locOffset, read = binary.Uvarint(d.sb.mem[postingsOffset+n : postingsOffset+n+binary.MaxVarintLen64])
n += uint64(read)
var locBitmapOffset uint64
locBitmapOffset, read = binary.Uvarint(d.sb.mem[postingsOffset+n : postingsOffset+n+binary.MaxVarintLen64])
n += uint64(read)
// go ahead and load loc bitmap
var locBitmapLen uint64
locBitmapLen, read = binary.Uvarint(d.sb.mem[locBitmapOffset : locBitmapOffset+binary.MaxVarintLen64])
locRoaringBytes := d.sb.mem[locBitmapOffset+uint64(read) : locBitmapOffset+uint64(read)+locBitmapLen]
rv.locBitmap = roaring.NewBitmap()
_, err := rv.locBitmap.FromBuffer(locRoaringBytes)
err = rv.read(postingsOffset, d)
if err != nil {
return nil, fmt.Errorf("error loading roaring bitmap of locations with hits: %v", err)
return nil, err
}
var postingsLen uint64
postingsLen, read = binary.Uvarint(d.sb.mem[postingsOffset+n : postingsOffset+n+binary.MaxVarintLen64])
n += uint64(read)
roaringBytes := d.sb.mem[postingsOffset+n : postingsOffset+n+postingsLen]
bitmap := roaring.NewBitmap()
_, err = bitmap.FromBuffer(roaringBytes)
if err != nil {
return nil, fmt.Errorf("error loading roaring bitmap: %v", err)
}
rv.postings = bitmap
}
}
@@ -160,6 +126,7 @@ type DictionaryIterator struct {
d *Dictionary
itr vellum.Iterator
err error
tmp PostingsList
}
// Next returns the next entry in the dictionary
@@ -169,10 +136,14 @@ func (i *DictionaryIterator) Next() (*index.DictEntry, error) {
} else if i.err != nil {
return nil, i.err
}
term, count := i.itr.Current()
term, postingsOffset := i.itr.Current()
i.err = i.tmp.read(postingsOffset, i.d)
if i.err != nil {
return nil, i.err
}
rv := &index.DictEntry{
Term: string(term),
Count: count,
Count: i.tmp.Count(),
}
i.err = i.itr.Next()
return rv, nil
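
The net effect of this file's changes: the value stored in the vellum FST is the term's postings offset (as postingsList already assumed), but the old DictionaryIterator.Next() reported that raw value as the term count. Next() now resolves the true count by loading the postings list through the iterator's reused tmp PostingsList and the new read helper, then returns PostingsList.Count(). A minimal, hypothetical consumer sketch (the helper name and field name are illustrative, not part of this commit) of the API the tests below also exercise:

// dumpDictionary is a hypothetical helper, not part of this change. It walks a
// field dictionary of an open *Segment using the same API the merge tests
// exercise: Segment.Dictionary(field), Iterator(), and Next() returning a
// (*index.DictEntry, error) pair with a nil entry once iteration is done.
func dumpDictionary(seg *Segment, field string) error {
	dict, err := seg.Dictionary(field)
	if err != nil {
		return err
	}
	itr := dict.Iterator()
	for {
		entry, err := itr.Next()
		if err != nil {
			return err
		}
		if entry == nil {
			return nil // iteration finished
		}
		// With this fix, Count is the number of documents containing the term;
		// previously it echoed the raw vellum value (the postings offset).
		fmt.Printf("term %q: count %d\n", entry.Term, entry.Count)
	}
}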


@@ -15,7 +15,11 @@
package zap
import (
"fmt"
"os"
"reflect"
"sort"
"strings"
"testing"
"github.com/RoaringBitmap/roaring"
@@ -72,25 +76,56 @@ func TestMerge(t *testing.T) {
if err != nil {
t.Fatal(err)
}
segm, err := Open("/tmp/scorch3.zap")
if err != nil {
t.Fatalf("error opening merged segment: %v", err)
}
seg3 := segm.(*Segment)
defer func() {
cerr := seg3.Close()
if cerr != nil {
t.Fatalf("error closing segment: %v", err)
}
}()
if seg3.Path() != "/tmp/scorch3.zap" {
t.Fatalf("wrong path")
}
if seg3.Count() != 4 {
t.Fatalf("wrong count")
}
if len(seg3.Fields()) != 5 {
t.Fatalf("wrong # fields: %#v\n", seg3.Fields())
}
testMergeWithSelf(t, seg3, 4)
}
func TestMergeAndDrop(t *testing.T) {
func TestMergeWithEmptySegment(t *testing.T) {
testMergeWithEmptySegments(t, true, 1)
}
func TestMergeWithEmptySegments(t *testing.T) {
testMergeWithEmptySegments(t, true, 5)
}
func TestMergeWithEmptySegmentFirst(t *testing.T) {
testMergeWithEmptySegments(t, false, 1)
}
func TestMergeWithEmptySegmentsFirst(t *testing.T) {
testMergeWithEmptySegments(t, false, 5)
}
func testMergeWithEmptySegments(t *testing.T, before bool, numEmptySegments int) {
_ = os.RemoveAll("/tmp/scorch.zap")
_ = os.RemoveAll("/tmp/scorch2.zap")
_ = os.RemoveAll("/tmp/scorch3.zap")
memSegment := buildMemSegmentMulti()
err := PersistSegment(memSegment, "/tmp/scorch.zap", 1024)
if err != nil {
t.Fatal(err)
}
memSegment2 := buildMemSegmentMulti2()
err = PersistSegment(memSegment2, "/tmp/scorch2.zap", 1024)
if err != nil {
t.Fatal(err)
}
segment, err := Open("/tmp/scorch.zap")
if err != nil {
t.Fatalf("error opening segment: %v", err)
@@ -102,6 +137,319 @@ func TestMergeAndDrop(t *testing.T) {
}
}()
var segsToMerge []*Segment
if before {
segsToMerge = append(segsToMerge, segment.(*Segment))
}
for i := 0; i < numEmptySegments; i++ {
fname := fmt.Sprintf("scorch-empty-%d.zap", i)
_ = os.RemoveAll("/tmp/" + fname)
emptySegment := mem.NewFromAnalyzedDocs([]*index.AnalysisResult{})
err = PersistSegment(emptySegment, "/tmp/"+fname, 1024)
if err != nil {
t.Fatal(err)
}
emptyFileSegment, err := Open("/tmp/" + fname)
if err != nil {
t.Fatalf("error opening segment: %v", err)
}
defer func(emptyFileSegment *Segment) {
cerr := emptyFileSegment.Close()
if cerr != nil {
t.Fatalf("error closing segment: %v", err)
}
}(emptyFileSegment.(*Segment))
segsToMerge = append(segsToMerge, emptyFileSegment.(*Segment))
}
if !before {
segsToMerge = append(segsToMerge, segment.(*Segment))
}
_ = os.RemoveAll("/tmp/scorch3.zap")
drops := make([]*roaring.Bitmap, len(segsToMerge))
_, err = Merge(segsToMerge, drops, "/tmp/scorch3.zap", 1024)
if err != nil {
t.Fatal(err)
}
segm, err := Open("/tmp/scorch3.zap")
if err != nil {
t.Fatalf("error opening merged segment: %v", err)
}
segCur := segm.(*Segment)
defer func() {
cerr := segCur.Close()
if cerr != nil {
t.Fatalf("error closing segment: %v", err)
}
}()
if segCur.Path() != "/tmp/scorch3.zap" {
t.Fatalf("wrong path")
}
if segCur.Count() != 2 {
t.Fatalf("wrong count, numEmptySegments: %d, got count: %d", numEmptySegments, segCur.Count())
}
if len(segCur.Fields()) != 5 {
t.Fatalf("wrong # fields: %#v\n", segCur.Fields())
}
testMergeWithSelf(t, segCur, 2)
}
func testMergeWithSelf(t *testing.T, segCur *Segment, expectedCount uint64) {
// try merging the segment with itself for a few rounds
var diffs []string
for i := 0; i < 10; i++ {
fname := fmt.Sprintf("scorch-self-%d.zap", i)
_ = os.RemoveAll("/tmp/" + fname)
segsToMerge := make([]*Segment, 1)
segsToMerge[0] = segCur
_, err := Merge(segsToMerge, []*roaring.Bitmap{nil, nil}, "/tmp/"+fname, 1024)
if err != nil {
t.Fatal(err)
}
segm, err := Open("/tmp/" + fname)
if err != nil {
t.Fatalf("error opening merged segment: %v", err)
}
segNew := segm.(*Segment)
defer func(s *Segment) {
cerr := s.Close()
if cerr != nil {
t.Fatalf("error closing segment: %v", err)
}
}(segNew)
if segNew.Count() != expectedCount {
t.Fatalf("wrong count")
}
if len(segNew.Fields()) != 5 {
t.Fatalf("wrong # fields: %#v\n", segNew.Fields())
}
diff := compareSegments(segCur, segNew)
if diff != "" {
diffs = append(diffs, fname+" is different than previous:\n"+diff)
}
segCur = segNew
}
if len(diffs) > 0 {
t.Errorf("mismatches after repeated self-merging: %v", strings.Join(diffs, "\n"))
}
}
func compareSegments(a, b *Segment) string {
var rv []string
if a.Count() != b.Count() {
return "counts"
}
afields := append([]string(nil), a.Fields()...)
bfields := append([]string(nil), b.Fields()...)
sort.Strings(afields)
sort.Strings(bfields)
if !reflect.DeepEqual(afields, bfields) {
return "fields"
}
for _, fieldName := range afields {
adict, err := a.Dictionary(fieldName)
if err != nil {
return fmt.Sprintf("adict err: %v", err)
}
bdict, err := b.Dictionary(fieldName)
if err != nil {
return fmt.Sprintf("bdict err: %v", err)
}
if adict.(*Dictionary).fst.Len() != bdict.(*Dictionary).fst.Len() {
rv = append(rv, fmt.Sprintf("field %s, dict fst Len()'s different: %v %v",
fieldName, adict.(*Dictionary).fst.Len(), bdict.(*Dictionary).fst.Len()))
}
aitr := adict.Iterator()
bitr := bdict.Iterator()
for {
anext, aerr := aitr.Next()
bnext, berr := bitr.Next()
if aerr != berr {
rv = append(rv, fmt.Sprintf("field %s, dict iterator Next() errors different: %v %v",
fieldName, aerr, berr))
break
}
if !reflect.DeepEqual(anext, bnext) {
rv = append(rv, fmt.Sprintf("field %s, dict iterator Next() results different: %#v %#v",
fieldName, anext, bnext))
// keep going to try to see more diff details at the postingsList level
}
if aerr != nil || anext == nil ||
berr != nil || bnext == nil {
break
}
for _, next := range []*index.DictEntry{anext, bnext} {
if next == nil {
continue
}
aplist, aerr := adict.(*Dictionary).postingsList([]byte(next.Term), nil)
bplist, berr := bdict.(*Dictionary).postingsList([]byte(next.Term), nil)
if aerr != berr {
rv = append(rv, fmt.Sprintf("field %s, term: %s, postingsList() errors different: %v %v",
fieldName, next.Term, aerr, berr))
}
if (aplist != nil) != (bplist != nil) {
rv = append(rv, fmt.Sprintf("field %s, term: %s, postingsList() results different: %v %v",
fieldName, next.Term, aplist, bplist))
break
}
if aerr != nil || aplist == nil ||
berr != nil || bplist == nil {
break
}
if aplist.Count() != bplist.Count() {
rv = append(rv, fmt.Sprintf("field %s, term: %s, postingsList().Count()'s different: %v %v",
fieldName, next.Term, aplist.Count(), bplist.Count()))
}
apitr := aplist.Iterator()
bpitr := bplist.Iterator()
if (apitr != nil) != (bpitr != nil) {
rv = append(rv, fmt.Sprintf("field %s, term: %s, postingsList.Iterator() results different: %v %v",
fieldName, next.Term, apitr, bpitr))
break
}
for {
apitrn, aerr := apitr.Next()
bpitrn, berr := bpitr.Next()
if aerr != berr {
rv = append(rv, fmt.Sprintf("field %s, term: %s, postingsListIterator Next() errors different: %v %v",
fieldName, next.Term, aerr, berr))
}
if (apitrn != nil) != (bpitrn != nil) {
rv = append(rv, fmt.Sprintf("field %s, term: %s, postingsListIterator Next() results different: %v %v",
fieldName, next.Term, apitrn, bpitrn))
break
}
if aerr != nil || apitrn == nil ||
berr != nil || bpitrn == nil {
break
}
if apitrn.Number() != bpitrn.Number() {
rv = append(rv, fmt.Sprintf("field %s, term: %s, postingsListIterator Next() Number()'s different: %v %v",
fieldName, next.Term, apitrn.Number(), bpitrn.Number()))
}
if apitrn.Frequency() != bpitrn.Frequency() {
rv = append(rv, fmt.Sprintf("field %s, term: %s, postingsListIterator Next() Frequency()'s different: %v %v",
fieldName, next.Term, apitrn.Frequency(), bpitrn.Frequency()))
}
if apitrn.Norm() != bpitrn.Norm() {
rv = append(rv, fmt.Sprintf("field %s, term: %s, postingsListIterator Next() Norm()'s different: %v %v",
fieldName, next.Term, apitrn.Norm(), bpitrn.Norm()))
}
if len(apitrn.Locations()) != len(bpitrn.Locations()) {
rv = append(rv, fmt.Sprintf("field %s, term: %s, postingsListIterator Next() Locations() len's different: %v %v",
fieldName, next.Term, len(apitrn.Locations()), len(bpitrn.Locations())))
}
for loci, aloc := range apitrn.Locations() {
bloc := bpitrn.Locations()[loci]
if (aloc != nil) != (bloc != nil) {
rv = append(rv, fmt.Sprintf("field %s, term: %s, postingsListIterator Next() loc different: %v %v",
fieldName, next.Term, aloc, bloc))
break
}
if aloc.Field() != bloc.Field() ||
aloc.Start() != bloc.Start() ||
aloc.End() != bloc.End() ||
aloc.Pos() != bloc.Pos() ||
!reflect.DeepEqual(aloc.ArrayPositions(), bloc.ArrayPositions()) {
rv = append(rv, fmt.Sprintf("field %s, term: %s, postingsListIterator Next() loc details different: %v %v",
fieldName, next.Term, aloc, bloc))
}
}
}
}
}
}
return strings.Join(rv, "\n")
}
func TestMergeAndDrop(t *testing.T) {
docsToDrop := make([]*roaring.Bitmap, 2)
docsToDrop[0] = roaring.NewBitmap()
docsToDrop[0].AddInt(1)
docsToDrop[1] = roaring.NewBitmap()
docsToDrop[1].AddInt(1)
testMergeAndDrop(t, docsToDrop)
}
func TestMergeAndDropAllFromOneSegment(t *testing.T) {
docsToDrop := make([]*roaring.Bitmap, 2)
docsToDrop[0] = roaring.NewBitmap()
docsToDrop[0].AddInt(0)
docsToDrop[0].AddInt(1)
docsToDrop[1] = roaring.NewBitmap()
testMergeAndDrop(t, docsToDrop)
}
func testMergeAndDrop(t *testing.T, docsToDrop []*roaring.Bitmap) {
_ = os.RemoveAll("/tmp/scorch.zap")
_ = os.RemoveAll("/tmp/scorch2.zap")
memSegment := buildMemSegmentMulti()
err := PersistSegment(memSegment, "/tmp/scorch.zap", 1024)
if err != nil {
t.Fatal(err)
}
segment, err := Open("/tmp/scorch.zap")
if err != nil {
t.Fatalf("error opening segment: %v", err)
}
defer func() {
cerr := segment.Close()
if cerr != nil {
t.Fatalf("error closing segment: %v", err)
}
}()
memSegment2 := buildMemSegmentMulti2()
err = PersistSegment(memSegment2, "/tmp/scorch2.zap", 1024)
if err != nil {
t.Fatal(err)
}
segment2, err := Open("/tmp/scorch2.zap")
if err != nil {
t.Fatalf("error opening segment: %v", err)
@@ -117,24 +465,132 @@ func TestMergeAndDrop(t *testing.T) {
segsToMerge[0] = segment.(*Segment)
segsToMerge[1] = segment2.(*Segment)
testMergeAndDropSegments(t, segsToMerge, docsToDrop, 2)
}
func TestMergeWithUpdates(t *testing.T) {
segmentDocIds := [][]string{
[]string{"a", "b"},
[]string{"b", "c"}, // doc "b" updated
}
docsToDrop := make([]*roaring.Bitmap, 2)
docsToDrop[0] = roaring.NewBitmap()
docsToDrop[0].AddInt(1)
docsToDrop[0].AddInt(1) // doc "b" updated
docsToDrop[1] = roaring.NewBitmap()
docsToDrop[1].AddInt(1)
_, err = Merge(segsToMerge, docsToDrop, "/tmp/scorch3.zap", 1024)
testMergeWithUpdates(t, segmentDocIds, docsToDrop, 3)
}
func TestMergeWithUpdatesOnManySegments(t *testing.T) {
segmentDocIds := [][]string{
[]string{"a", "b"},
[]string{"b", "c"}, // doc "b" updated
[]string{"c", "d"}, // doc "c" updated
[]string{"d", "e"}, // doc "d" updated
}
docsToDrop := make([]*roaring.Bitmap, 4)
docsToDrop[0] = roaring.NewBitmap()
docsToDrop[0].AddInt(1) // doc "b" updated
docsToDrop[1] = roaring.NewBitmap()
docsToDrop[1].AddInt(1) // doc "c" updated
docsToDrop[2] = roaring.NewBitmap()
docsToDrop[2].AddInt(1) // doc "d" updated
docsToDrop[3] = roaring.NewBitmap()
testMergeWithUpdates(t, segmentDocIds, docsToDrop, 5)
}
func TestMergeWithUpdatesOnOneDoc(t *testing.T) {
segmentDocIds := [][]string{
[]string{"a", "b"},
[]string{"a", "c"}, // doc "a" updated
[]string{"a", "d"}, // doc "a" updated
[]string{"a", "e"}, // doc "a" updated
}
docsToDrop := make([]*roaring.Bitmap, 4)
docsToDrop[0] = roaring.NewBitmap()
docsToDrop[0].AddInt(0) // doc "a" updated
docsToDrop[1] = roaring.NewBitmap()
docsToDrop[1].AddInt(0) // doc "a" updated
docsToDrop[2] = roaring.NewBitmap()
docsToDrop[2].AddInt(0) // doc "a" updated
docsToDrop[3] = roaring.NewBitmap()
testMergeWithUpdates(t, segmentDocIds, docsToDrop, 5)
}
func testMergeWithUpdates(t *testing.T, segmentDocIds [][]string, docsToDrop []*roaring.Bitmap, expectedNumDocs uint64) {
var segsToMerge []*Segment
// convert segmentDocIds to segsToMerge
for i, docIds := range segmentDocIds {
fname := fmt.Sprintf("scorch%d.zap", i)
_ = os.RemoveAll("/tmp/" + fname)
memSegment := buildMemSegmentMultiHelper(docIds)
err := PersistSegment(memSegment, "/tmp/"+fname, 1024)
if err != nil {
t.Fatal(err)
}
segment, err := Open("/tmp/" + fname)
if err != nil {
t.Fatalf("error opening segment: %v", err)
}
defer func(segment *Segment) {
cerr := segment.Close()
if cerr != nil {
t.Fatalf("error closing segment: %v", err)
}
}(segment.(*Segment))
segsToMerge = append(segsToMerge, segment.(*Segment))
}
testMergeAndDropSegments(t, segsToMerge, docsToDrop, expectedNumDocs)
}
func testMergeAndDropSegments(t *testing.T, segsToMerge []*Segment, docsToDrop []*roaring.Bitmap, expectedNumDocs uint64) {
_ = os.RemoveAll("/tmp/scorch-merged.zap")
_, err := Merge(segsToMerge, docsToDrop, "/tmp/scorch-merged.zap", 1024)
if err != nil {
t.Fatal(err)
}
segm, err := Open("/tmp/scorch-merged.zap")
if err != nil {
t.Fatalf("error opening merged segment: %v", err)
}
defer func() {
cerr := segm.Close()
if cerr != nil {
t.Fatalf("error closing segment: %v", err)
}
}()
if segm.Count() != expectedNumDocs {
t.Fatalf("wrong count, got: %d, wanted: %d", segm.Count(), expectedNumDocs)
}
if len(segm.Fields()) != 5 {
t.Fatalf("wrong # fields: %#v\n", segm.Fields())
}
testMergeWithSelf(t, segm.(*Segment), expectedNumDocs)
}
func buildMemSegmentMulti2() *mem.Segment {
return buildMemSegmentMultiHelper([]string{"c", "d"})
}
func buildMemSegmentMultiHelper(docIds []string) *mem.Segment {
doc := &document.Document{
ID: "c",
Fields: []document.Field{
document.NewTextFieldCustom("_id", nil, []byte("c"), document.IndexField|document.StoreField, nil),
document.NewTextFieldCustom("_id", nil, []byte(docIds[0]), document.IndexField|document.StoreField, nil),
document.NewTextFieldCustom("name", nil, []byte("mat"), document.IndexField|document.StoreField|document.IncludeTermVectors, nil),
document.NewTextFieldCustom("desc", nil, []byte("some thing"), document.IndexField|document.StoreField|document.IncludeTermVectors, nil),
document.NewTextFieldCustom("tag", []uint64{0}, []byte("cold"), document.IndexField|document.StoreField|document.IncludeTermVectors, nil),
@@ -148,7 +604,7 @@ func buildMemSegmentMulti2() *mem.Segment {
doc2 := &document.Document{
ID: "d",
Fields: []document.Field{
document.NewTextFieldCustom("_id", nil, []byte("d"), document.IndexField|document.StoreField, nil),
document.NewTextFieldCustom("_id", nil, []byte(docIds[1]), document.IndexField|document.StoreField, nil),
document.NewTextFieldCustom("name", nil, []byte("joa"), document.IndexField|document.StoreField|document.IncludeTermVectors, nil),
document.NewTextFieldCustom("desc", nil, []byte("some thing"), document.IndexField|document.StoreField|document.IncludeTermVectors, nil),
document.NewTextFieldCustom("tag", []uint64{0}, []byte("cold"), document.IndexField|document.StoreField|document.IncludeTermVectors, nil),
@@ -169,7 +625,7 @@ func buildMemSegmentMulti2() *mem.Segment {
Start: 0,
End: 1,
Position: 1,
Term: []byte("c"),
Term: []byte(docIds[0]),
},
}, nil, false),
analysis.TokenFrequency(analysis.TokenStream{
@@ -227,7 +683,7 @@ func buildMemSegmentMulti2() *mem.Segment {
Start: 0,
End: 1,
Position: 1,
Term: []byte("d"),
Term: []byte(docIds[1]),
},
}, nil, false),
analysis.TokenFrequency(analysis.TokenStream{
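
All of the new merge tests funnel through the same flow: persist mem segments under /tmp, open them as file segments, call Merge with one drop bitmap per input segment (nil meaning drop nothing), then reopen the output and compare it, including repeated self-merges via testMergeWithSelf and compareSegments. A condensed, hypothetical sketch of that flow (the helper name is illustrative and error handling is trimmed; the trailing 1024 mirrors the chunking argument used throughout these tests):

// mergeTwo is a hypothetical condensed helper mirroring the pattern the tests
// above exercise: merge two already-open file segments into a new zap file and
// reopen the result. A nil entry in drops means nothing is dropped from that
// input segment.
func mergeTwo(a, b *Segment, path string) (*Segment, error) {
	segs := []*Segment{a, b}
	drops := make([]*roaring.Bitmap, len(segs)) // one (nil) bitmap per segment
	if _, err := Merge(segs, drops, path, 1024); err != nil {
		return nil, err
	}
	merged, err := Open(path)
	if err != nil {
		return nil, err
	}
	return merged.(*Segment), nil
}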


@@ -98,6 +98,49 @@ func (p *PostingsList) Count() uint64 {
return 0
}
func (rv *PostingsList) read(postingsOffset uint64, d *Dictionary) error {
rv.postingsOffset = postingsOffset
// read the location of the freq/norm details
var n uint64
var read int
rv.freqOffset, read = binary.Uvarint(d.sb.mem[postingsOffset+n : postingsOffset+binary.MaxVarintLen64])
n += uint64(read)
rv.locOffset, read = binary.Uvarint(d.sb.mem[postingsOffset+n : postingsOffset+n+binary.MaxVarintLen64])
n += uint64(read)
var locBitmapOffset uint64
locBitmapOffset, read = binary.Uvarint(d.sb.mem[postingsOffset+n : postingsOffset+n+binary.MaxVarintLen64])
n += uint64(read)
var locBitmapLen uint64
locBitmapLen, read = binary.Uvarint(d.sb.mem[locBitmapOffset : locBitmapOffset+binary.MaxVarintLen64])
locRoaringBytes := d.sb.mem[locBitmapOffset+uint64(read) : locBitmapOffset+uint64(read)+locBitmapLen]
rv.locBitmap = roaring.NewBitmap()
_, err := rv.locBitmap.FromBuffer(locRoaringBytes)
if err != nil {
return fmt.Errorf("error loading roaring bitmap of locations with hits: %v", err)
}
var postingsLen uint64
postingsLen, read = binary.Uvarint(d.sb.mem[postingsOffset+n : postingsOffset+n+binary.MaxVarintLen64])
n += uint64(read)
roaringBytes := d.sb.mem[postingsOffset+n : postingsOffset+n+postingsLen]
rv.postings = roaring.NewBitmap()
_, err = rv.postings.FromBuffer(roaringBytes)
if err != nil {
return fmt.Errorf("error loading roaring bitmap: %v", err)
}
return nil
}
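// For orientation, the layout read() decodes is: at postingsOffset, three
// uvarints (freqOffset, locOffset, locBitmapOffset) followed by a uvarint
// length and the serialized postings roaring bitmap; locBitmapOffset points at
// another uvarint length plus the serialized "has locations" bitmap. The
// function below is a hypothetical writer-side counterpart reconstructed only
// from that decoding logic; it is NOT the writer zap actually uses, and it
// assumes the loc bitmap block is written separately at locBitmapOffset.
// Helper name and signature are illustrative.
func appendPostingsRecord(buf []byte, freqOffset, locOffset, locBitmapOffset uint64,
	postings *roaring.Bitmap) ([]byte, error) {
	postingsBytes, err := postings.ToBytes()
	if err != nil {
		return nil, err
	}
	var tmp [binary.MaxVarintLen64]byte
	for _, v := range []uint64{freqOffset, locOffset, locBitmapOffset,
		uint64(len(postingsBytes))} {
		n := binary.PutUvarint(tmp[:], v)
		buf = append(buf, tmp[:n]...)
	}
	return append(buf, postingsBytes...), nil
}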
// PostingsIterator provides a way to iterate through the postings list
type PostingsIterator struct {
postings *PostingsList