Merge pull request #782 from steveyen/scorch-intcoder-optimizations
Various scorch optimizations around merge & chunkedIntCoder
This commit is contained in:
commit
b8bb7922eb
|
@ -130,14 +130,14 @@ func (s *Scorch) introduceSegment(next *segmentIntroduction) error {
|
|||
segment: s.root.segment[i].segment,
|
||||
cachedDocs: s.root.segment[i].cachedDocs,
|
||||
}
|
||||
|
||||
|
||||
// apply new obsoletions
|
||||
if s.root.segment[i].deleted == nil {
|
||||
newss.deleted = delta
|
||||
} else {
|
||||
newss.deleted = roaring.Or(s.root.segment[i].deleted, delta)
|
||||
}
|
||||
|
||||
|
||||
// check for live size before copying
|
||||
if newss.LiveSize() > 0 {
|
||||
newSnapshot.segment = append(newSnapshot.segment, newss)
|
||||
|
@ -241,7 +241,7 @@ func (s *Scorch) introduceMerge(nextMerge *segmentMerge) {
|
|||
// the root segments would be the obsolete segment set
|
||||
delete(nextMerge.old, segmentID)
|
||||
|
||||
} else if s.root.segment[i].LiveSize() > 0 {
|
||||
} else if s.root.segment[i].LiveSize() > 0 {
|
||||
// this segment is staying
|
||||
newSnapshot.segment = append(newSnapshot.segment, &SegmentSnapshot{
|
||||
id: s.root.segment[i].id,
|
||||
|
@ -269,7 +269,7 @@ func (s *Scorch) introduceMerge(nextMerge *segmentMerge) {
|
|||
}
|
||||
}
|
||||
}
|
||||
// In case where all the docs in the newly merged segment getting
|
||||
// In case where all the docs in the newly merged segment getting
|
||||
// deleted by the time we reach here, can skip the introduction.
|
||||
if nextMerge.new != nil &&
|
||||
nextMerge.new.Count() > newSegmentDeleted.GetCardinality() {
|
||||
|
|
|
@ -16,7 +16,7 @@ package scorch
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"encoding/json"
|
||||
|
||||
"fmt"
|
||||
"os"
|
||||
|
|
|
@ -68,7 +68,6 @@ OUTER:
|
|||
persistWatchers = s.pausePersisterForMergerCatchUp(lastPersistedEpoch,
|
||||
&lastMergedEpoch, persistWatchers)
|
||||
|
||||
|
||||
var ourSnapshot *IndexSnapshot
|
||||
var ourPersisted []chan error
|
||||
|
||||
|
|
|
@ -30,6 +30,8 @@ type chunkedIntCoder struct {
|
|||
encoder *govarint.Base128Encoder
|
||||
chunkLens []uint64
|
||||
currChunk uint64
|
||||
|
||||
buf []byte
|
||||
}
|
||||
|
||||
// newChunkedIntCoder returns a new chunk int coder which packs data into
|
||||
|
@ -67,12 +69,8 @@ func (c *chunkedIntCoder) Add(docNum uint64, vals ...uint64) error {
|
|||
// starting a new chunk
|
||||
if c.encoder != nil {
|
||||
// close out last
|
||||
c.encoder.Close()
|
||||
encodingBytes := c.chunkBuf.Bytes()
|
||||
c.chunkLens[c.currChunk] = uint64(len(encodingBytes))
|
||||
c.final = append(c.final, encodingBytes...)
|
||||
c.Close()
|
||||
c.chunkBuf.Reset()
|
||||
c.encoder = govarint.NewU64Base128Encoder(&c.chunkBuf)
|
||||
}
|
||||
c.currChunk = chunk
|
||||
}
|
||||
|
@ -98,26 +96,25 @@ func (c *chunkedIntCoder) Close() {
|
|||
|
||||
// Write commits all the encoded chunked integers to the provided writer.
|
||||
func (c *chunkedIntCoder) Write(w io.Writer) (int, error) {
|
||||
var tw int
|
||||
buf := make([]byte, binary.MaxVarintLen64)
|
||||
// write out the number of chunks
|
||||
bufNeeded := binary.MaxVarintLen64 * (1 + len(c.chunkLens))
|
||||
if len(c.buf) < bufNeeded {
|
||||
c.buf = make([]byte, bufNeeded)
|
||||
}
|
||||
buf := c.buf
|
||||
|
||||
// write out the number of chunks & each chunkLen
|
||||
n := binary.PutUvarint(buf, uint64(len(c.chunkLens)))
|
||||
nw, err := w.Write(buf[:n])
|
||||
tw += nw
|
||||
for _, chunkLen := range c.chunkLens {
|
||||
n += binary.PutUvarint(buf[n:], uint64(chunkLen))
|
||||
}
|
||||
|
||||
tw, err := w.Write(buf[:n])
|
||||
if err != nil {
|
||||
return tw, err
|
||||
}
|
||||
// write out the chunk lens
|
||||
for _, chunkLen := range c.chunkLens {
|
||||
n := binary.PutUvarint(buf, uint64(chunkLen))
|
||||
nw, err = w.Write(buf[:n])
|
||||
tw += nw
|
||||
if err != nil {
|
||||
return tw, err
|
||||
}
|
||||
}
|
||||
|
||||
// write out the data
|
||||
nw, err = w.Write(c.final)
|
||||
nw, err := w.Write(c.final)
|
||||
tw += nw
|
||||
if err != nil {
|
||||
return tw, err
|
||||
|
|
|
@ -170,6 +170,9 @@ func persistMergedRest(segments []*SegmentBase, dropsIn []*roaring.Bitmap,
|
|||
rv := make([]uint64, len(fieldsInv))
|
||||
fieldDvLocs := make([]uint64, len(fieldsInv))
|
||||
|
||||
tfEncoder := newChunkedIntCoder(uint64(chunkFactor), newSegDocCount-1)
|
||||
locEncoder := newChunkedIntCoder(uint64(chunkFactor), newSegDocCount-1)
|
||||
|
||||
// docTermMap is keyed by docNum, where the array impl provides
|
||||
// better memory usage behavior than a sparse-friendlier hashmap
|
||||
// for when docs have much structural similarity (i.e., every doc
|
||||
|
@ -227,9 +230,6 @@ func persistMergedRest(segments []*SegmentBase, dropsIn []*roaring.Bitmap,
|
|||
newRoaring := roaring.NewBitmap()
|
||||
newRoaringLocs := roaring.NewBitmap()
|
||||
|
||||
tfEncoder := newChunkedIntCoder(uint64(chunkFactor), newSegDocCount-1)
|
||||
locEncoder := newChunkedIntCoder(uint64(chunkFactor), newSegDocCount-1)
|
||||
|
||||
finishTerm := func(term []byte) error {
|
||||
if term == nil {
|
||||
return nil
|
||||
|
@ -316,10 +316,12 @@ func persistMergedRest(segments []*SegmentBase, dropsIn []*roaring.Bitmap,
|
|||
return nil, 0, err2
|
||||
}
|
||||
|
||||
newDocNumsI := newDocNums[itrI]
|
||||
|
||||
postItr = postings.iterator(postItr)
|
||||
next, err2 := postItr.Next()
|
||||
for next != nil && err2 == nil {
|
||||
hitNewDocNum := newDocNums[itrI][next.Number()]
|
||||
hitNewDocNum := newDocNumsI[next.Number()]
|
||||
if hitNewDocNum == docDropped {
|
||||
return nil, 0, fmt.Errorf("see hit with dropped doc num")
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue