From 530a3d24cf0768f4c7a82e9a61dd9a0eff3ec8a2 Mon Sep 17 00:00:00 2001 From: Steve Yen Date: Tue, 6 Mar 2018 07:58:42 -0800 Subject: [PATCH] scorch zap optimize merge by byte copying freq/norm/loc's This change adds a zap PostingsIterator.nextBytes() method, which is similar to Next(), but instead of returning a Posting instance, nextBytes() returns the encoded freq/norm and location byte slices. The zap merge code then provides those byte slices directly to the intCoder's via a new method, intCoder.AddBytes(), thereby avoiding having to encode many uvarint's. --- index/scorch/segment/zap/intcoder.go | 13 ++++++++ index/scorch/segment/zap/merge.go | 42 +++++++++----------------- index/scorch/segment/zap/posting.go | 44 +++++++++++++++++++++++++--- 3 files changed, 67 insertions(+), 32 deletions(-) diff --git a/index/scorch/segment/zap/intcoder.go b/index/scorch/segment/zap/intcoder.go index 247e36fb..8d1f9453 100644 --- a/index/scorch/segment/zap/intcoder.go +++ b/index/scorch/segment/zap/intcoder.go @@ -82,6 +82,19 @@ func (c *chunkedIntCoder) Add(docNum uint64, vals ...uint64) error { return nil } +func (c *chunkedIntCoder) AddBytes(docNum uint64, buf []byte) error { + chunk := docNum / c.chunkSize + if chunk != c.currChunk { + // starting a new chunk + c.Close() + c.chunkBuf.Reset() + c.currChunk = chunk + } + + _, err := c.chunkBuf.Write(buf) + return err +} + // Close indicates you are done calling Add() this allows the final chunk // to be encoded. func (c *chunkedIntCoder) Close() { diff --git a/index/scorch/segment/zap/merge.go b/index/scorch/segment/zap/merge.go index 33ce16c5..5066dfb9 100644 --- a/index/scorch/segment/zap/merge.go +++ b/index/scorch/segment/zap/merge.go @@ -162,7 +162,6 @@ func persistMergedRest(segments []*SegmentBase, dropsIn []*roaring.Bitmap, var bufReuse bytes.Buffer var bufMaxVarintLen64 []byte = make([]byte, binary.MaxVarintLen64) - var bufLoc []uint64 var postings *PostingsList var postItr *PostingsIterator @@ -316,45 +315,32 @@ func persistMergedRest(segments []*SegmentBase, dropsIn []*roaring.Bitmap, newDocNumsI := newDocNums[itrI] postItr = postings.iterator(postItr) - next, err2 := postItr.Next() - for next != nil && err2 == nil { - hitNewDocNum := newDocNumsI[next.Number()] + + nextDocNum, nextFreqNormBytes, nextLocBytes, err2 := postItr.nextBytes() + for err2 == nil && len(nextFreqNormBytes) > 0 { + hitNewDocNum := newDocNumsI[nextDocNum] if hitNewDocNum == docDropped { return nil, 0, fmt.Errorf("see hit with dropped doc num") } + newRoaring.Add(uint32(hitNewDocNum)) - // encode norm bits - norm := next.Norm() - normBits := math.Float32bits(float32(norm)) - err = tfEncoder.Add(hitNewDocNum, next.Frequency(), uint64(normBits)) - if err != nil { - return nil, 0, err + err2 = tfEncoder.AddBytes(hitNewDocNum, nextFreqNormBytes) + if err2 != nil { + return nil, 0, err2 } - locs := next.Locations() - if len(locs) > 0 { + + if len(nextLocBytes) > 0 { newRoaringLocs.Add(uint32(hitNewDocNum)) - for _, loc := range locs { - if cap(bufLoc) < 5+len(loc.ArrayPositions()) { - bufLoc = make([]uint64, 0, 5+len(loc.ArrayPositions())) - } - args := bufLoc[0:5] - args[0] = uint64(fieldsMap[loc.Field()] - 1) - args[1] = loc.Pos() - args[2] = loc.Start() - args[3] = loc.End() - args[4] = uint64(len(loc.ArrayPositions())) - args = append(args, loc.ArrayPositions()...) - err = locEncoder.Add(hitNewDocNum, args...) - if err != nil { - return nil, 0, err - } + err2 = locEncoder.AddBytes(hitNewDocNum, nextLocBytes) + if err2 != nil { + return nil, 0, err2 } } docTermMap[hitNewDocNum] = append(append(docTermMap[hitNewDocNum], term...), termSeparator) - next, err2 = postItr.Next() + nextDocNum, nextFreqNormBytes, nextLocBytes, err2 = postItr.nextBytes() } if err2 != nil { return nil, 0, err2 diff --git a/index/scorch/segment/zap/posting.go b/index/scorch/segment/zap/posting.go index adc399ea..2dab4166 100644 --- a/index/scorch/segment/zap/posting.go +++ b/index/scorch/segment/zap/posting.go @@ -280,12 +280,9 @@ func (i *PostingsIterator) readLocation(l *Location) error { // Next returns the next posting on the postings list, or nil at the end func (i *PostingsIterator) Next() (segment.Posting, error) { docNum, exists, err := i.nextDocNum() - if err != nil { + if err != nil || !exists { return nil, err } - if !exists { - return nil, nil - } reuseLocs := i.next.locs // hold for reuse before struct clearing i.next = Posting{} // clear the struct @@ -322,6 +319,45 @@ func (i *PostingsIterator) Next() (segment.Posting, error) { return rv, nil } +// nextBytes returns the docNum and the encoded freq & loc bytes for +// the next posting +func (i *PostingsIterator) nextBytes() (uint64, []byte, []byte, error) { + docNum, exists, err := i.nextDocNum() + if err != nil { + return 0, nil, nil, err + } + if !exists { + return 0, nil, nil, nil + } + + startFreqNorm := len(i.currChunkFreqNorm) - i.freqNormReader.Len() + + freq, _, err := i.readFreqNorm() + if err != nil { + return 0, nil, nil, err + } + + endFreqNorm := len(i.currChunkFreqNorm) - i.freqNormReader.Len() + bytesFreqNorm := i.currChunkFreqNorm[startFreqNorm:endFreqNorm] + + var bytesLoc []byte + if i.locBitmap.Contains(uint32(docNum)) { + startLoc := len(i.currChunkLoc) - i.locReader.Len() + + for j := uint64(0); j < freq; j++ { + err := i.readLocation(nil) + if err != nil { + return 0, nil, nil, err + } + } + + endLoc := len(i.currChunkLoc) - i.locReader.Len() + bytesLoc = i.currChunkLoc[startLoc:endLoc] + } + + return docNum, bytesFreqNorm, bytesLoc, nil +} + // nextDocNum returns the next docNum on the postings list, and also // sets up the currChunk / loc related fields of the iterator. func (i *PostingsIterator) nextDocNum() (uint64, bool, error) {