scorch zap optimize merge by byte copying freq/norm/loc's
This change adds a zap PostingsIterator.nextBytes() method, which is similar to Next(), but instead of returning a Posting instance, nextBytes() returns the encoded freq/norm and location byte slices. The zap merge code then provides those byte slices directly to the intCoder's via a new method, intCoder.AddBytes(), thereby avoiding having to encode many uvarint's.
This commit is contained in:
parent
655268bec8
commit
530a3d24cf
|
@ -82,6 +82,19 @@ func (c *chunkedIntCoder) Add(docNum uint64, vals ...uint64) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (c *chunkedIntCoder) AddBytes(docNum uint64, buf []byte) error {
|
||||
chunk := docNum / c.chunkSize
|
||||
if chunk != c.currChunk {
|
||||
// starting a new chunk
|
||||
c.Close()
|
||||
c.chunkBuf.Reset()
|
||||
c.currChunk = chunk
|
||||
}
|
||||
|
||||
_, err := c.chunkBuf.Write(buf)
|
||||
return err
|
||||
}
|
||||
|
||||
// Close indicates you are done calling Add() this allows the final chunk
|
||||
// to be encoded.
|
||||
func (c *chunkedIntCoder) Close() {
|
||||
|
|
|
@ -162,7 +162,6 @@ func persistMergedRest(segments []*SegmentBase, dropsIn []*roaring.Bitmap,
|
|||
|
||||
var bufReuse bytes.Buffer
|
||||
var bufMaxVarintLen64 []byte = make([]byte, binary.MaxVarintLen64)
|
||||
var bufLoc []uint64
|
||||
|
||||
var postings *PostingsList
|
||||
var postItr *PostingsIterator
|
||||
|
@ -316,45 +315,32 @@ func persistMergedRest(segments []*SegmentBase, dropsIn []*roaring.Bitmap,
|
|||
newDocNumsI := newDocNums[itrI]
|
||||
|
||||
postItr = postings.iterator(postItr)
|
||||
next, err2 := postItr.Next()
|
||||
for next != nil && err2 == nil {
|
||||
hitNewDocNum := newDocNumsI[next.Number()]
|
||||
|
||||
nextDocNum, nextFreqNormBytes, nextLocBytes, err2 := postItr.nextBytes()
|
||||
for err2 == nil && len(nextFreqNormBytes) > 0 {
|
||||
hitNewDocNum := newDocNumsI[nextDocNum]
|
||||
if hitNewDocNum == docDropped {
|
||||
return nil, 0, fmt.Errorf("see hit with dropped doc num")
|
||||
}
|
||||
|
||||
newRoaring.Add(uint32(hitNewDocNum))
|
||||
// encode norm bits
|
||||
norm := next.Norm()
|
||||
normBits := math.Float32bits(float32(norm))
|
||||
err = tfEncoder.Add(hitNewDocNum, next.Frequency(), uint64(normBits))
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
err2 = tfEncoder.AddBytes(hitNewDocNum, nextFreqNormBytes)
|
||||
if err2 != nil {
|
||||
return nil, 0, err2
|
||||
}
|
||||
locs := next.Locations()
|
||||
if len(locs) > 0 {
|
||||
|
||||
if len(nextLocBytes) > 0 {
|
||||
newRoaringLocs.Add(uint32(hitNewDocNum))
|
||||
for _, loc := range locs {
|
||||
if cap(bufLoc) < 5+len(loc.ArrayPositions()) {
|
||||
bufLoc = make([]uint64, 0, 5+len(loc.ArrayPositions()))
|
||||
}
|
||||
args := bufLoc[0:5]
|
||||
args[0] = uint64(fieldsMap[loc.Field()] - 1)
|
||||
args[1] = loc.Pos()
|
||||
args[2] = loc.Start()
|
||||
args[3] = loc.End()
|
||||
args[4] = uint64(len(loc.ArrayPositions()))
|
||||
args = append(args, loc.ArrayPositions()...)
|
||||
err = locEncoder.Add(hitNewDocNum, args...)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
err2 = locEncoder.AddBytes(hitNewDocNum, nextLocBytes)
|
||||
if err2 != nil {
|
||||
return nil, 0, err2
|
||||
}
|
||||
}
|
||||
|
||||
docTermMap[hitNewDocNum] =
|
||||
append(append(docTermMap[hitNewDocNum], term...), termSeparator)
|
||||
|
||||
next, err2 = postItr.Next()
|
||||
nextDocNum, nextFreqNormBytes, nextLocBytes, err2 = postItr.nextBytes()
|
||||
}
|
||||
if err2 != nil {
|
||||
return nil, 0, err2
|
||||
|
|
|
@ -280,12 +280,9 @@ func (i *PostingsIterator) readLocation(l *Location) error {
|
|||
// Next returns the next posting on the postings list, or nil at the end
|
||||
func (i *PostingsIterator) Next() (segment.Posting, error) {
|
||||
docNum, exists, err := i.nextDocNum()
|
||||
if err != nil {
|
||||
if err != nil || !exists {
|
||||
return nil, err
|
||||
}
|
||||
if !exists {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
reuseLocs := i.next.locs // hold for reuse before struct clearing
|
||||
i.next = Posting{} // clear the struct
|
||||
|
@ -322,6 +319,45 @@ func (i *PostingsIterator) Next() (segment.Posting, error) {
|
|||
return rv, nil
|
||||
}
|
||||
|
||||
// nextBytes returns the docNum and the encoded freq & loc bytes for
|
||||
// the next posting
|
||||
func (i *PostingsIterator) nextBytes() (uint64, []byte, []byte, error) {
|
||||
docNum, exists, err := i.nextDocNum()
|
||||
if err != nil {
|
||||
return 0, nil, nil, err
|
||||
}
|
||||
if !exists {
|
||||
return 0, nil, nil, nil
|
||||
}
|
||||
|
||||
startFreqNorm := len(i.currChunkFreqNorm) - i.freqNormReader.Len()
|
||||
|
||||
freq, _, err := i.readFreqNorm()
|
||||
if err != nil {
|
||||
return 0, nil, nil, err
|
||||
}
|
||||
|
||||
endFreqNorm := len(i.currChunkFreqNorm) - i.freqNormReader.Len()
|
||||
bytesFreqNorm := i.currChunkFreqNorm[startFreqNorm:endFreqNorm]
|
||||
|
||||
var bytesLoc []byte
|
||||
if i.locBitmap.Contains(uint32(docNum)) {
|
||||
startLoc := len(i.currChunkLoc) - i.locReader.Len()
|
||||
|
||||
for j := uint64(0); j < freq; j++ {
|
||||
err := i.readLocation(nil)
|
||||
if err != nil {
|
||||
return 0, nil, nil, err
|
||||
}
|
||||
}
|
||||
|
||||
endLoc := len(i.currChunkLoc) - i.locReader.Len()
|
||||
bytesLoc = i.currChunkLoc[startLoc:endLoc]
|
||||
}
|
||||
|
||||
return docNum, bytesFreqNorm, bytesLoc, nil
|
||||
}
|
||||
|
||||
// nextDocNum returns the next docNum on the postings list, and also
|
||||
// sets up the currChunk / loc related fields of the iterator.
|
||||
func (i *PostingsIterator) nextDocNum() (uint64, bool, error) {
|
||||
|
|
Loading…
Reference in New Issue