diff --git a/index/scorch/segment/zap/intcoder.go b/index/scorch/segment/zap/intcoder.go index 247e36fb..8d1f9453 100644 --- a/index/scorch/segment/zap/intcoder.go +++ b/index/scorch/segment/zap/intcoder.go @@ -82,6 +82,19 @@ func (c *chunkedIntCoder) Add(docNum uint64, vals ...uint64) error { return nil } +func (c *chunkedIntCoder) AddBytes(docNum uint64, buf []byte) error { + chunk := docNum / c.chunkSize + if chunk != c.currChunk { + // starting a new chunk + c.Close() + c.chunkBuf.Reset() + c.currChunk = chunk + } + + _, err := c.chunkBuf.Write(buf) + return err +} + // Close indicates you are done calling Add() this allows the final chunk // to be encoded. func (c *chunkedIntCoder) Close() { diff --git a/index/scorch/segment/zap/merge.go b/index/scorch/segment/zap/merge.go index 33ce16c5..5066dfb9 100644 --- a/index/scorch/segment/zap/merge.go +++ b/index/scorch/segment/zap/merge.go @@ -162,7 +162,6 @@ func persistMergedRest(segments []*SegmentBase, dropsIn []*roaring.Bitmap, var bufReuse bytes.Buffer var bufMaxVarintLen64 []byte = make([]byte, binary.MaxVarintLen64) - var bufLoc []uint64 var postings *PostingsList var postItr *PostingsIterator @@ -316,45 +315,32 @@ func persistMergedRest(segments []*SegmentBase, dropsIn []*roaring.Bitmap, newDocNumsI := newDocNums[itrI] postItr = postings.iterator(postItr) - next, err2 := postItr.Next() - for next != nil && err2 == nil { - hitNewDocNum := newDocNumsI[next.Number()] + + nextDocNum, nextFreqNormBytes, nextLocBytes, err2 := postItr.nextBytes() + for err2 == nil && len(nextFreqNormBytes) > 0 { + hitNewDocNum := newDocNumsI[nextDocNum] if hitNewDocNum == docDropped { return nil, 0, fmt.Errorf("see hit with dropped doc num") } + newRoaring.Add(uint32(hitNewDocNum)) - // encode norm bits - norm := next.Norm() - normBits := math.Float32bits(float32(norm)) - err = tfEncoder.Add(hitNewDocNum, next.Frequency(), uint64(normBits)) - if err != nil { - return nil, 0, err + err2 = tfEncoder.AddBytes(hitNewDocNum, nextFreqNormBytes) + if err2 != nil { + return nil, 0, err2 } - locs := next.Locations() - if len(locs) > 0 { + + if len(nextLocBytes) > 0 { newRoaringLocs.Add(uint32(hitNewDocNum)) - for _, loc := range locs { - if cap(bufLoc) < 5+len(loc.ArrayPositions()) { - bufLoc = make([]uint64, 0, 5+len(loc.ArrayPositions())) - } - args := bufLoc[0:5] - args[0] = uint64(fieldsMap[loc.Field()] - 1) - args[1] = loc.Pos() - args[2] = loc.Start() - args[3] = loc.End() - args[4] = uint64(len(loc.ArrayPositions())) - args = append(args, loc.ArrayPositions()...) - err = locEncoder.Add(hitNewDocNum, args...) - if err != nil { - return nil, 0, err - } + err2 = locEncoder.AddBytes(hitNewDocNum, nextLocBytes) + if err2 != nil { + return nil, 0, err2 } } docTermMap[hitNewDocNum] = append(append(docTermMap[hitNewDocNum], term...), termSeparator) - next, err2 = postItr.Next() + nextDocNum, nextFreqNormBytes, nextLocBytes, err2 = postItr.nextBytes() } if err2 != nil { return nil, 0, err2 diff --git a/index/scorch/segment/zap/posting.go b/index/scorch/segment/zap/posting.go index ada39b43..589c7cb8 100644 --- a/index/scorch/segment/zap/posting.go +++ b/index/scorch/segment/zap/posting.go @@ -45,7 +45,25 @@ func (p *PostingsList) iterator(rv *PostingsIterator) *PostingsIterator { if rv == nil { rv = &PostingsIterator{} } else { + freqNormReader := rv.freqNormReader + if freqNormReader != nil { + freqNormReader.Reset([]byte(nil)) + } + freqNormDecoder := rv.freqNormDecoder + + locReader := rv.locReader + if locReader != nil { + locReader.Reset([]byte(nil)) + } + locDecoder := rv.locDecoder + *rv = PostingsIterator{} // clear the struct + + rv.freqNormReader = freqNormReader + rv.freqNormDecoder = freqNormDecoder + + rv.locReader = locReader + rv.locDecoder = locDecoder } rv.postings = p @@ -279,75 +297,23 @@ func (i *PostingsIterator) readLocation(l *Location) error { // Next returns the next posting on the postings list, or nil at the end func (i *PostingsIterator) Next() (segment.Posting, error) { - if i.actual == nil || !i.actual.HasNext() { - return nil, nil - } - n := i.actual.Next() - nChunk := n / i.postings.sb.chunkFactor - allN := i.all.Next() - allNChunk := allN / i.postings.sb.chunkFactor - - // n is the next actual hit (excluding some postings) - // allN is the next hit in the full postings - // if they don't match, adjust offsets to factor in item we're skipping over - // incr the all iterator, and check again - for allN != n { - - // in different chunks, reset offsets - if allNChunk != nChunk { - i.locoffset = 0 - i.offset = 0 - } else { - - if i.currChunk != nChunk || i.currChunkFreqNorm == nil { - err := i.loadChunk(int(nChunk)) - if err != nil { - return nil, fmt.Errorf("error loading chunk: %v", err) - } - } - - // read off freq/offsets even though we don't care about them - freq, _, err := i.readFreqNorm() - if err != nil { - return nil, err - } - if i.locBitmap.Contains(allN) { - for j := 0; j < int(freq); j++ { - err := i.readLocation(nil) - if err != nil { - return nil, err - } - } - } - - // in same chunk, need to account for offsets - i.offset++ - } - - allN = i.all.Next() - } - - if i.currChunk != nChunk || i.currChunkFreqNorm == nil { - err := i.loadChunk(int(nChunk)) - if err != nil { - return nil, fmt.Errorf("error loading chunk: %v", err) - } + docNum, exists, err := i.nextDocNum() + if err != nil || !exists { + return nil, err } reuseLocs := i.next.locs // hold for reuse before struct clearing i.next = Posting{} // clear the struct rv := &i.next - rv.iterator = i - rv.docNum = uint64(n) + rv.docNum = docNum - var err error var normBits uint64 rv.freq, normBits, err = i.readFreqNorm() if err != nil { return nil, err } rv.norm = math.Float32frombits(uint32(normBits)) - if i.locBitmap.Contains(n) { + if i.locBitmap.Contains(uint32(docNum)) { // read off 'freq' locations, into reused slices if cap(i.nextLocs) >= int(rv.freq) { i.nextLocs = i.nextLocs[0:rv.freq] @@ -371,14 +337,111 @@ func (i *PostingsIterator) Next() (segment.Posting, error) { return rv, nil } +// nextBytes returns the docNum and the encoded freq & loc bytes for +// the next posting +func (i *PostingsIterator) nextBytes() (uint64, []byte, []byte, error) { + docNum, exists, err := i.nextDocNum() + if err != nil { + return 0, nil, nil, err + } + if !exists { + return 0, nil, nil, nil + } + + startFreqNorm := len(i.currChunkFreqNorm) - i.freqNormReader.Len() + + freq, _, err := i.readFreqNorm() + if err != nil { + return 0, nil, nil, err + } + + endFreqNorm := len(i.currChunkFreqNorm) - i.freqNormReader.Len() + bytesFreqNorm := i.currChunkFreqNorm[startFreqNorm:endFreqNorm] + + var bytesLoc []byte + if i.locBitmap.Contains(uint32(docNum)) { + startLoc := len(i.currChunkLoc) - i.locReader.Len() + + for j := uint64(0); j < freq; j++ { + err := i.readLocation(nil) + if err != nil { + return 0, nil, nil, err + } + } + + endLoc := len(i.currChunkLoc) - i.locReader.Len() + bytesLoc = i.currChunkLoc[startLoc:endLoc] + } + + return docNum, bytesFreqNorm, bytesLoc, nil +} + +// nextDocNum returns the next docNum on the postings list, and also +// sets up the currChunk / loc related fields of the iterator. +func (i *PostingsIterator) nextDocNum() (uint64, bool, error) { + if i.actual == nil || !i.actual.HasNext() { + return 0, false, nil + } + + n := i.actual.Next() + nChunk := n / i.postings.sb.chunkFactor + allN := i.all.Next() + allNChunk := allN / i.postings.sb.chunkFactor + + // n is the next actual hit (excluding some postings) + // allN is the next hit in the full postings + // if they don't match, adjust offsets to factor in item we're skipping over + // incr the all iterator, and check again + for allN != n { + // in different chunks, reset offsets + if allNChunk != nChunk { + i.locoffset = 0 + i.offset = 0 + } else { + if i.currChunk != nChunk || i.currChunkFreqNorm == nil { + err := i.loadChunk(int(nChunk)) + if err != nil { + return 0, false, fmt.Errorf("error loading chunk: %v", err) + } + } + + // read off freq/offsets even though we don't care about them + freq, _, err := i.readFreqNorm() + if err != nil { + return 0, false, err + } + if i.locBitmap.Contains(allN) { + for j := 0; j < int(freq); j++ { + err := i.readLocation(nil) + if err != nil { + return 0, false, err + } + } + } + + // in same chunk, need to account for offsets + i.offset++ + } + + allN = i.all.Next() + } + + if i.currChunk != nChunk || i.currChunkFreqNorm == nil { + err := i.loadChunk(int(nChunk)) + if err != nil { + return 0, false, fmt.Errorf("error loading chunk: %v", err) + } + } + + return uint64(n), true, nil +} + // Posting is a single entry in a postings list type Posting struct { - iterator *PostingsIterator - docNum uint64 - - freq uint64 - norm float32 - locs []segment.Location + docNum uint64 + freq uint64 + norm float32 + locs []segment.Location } // Number returns the document number of this posting in this segment