0
0
Fork 0

scorch zap merge uses enumerator for vellum.Iterator's

This commit is contained in:
Steve Yen 2018-02-12 17:48:49 -08:00
parent a073424e5a
commit fe544f3352
1 changed files with 109 additions and 92 deletions

View File

@ -198,7 +198,6 @@ func persistMergedRest(segments []*SegmentBase, dropsIn []*roaring.Bitmap,
if err2 != nil {
return nil, 0, err2
}
if dict != nil && dict.fst != nil {
itr, err2 := dict.fst.Iterator(nil, nil)
if err2 != nil && err2 != vellum.ErrIteratorDone {
@ -213,15 +212,6 @@ func persistMergedRest(segments []*SegmentBase, dropsIn []*roaring.Bitmap,
}
}
// create merging iterator
mergeItr, err := vellum.NewMergeIterator(itrs, func(postingOffsets []uint64) uint64 {
// we don't actually use the merged value
return 0
})
tfEncoder := newChunkedIntCoder(uint64(chunkFactor), newSegDocCount-1)
locEncoder := newChunkedIntCoder(uint64(chunkFactor), newSegDocCount-1)
if uint64(cap(docTermMap)) < newSegDocCount {
docTermMap = make([][]byte, newSegDocCount)
} else {
@ -231,23 +221,96 @@ func persistMergedRest(segments []*SegmentBase, dropsIn []*roaring.Bitmap,
}
}
for err == nil {
term, _ := mergeItr.Current()
var prevTerm []byte
newRoaring := roaring.NewBitmap()
newRoaringLocs := roaring.NewBitmap()
tfEncoder := newChunkedIntCoder(uint64(chunkFactor), newSegDocCount-1)
locEncoder := newChunkedIntCoder(uint64(chunkFactor), newSegDocCount-1)
finishTerm := func(term []byte) error {
if term == nil {
return nil
}
tfEncoder.Close()
locEncoder.Close()
if newRoaring.GetCardinality() > 0 {
// this field/term actually has hits in the new segment, lets write it down
freqOffset := uint64(w.Count())
_, err := tfEncoder.Write(w)
if err != nil {
return err
}
locOffset := uint64(w.Count())
_, err = locEncoder.Write(w)
if err != nil {
return err
}
postingLocOffset := uint64(w.Count())
_, err = writeRoaringWithLen(newRoaringLocs, w, &bufReuse, bufMaxVarintLen64)
if err != nil {
return err
}
postingOffset := uint64(w.Count())
// write out the start of the term info
n := binary.PutUvarint(bufMaxVarintLen64, freqOffset)
_, err = w.Write(bufMaxVarintLen64[:n])
if err != nil {
return err
}
// write out the start of the loc info
n = binary.PutUvarint(bufMaxVarintLen64, locOffset)
_, err = w.Write(bufMaxVarintLen64[:n])
if err != nil {
return err
}
// write out the start of the posting locs
n = binary.PutUvarint(bufMaxVarintLen64, postingLocOffset)
_, err = w.Write(bufMaxVarintLen64[:n])
if err != nil {
return err
}
_, err = writeRoaringWithLen(newRoaring, w, &bufReuse, bufMaxVarintLen64)
if err != nil {
return err
}
err = newVellum.Insert(term, postingOffset)
if err != nil {
return err
}
}
newRoaring = roaring.NewBitmap()
newRoaringLocs = roaring.NewBitmap()
tfEncoder.Reset()
locEncoder.Reset()
// now go back and get posting list for this term
// but pass in the deleted docs for that segment
for dictI, dict := range dicts {
if dict == nil {
continue
return nil
}
enumerator, err := newEnumerator(itrs)
for err == nil {
term, itrI, postingsOffset := enumerator.Current()
if !bytes.Equal(prevTerm, term) {
// if the term changed, write out the info collected
// for the previous term
err2 := finishTerm(prevTerm)
if err2 != nil {
return nil, 0, err2
}
}
var err2 error
postings, err2 = dict.postingsList(term, drops[dictI], postings)
postings, err2 = dicts[itrI].postingsListFromOffset(
postingsOffset, drops[itrI], postings)
if err2 != nil {
return nil, 0, err2
}
@ -255,7 +318,7 @@ func persistMergedRest(segments []*SegmentBase, dropsIn []*roaring.Bitmap,
postItr = postings.iterator(postItr)
next, err2 := postItr.Next()
for next != nil && err2 == nil {
hitNewDocNum := newDocNums[dictI][next.Number()]
hitNewDocNum := newDocNums[itrI][next.Number()]
if hitNewDocNum == docDropped {
return nil, 0, fmt.Errorf("see hit with dropped doc num")
}
@ -296,67 +359,21 @@ func persistMergedRest(segments []*SegmentBase, dropsIn []*roaring.Bitmap,
if err2 != nil {
return nil, 0, err2
}
}
tfEncoder.Close()
locEncoder.Close()
prevTerm = prevTerm[:0] // copy to prevTerm in case Next() reuses term mem
prevTerm = append(prevTerm, term...)
if newRoaring.GetCardinality() > 0 {
// this field/term actually has hits in the new segment, lets write it down
freqOffset := uint64(w.Count())
_, err = tfEncoder.Write(w)
if err != nil {
return nil, 0, err
}
locOffset := uint64(w.Count())
_, err = locEncoder.Write(w)
if err != nil {
return nil, 0, err
}
postingLocOffset := uint64(w.Count())
_, err = writeRoaringWithLen(newRoaringLocs, w, &bufReuse, bufMaxVarintLen64)
if err != nil {
return nil, 0, err
}
postingOffset := uint64(w.Count())
// write out the start of the term info
buf := bufMaxVarintLen64
n := binary.PutUvarint(buf, freqOffset)
_, err = w.Write(buf[:n])
if err != nil {
return nil, 0, err
}
// write out the start of the loc info
n = binary.PutUvarint(buf, locOffset)
_, err = w.Write(buf[:n])
if err != nil {
return nil, 0, err
}
// write out the start of the loc posting list
n = binary.PutUvarint(buf, postingLocOffset)
_, err = w.Write(buf[:n])
if err != nil {
return nil, 0, err
}
_, err = writeRoaringWithLen(newRoaring, w, &bufReuse, bufMaxVarintLen64)
if err != nil {
return nil, 0, err
}
err = newVellum.Insert(term, postingOffset)
if err != nil {
return nil, 0, err
}
}
err = mergeItr.Next()
err = enumerator.Next()
}
if err != nil && err != vellum.ErrIteratorDone {
return nil, 0, err
}
err = finishTerm(prevTerm)
if err != nil {
return nil, 0, err
}
dictOffset := uint64(w.Count())
err = newVellum.Close()