
Merge pull request #850 from steveyen/more-reuse-optimizations

More buffer & slice reuse optimizations
Steve Yen 2018-03-26 13:07:46 -07:00 committed by GitHub
commit e9ca76be78
14 changed files with 158 additions and 100 deletions
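
The recurring idiom in the diff below: instead of passing nil as the destination to snappy.Encode and snappy.Decode, which allocates a fresh output slice on every call, each call site keeps a scratch []byte on a long-lived struct and passes it re-sliced to full capacity. golang/snappy only writes into dst when len(dst) is already large enough, which is also why the old `compressed = compressed[:0]` resets are removed below: a zero-length dst forced an allocation on every call, while `compressed[:cap(compressed)]` lets a buffer that grew on an earlier call be reused. A minimal sketch of the encode side of the idiom; the coder type and field names here are illustrative, not bleve's:

package main

import (
	"fmt"

	"github.com/golang/snappy"
)

// coder keeps a scratch buffer that survives across calls, so
// repeated encodes amortize to zero allocations once the buffer
// has grown to fit the largest chunk seen.
type coder struct {
	compressed []byte // scratch dst reused across Encode calls
}

func (c *coder) encode(chunk []byte) []byte {
	// Re-slice to full capacity: snappy.Encode writes into dst when
	// len(dst) >= snappy.MaxEncodedLen(len(chunk)); otherwise it
	// allocates a bigger slice and returns it, so the result must be
	// stored back into c.compressed for the next call to reuse.
	c.compressed = snappy.Encode(c.compressed[:cap(c.compressed)], chunk)
	return c.compressed
}

func main() {
	c := &coder{}
	for _, s := range []string{"hello hello hello", "world world world"} {
		fmt.Printf("%d -> %d bytes\n", len(s), len(c.encode([]byte(s))))
	}
}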

@@ -84,7 +84,7 @@ func (e *EmptyDictionaryIterator) Next() (*index.DictEntry, error) {
 
 type EmptyPostingsList struct{}
 
-func (e *EmptyPostingsList) Iterator() PostingsIterator {
+func (e *EmptyPostingsList) Iterator(includeFreq, includeNorm, includeLocations bool) PostingsIterator {
 	return &EmptyPostingsIterator{}
 }

@@ -78,7 +78,7 @@ func (p *PostingsList) Count() uint64 {
 }
 
 // Iterator returns an iterator for this postings list
-func (p *PostingsList) Iterator() segment.PostingsIterator {
+func (p *PostingsList) Iterator(includeFreq, includeNorm, includeLocations bool) segment.PostingsIterator {
 	return p.InitIterator(nil)
 }
 
 func (p *PostingsList) InitIterator(prealloc *PostingsIterator) *PostingsIterator {

@@ -48,7 +48,7 @@ func TestEmpty(t *testing.T) {
 		t.Fatal("got nil postings list, expected non-nil")
 	}
-	postingsItr := postingsList.Iterator()
+	postingsItr := postingsList.Iterator(true, true, true)
 	if postingsItr == nil {
 		t.Fatal("got nil iterator, expected non-nil")
 	}
@@ -211,7 +211,7 @@ func TestSingle(t *testing.T) {
 		t.Fatal("got nil postings list, expected non-nil")
 	}
-	postingsItr := postingsList.Iterator()
+	postingsItr := postingsList.Iterator(true, true, true)
 	if postingsItr == nil {
 		t.Fatal("got nil iterator, expected non-nil")
 	}
@@ -257,7 +257,7 @@ func TestSingle(t *testing.T) {
 		t.Fatal("got nil postings list, expected non-nil")
 	}
-	postingsItr = postingsList.Iterator()
+	postingsItr = postingsList.Iterator(true, true, true)
 	if postingsItr == nil {
 		t.Fatal("got nil iterator, expected non-nil")
 	}
@@ -325,7 +325,7 @@ func TestSingle(t *testing.T) {
 		t.Fatal("got nil postings list, expected non-nil")
 	}
-	postingsItr = postingsList.Iterator()
+	postingsItr = postingsList.Iterator(true, true, true)
 	if postingsItr == nil {
 		t.Fatal("got nil iterator, expected non-nil")
 	}
@@ -394,7 +394,7 @@ func TestSingle(t *testing.T) {
 		t.Fatal("got nil postings list, expected non-nil")
 	}
-	postingsItr = postingsList.Iterator()
+	postingsItr = postingsList.Iterator(true, true, true)
 	if postingsItr == nil {
 		t.Fatal("got nil iterator, expected non-nil")
 	}
@@ -638,7 +638,7 @@ func TestMultiple(t *testing.T) {
 		t.Fatal("got nil postings list, expected non-nil")
 	}
-	postingsItr := postingsList.Iterator()
+	postingsItr := postingsList.Iterator(true, true, true)
 	if postingsItr == nil {
 		t.Fatal("got nil iterator, expected non-nil")
 	}
@@ -677,7 +677,7 @@ func TestMultiple(t *testing.T) {
 		t.Errorf("expected count from postings list to be 1, got %d", postingsListExcludingCount)
 	}
-	postingsItrExcluding := postingsListExcluding.Iterator()
+	postingsItrExcluding := postingsListExcluding.Iterator(true, true, true)
 	if postingsItr == nil {
 		t.Fatal("got nil iterator, expected non-nil")
 	}

@@ -55,7 +55,7 @@ type DictionaryIterator interface {
 }
 
 type PostingsList interface {
-	Iterator() PostingsIterator
+	Iterator(includeFreq, includeNorm, includeLocations bool) PostingsIterator
 
 	Size() int
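
Every implementation of this interface (the zap and mem segments, plus the empty stubs above) takes the same three flags, so a caller declares up front which parts of each posting it will read, and the iterator can skip decoding everything else. A sketch of the cheapest possible traversal under the new signature; collectDocNums is an illustrative helper, not part of this commit:

package example

import (
	segment "github.com/blevesearch/bleve/index/scorch/segment"
)

// collectDocNums walks a postings list while skipping freq/norm and
// location decoding entirely -- the cheapest traversal the widened
// Iterator() signature allows.
func collectDocNums(pl segment.PostingsList) ([]uint64, error) {
	itr := pl.Iterator(false, false, false) // no freq, no norm, no locations
	var docNums []uint64
	for {
		posting, err := itr.Next()
		if err != nil {
			return nil, err
		}
		if posting == nil { // iterator exhausted
			return docNums, nil
		}
		docNums = append(docNums, posting.Number())
	}
}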

@@ -42,6 +42,8 @@ type chunkedContentCoder struct {
 	chunkBuf  bytes.Buffer
 
 	chunkMeta []MetaData
+
+	compressed []byte // temp buf for snappy compression
 }
 
 // MetaData represents the data information inside a
@@ -105,10 +107,10 @@ func (c *chunkedContentCoder) flushContents() error {
 	metaData := c.chunkMetaBuf.Bytes()
 	c.final = append(c.final, c.chunkMetaBuf.Bytes()...)
 	// write the compressed data to the final data
-	compressedData := snappy.Encode(nil, c.chunkBuf.Bytes())
-	c.final = append(c.final, compressedData...)
+	c.compressed = snappy.Encode(c.compressed[:cap(c.compressed)], c.chunkBuf.Bytes())
+	c.final = append(c.final, c.compressed...)
 
-	c.chunkLens[c.currChunk] = uint64(len(compressedData) + len(metaData))
+	c.chunkLens[c.currChunk] = uint64(len(c.compressed) + len(metaData))
 	return nil
 }

@@ -42,6 +42,7 @@ type docValueIterator struct {
 	dvDataLoc      uint64
 	curChunkHeader []MetaData
 	curChunkData   []byte // compressed data cache
+	uncompressed   []byte // temp buf for snappy decompression
 }
 
 func (di *docValueIterator) size() int {
@@ -135,10 +136,11 @@ func (di *docValueIterator) visitDocValues(docNum uint64,
 		return nil
 	}
 	// uncompress the already loaded data
-	uncompressed, err := snappy.Decode(nil, di.curChunkData)
+	uncompressed, err := snappy.Decode(di.uncompressed[:cap(di.uncompressed)], di.curChunkData)
 	if err != nil {
 		return err
 	}
+	di.uncompressed = uncompressed
 
 	// pick the terms for the given docNum
 	uncompressed = uncompressed[start:end]
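
snappy.Decode has the same contract as Encode: it decompresses into dst when dst is large enough, otherwise it allocates a bigger slice and returns that instead. That is the point of the added `di.uncompressed = uncompressed` line: whenever Decode had to grow the buffer, the grown slice is saved back onto the iterator so the next chunk can reuse it. A self-contained sketch of the decode side, with illustrative names:

package main

import (
	"fmt"

	"github.com/golang/snappy"
)

type decoder struct {
	uncompressed []byte // scratch dst reused across Decode calls
}

func (d *decoder) decode(compressed []byte) ([]byte, error) {
	// Re-slice to full capacity so Decode can reuse the backing array.
	out, err := snappy.Decode(d.uncompressed[:cap(d.uncompressed)], compressed)
	if err != nil {
		return nil, err
	}
	// Save the (possibly re-allocated) slice back for the next call.
	d.uncompressed = out
	return out, nil
}

func main() {
	d := &decoder{}
	compressed := snappy.Encode(nil, []byte("some chunk of doc values"))
	out, err := d.decode(compressed)
	fmt.Println(string(out), err)
}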

@@ -303,7 +303,7 @@ func persistMergedRest(segments []*SegmentBase, dropsIn []*roaring.Bitmap,
 				return nil, 0, err2
 			}
 
-			postItr = postings.iterator(postItr)
+			postItr = postings.iterator(true, true, true, postItr)
 
 			if fieldsSame {
 				// can optimize by copying freq/norm/loc bytes directly
@@ -604,7 +604,6 @@ func mergeStoredAndRemap(segments []*SegmentBase, drops []*roaring.Bitmap,
 			curr = 0
 			metaBuf.Reset()
 			data = data[:0]
-			compressed = compressed[:0]
 
 			// collect all the data
 			for i := 0; i < len(fieldsInv); i++ {
@@ -641,7 +640,7 @@ func mergeStoredAndRemap(segments []*SegmentBase, drops []*roaring.Bitmap,
 			metaEncoder.Close()
 
 			metaBytes := metaBuf.Bytes()
-			compressed = snappy.Encode(compressed, data)
+			compressed = snappy.Encode(compressed[:cap(compressed)], data)
 
 			// record where we're about to start writing
 			docNumOffsets[newDocNum] = uint64(w.Count())

@@ -332,8 +332,8 @@ func compareSegments(a, b *Segment) string {
 					fieldName, next.Term, aplist.Count(), bplist.Count()))
 			}
 
-			apitr := aplist.Iterator()
-			bpitr := bplist.Iterator()
+			apitr := aplist.Iterator(true, true, true)
+			bpitr := bplist.Iterator(true, true, true)
 			if (apitr != nil) != (bpitr != nil) {
 				rv = append(rv, fmt.Sprintf("field %s, term: %s, postingsList.Iterator() results different: %v %v",
 					fieldName, next.Term, apitr, bpitr))

@@ -517,7 +517,6 @@ func (s *interim) writeStoredFields() (
 		s.metaBuf.Reset()
 		data = data[:0]
-		compressed = compressed[:0]
 
 		for fieldID := range s.FieldsInv {
 			isf, exists := docStoredFields[uint16(fieldID)]
@@ -534,7 +533,7 @@ func (s *interim) writeStoredFields() (
 		metaEncoder.Close()
 
 		metaBytes := s.metaBuf.Bytes()
-		compressed = snappy.Encode(compressed, data)
+		compressed = snappy.Encode(compressed[:cap(compressed)], data)
 
 		docStoredOffsets[docNum] = uint64(s.w.Count())

@@ -131,11 +131,12 @@ func (p *PostingsList) OrInto(receiver *roaring.Bitmap) {
 }
 
 // Iterator returns an iterator for this postings list
-func (p *PostingsList) Iterator() segment.PostingsIterator {
-	return p.iterator(nil)
+func (p *PostingsList) Iterator(includeFreq, includeNorm, includeLocs bool) segment.PostingsIterator {
+	return p.iterator(includeFreq, includeNorm, includeLocs, nil)
 }
 
-func (p *PostingsList) iterator(rv *PostingsIterator) *PostingsIterator {
+func (p *PostingsList) iterator(includeFreq, includeNorm, includeLocs bool,
+	rv *PostingsIterator) *PostingsIterator {
 	if rv == nil {
 		rv = &PostingsIterator{}
 	} else {
@@ -154,6 +155,9 @@ func (p *PostingsList) iterator(rv *PostingsIterator) *PostingsIterator {
 		freqChunkOffsets := rv.freqChunkOffsets[:0]
 		locChunkOffsets := rv.locChunkOffsets[:0]
 		nextLocs := rv.nextLocs[:0]
+		nextSegmentLocs := rv.nextSegmentLocs[:0]
 		buf := rv.buf
 
 		*rv = PostingsIterator{} // clear the struct
@@ -167,6 +171,9 @@ func (p *PostingsList) iterator(rv *PostingsIterator) *PostingsIterator {
 		rv.freqChunkOffsets = freqChunkOffsets
 		rv.locChunkOffsets = locChunkOffsets
 		rv.nextLocs = nextLocs
+		rv.nextSegmentLocs = nextSegmentLocs
 		rv.buf = buf
 	}
 
 	rv.postings = p
@@ -188,38 +195,45 @@ func (p *PostingsList) iterator(rv *PostingsIterator) *PostingsIterator {
 		return rv
 	}
 
-	// prepare the freq chunk details
 	var n uint64
 	var read int
-	var numFreqChunks uint64
-	numFreqChunks, read = binary.Uvarint(p.sb.mem[p.freqOffset+n : p.freqOffset+n+binary.MaxVarintLen64])
-	n += uint64(read)
-	if cap(rv.freqChunkOffsets) >= int(numFreqChunks) {
-		rv.freqChunkOffsets = rv.freqChunkOffsets[:int(numFreqChunks)]
-	} else {
-		rv.freqChunkOffsets = make([]uint64, int(numFreqChunks))
-	}
-	for i := 0; i < int(numFreqChunks); i++ {
-		rv.freqChunkOffsets[i], read = binary.Uvarint(p.sb.mem[p.freqOffset+n : p.freqOffset+n+binary.MaxVarintLen64])
-		n += uint64(read)
-	}
-	rv.freqChunkStart = p.freqOffset + n
+
+	// prepare the freq chunk details
+	rv.includeFreqNorm = includeFreq || includeNorm
+	if rv.includeFreqNorm {
+		var numFreqChunks uint64
+		numFreqChunks, read = binary.Uvarint(p.sb.mem[p.freqOffset+n : p.freqOffset+n+binary.MaxVarintLen64])
+		n += uint64(read)
+		if cap(rv.freqChunkOffsets) >= int(numFreqChunks) {
+			rv.freqChunkOffsets = rv.freqChunkOffsets[:int(numFreqChunks)]
+		} else {
+			rv.freqChunkOffsets = make([]uint64, int(numFreqChunks))
+		}
+		for i := 0; i < int(numFreqChunks); i++ {
+			rv.freqChunkOffsets[i], read = binary.Uvarint(p.sb.mem[p.freqOffset+n : p.freqOffset+n+binary.MaxVarintLen64])
+			n += uint64(read)
+		}
+		rv.freqChunkStart = p.freqOffset + n
+	}
 
 	// prepare the loc chunk details
-	n = 0
-	var numLocChunks uint64
-	numLocChunks, read = binary.Uvarint(p.sb.mem[p.locOffset+n : p.locOffset+n+binary.MaxVarintLen64])
-	n += uint64(read)
-	if cap(rv.locChunkOffsets) >= int(numLocChunks) {
-		rv.locChunkOffsets = rv.locChunkOffsets[:int(numLocChunks)]
-	} else {
-		rv.locChunkOffsets = make([]uint64, int(numLocChunks))
-	}
-	for i := 0; i < int(numLocChunks); i++ {
-		rv.locChunkOffsets[i], read = binary.Uvarint(p.sb.mem[p.locOffset+n : p.locOffset+n+binary.MaxVarintLen64])
-		n += uint64(read)
-	}
-	rv.locChunkStart = p.locOffset + n
+	rv.includeLocs = includeLocs
+	if rv.includeLocs {
+		n = 0
+		var numLocChunks uint64
+		numLocChunks, read = binary.Uvarint(p.sb.mem[p.locOffset+n : p.locOffset+n+binary.MaxVarintLen64])
+		n += uint64(read)
+		if cap(rv.locChunkOffsets) >= int(numLocChunks) {
+			rv.locChunkOffsets = rv.locChunkOffsets[:int(numLocChunks)]
+		} else {
+			rv.locChunkOffsets = make([]uint64, int(numLocChunks))
+		}
+		for i := 0; i < int(numLocChunks); i++ {
+			rv.locChunkOffsets[i], read = binary.Uvarint(p.sb.mem[p.locOffset+n : p.locOffset+n+binary.MaxVarintLen64])
+			n += uint64(read)
+		}
+		rv.locChunkStart = p.locOffset + n
+	}
 
 	rv.all = p.postings.Iterator()
 	if p.except != nil {
@@ -314,13 +328,17 @@ type PostingsIterator struct {
 	locChunkOffsets []uint64
 	locChunkStart   uint64
 
-	next     Posting    // reused across Next() calls
-	nextLocs []Location // reused across Next() calls
+	next            Posting            // reused across Next() calls
+	nextLocs        []Location         // reused across Next() calls
+	nextSegmentLocs []segment.Location // reused across Next() calls
 
 	docNum1Hit   uint64
 	normBits1Hit uint64
 
 	buf []byte
+
+	includeFreqNorm bool
+	includeLocs     bool
 }
 
 func (i *PostingsIterator) Size() int {
@@ -339,32 +357,42 @@ func (i *PostingsIterator) Size() int {
 }
 
 func (i *PostingsIterator) loadChunk(chunk int) error {
-	if chunk >= len(i.freqChunkOffsets) || chunk >= len(i.locChunkOffsets) {
-		return fmt.Errorf("tried to load chunk that doesn't exist %d/(%d %d)", chunk, len(i.freqChunkOffsets), len(i.locChunkOffsets))
+	if i.includeFreqNorm {
+		if chunk >= len(i.freqChunkOffsets) {
+			return fmt.Errorf("tried to load freq chunk that doesn't exist %d/(%d)",
+				chunk, len(i.freqChunkOffsets))
+		}
+		end, start := i.freqChunkStart, i.freqChunkStart
+		s, e := readChunkBoundary(chunk, i.freqChunkOffsets)
+		start += s
+		end += e
+		i.currChunkFreqNorm = i.postings.sb.mem[start:end]
+		if i.freqNormReader == nil {
+			i.freqNormReader = bytes.NewReader(i.currChunkFreqNorm)
+			i.freqNormDecoder = govarint.NewU64Base128Decoder(i.freqNormReader)
+		} else {
+			i.freqNormReader.Reset(i.currChunkFreqNorm)
+		}
 	}
-	end, start := i.freqChunkStart, i.freqChunkStart
-	s, e := readChunkBoundary(chunk, i.freqChunkOffsets)
-	start += s
-	end += e
-	i.currChunkFreqNorm = i.postings.sb.mem[start:end]
-	if i.freqNormReader == nil {
-		i.freqNormReader = bytes.NewReader(i.currChunkFreqNorm)
-		i.freqNormDecoder = govarint.NewU64Base128Decoder(i.freqNormReader)
-	} else {
-		i.freqNormReader.Reset(i.currChunkFreqNorm)
-	}
 
-	end, start = i.locChunkStart, i.locChunkStart
-	s, e = readChunkBoundary(chunk, i.locChunkOffsets)
-	start += s
-	end += e
-	i.currChunkLoc = i.postings.sb.mem[start:end]
-	if i.locReader == nil {
-		i.locReader = bytes.NewReader(i.currChunkLoc)
-		i.locDecoder = govarint.NewU64Base128Decoder(i.locReader)
-	} else {
-		i.locReader.Reset(i.currChunkLoc)
+	if i.includeLocs {
+		if chunk >= len(i.locChunkOffsets) {
+			return fmt.Errorf("tried to load loc chunk that doesn't exist %d/(%d)",
+				chunk, len(i.locChunkOffsets))
+		}
+		end, start := i.locChunkStart, i.locChunkStart
+		s, e := readChunkBoundary(chunk, i.locChunkOffsets)
+		start += s
+		end += e
+		i.currChunkLoc = i.postings.sb.mem[start:end]
+		if i.locReader == nil {
+			i.locReader = bytes.NewReader(i.currChunkLoc)
+			i.locDecoder = govarint.NewU64Base128Decoder(i.locReader)
+		} else {
+			i.locReader.Reset(i.currChunkLoc)
+		}
 	}
 
 	i.currChunk = uint32(chunk)
@@ -441,10 +469,10 @@ func (i *PostingsIterator) readLocation(l *Location) error {
 		l.pos = pos
 		l.start = start
 		l.end = end
-		if numArrayPos > 0 {
+		if cap(l.ap) < int(numArrayPos) {
 			l.ap = make([]uint64, int(numArrayPos))
 		} else {
-			l.ap = l.ap[:0]
+			l.ap = l.ap[:int(numArrayPos)]
 		}
 	}
@@ -469,11 +497,14 @@ func (i *PostingsIterator) Next() (segment.Posting, error) {
 		return nil, err
 	}
 
-	reuseLocs := i.next.locs // hold for reuse before struct clearing
-	i.next = Posting{}       // clear the struct
+	i.next = Posting{} // clear the struct
 	rv := &i.next
 	rv.docNum = docNum
 
+	if !i.includeFreqNorm {
+		return rv, nil
+	}
+
 	var normBits uint64
 	var hasLocs bool
@@ -484,18 +515,17 @@ func (i *PostingsIterator) Next() (segment.Posting, error) {
 	rv.norm = math.Float32frombits(uint32(normBits))
 
-	if hasLocs {
+	if i.includeLocs && hasLocs {
 		// read off 'freq' locations, into reused slices
 		if cap(i.nextLocs) >= int(rv.freq) {
 			i.nextLocs = i.nextLocs[0:rv.freq]
 		} else {
-			i.nextLocs = make([]Location, rv.freq)
+			i.nextLocs = make([]Location, rv.freq, rv.freq*2)
 		}
-		if cap(reuseLocs) >= int(rv.freq) {
-			rv.locs = reuseLocs[0:rv.freq]
-		} else {
-			rv.locs = make([]segment.Location, rv.freq)
+		if cap(i.nextSegmentLocs) < int(rv.freq) {
+			i.nextSegmentLocs = make([]segment.Location, rv.freq, rv.freq*2)
 		}
+		rv.locs = i.nextSegmentLocs[0:rv.freq]
 		for j := 0; j < int(rv.freq); j++ {
 			err := i.readLocation(&i.nextLocs[j])
 			if err != nil {
@@ -585,7 +615,7 @@ func (i *PostingsIterator) nextDocNum() (uint64, bool, error) {
 	// if they don't match, move 'all' forwards until they do
 	for allN != n {
 		// in the same chunk, so move the freq/norm/loc decoders forward
-		if allNChunk == nChunk {
+		if i.includeFreqNorm && allNChunk == nChunk {
 			if i.currChunk != nChunk || i.currChunkFreqNorm == nil {
 				err := i.loadChunk(int(nChunk))
 				if err != nil {
@@ -599,7 +629,7 @@ func (i *PostingsIterator) nextDocNum() (uint64, bool, error) {
 				return 0, false, err
 			}
 
-			if hasLocs {
+			if i.includeLocs && hasLocs {
 				for j := 0; j < int(freq); j++ {
 					err := i.readLocation(nil)
 					if err != nil {
@@ -613,7 +643,7 @@ func (i *PostingsIterator) nextDocNum() (uint64, bool, error) {
 		allNChunk = allN / i.postings.sb.chunkFactor
 	}
 
-	if i.currChunk != nChunk || i.currChunkFreqNorm == nil {
+	if i.includeFreqNorm && (i.currChunk != nChunk || i.currChunkFreqNorm == nil) {
 		err := i.loadChunk(int(nChunk))
 		if err != nil {
 			return 0, false, fmt.Errorf("error loading chunk: %v", err)
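
Besides gating freq/norm and loc decoding behind the new flags, Next() above now reuses its location slices across calls, and when one must grow it is allocated with double the needed capacity (make([]Location, rv.freq, rv.freq*2)) so a slightly larger posting later does not trigger another allocation. The grow-with-headroom policy in isolation, under illustrative names:

package main

import "fmt"

// grow returns locs re-sliced to length n, reusing the existing
// backing array when it is big enough and otherwise allocating with
// 2x headroom -- the same policy the iterator applies to its
// nextLocs and nextSegmentLocs slices.
func grow(locs []int, n int) []int {
	if cap(locs) >= n {
		return locs[:n]
	}
	return make([]int, n, n*2)
}

func main() {
	var locs []int
	for _, n := range []int{3, 5, 4, 9} {
		locs = grow(locs, n)
		fmt.Println(len(locs), cap(locs)) // capacity jumps only at n=3 and n=9
	}
}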

@@ -273,19 +273,39 @@ func (sb *SegmentBase) dictionary(field string) (rv *Dictionary, err error) {
 	return rv, nil
 }
 
+// visitDocumentCtx holds data structures that are reusable across
+// multiple VisitDocument() calls to avoid memory allocations
+type visitDocumentCtx struct {
+	buf      []byte
+	reader   bytes.Reader
+	decoder  *govarint.Base128Decoder
+	arrayPos []uint64
+}
+
+var visitDocumentCtxPool = sync.Pool{
+	New: func() interface{} {
+		reuse := &visitDocumentCtx{}
+		reuse.decoder = govarint.NewU64Base128Decoder(&reuse.reader)
+		return reuse
+	},
+}
+
 // VisitDocument invokes the DocFieldValueVistor for each stored field
 // for the specified doc number
 func (s *SegmentBase) VisitDocument(num uint64, visitor segment.DocumentFieldValueVisitor) error {
 	// first make sure this is a valid number in this segment
 	if num < s.numDocs {
+		vdc := visitDocumentCtxPool.Get().(*visitDocumentCtx)
+
 		meta, compressed := s.getDocStoredMetaAndCompressed(num)
-		uncompressed, err := snappy.Decode(nil, compressed)
+		uncompressed, err := snappy.Decode(vdc.buf[:cap(vdc.buf)], compressed)
 		if err != nil {
 			return err
 		}
+
 		// now decode meta and process
-		reader := bytes.NewReader(meta)
-		decoder := govarint.NewU64Base128Decoder(reader)
+		vdc.reader.Reset(meta)
+		decoder := vdc.decoder
 
 		keepGoing := true
 		for keepGoing {
@@ -314,7 +334,10 @@ func (s *SegmentBase) VisitDocument(num uint64, visitor segment.DocumentFieldVal
 			}
 			var arrayPos []uint64
 			if numap > 0 {
-				arrayPos = make([]uint64, numap)
+				if cap(vdc.arrayPos) < int(numap) {
+					vdc.arrayPos = make([]uint64, numap)
+				}
+				arrayPos = vdc.arrayPos[:numap]
 				for i := 0; i < int(numap); i++ {
 					ap, err := decoder.GetU64()
 					if err != nil {
@@ -327,6 +350,9 @@ func (s *SegmentBase) VisitDocument(num uint64, visitor segment.DocumentFieldVal
 			value := uncompressed[offset : offset+l]
 			keepGoing = visitor(s.fieldsInv[field], byte(typ), value, arrayPos)
 		}
+
+		vdc.buf = uncompressed
+		visitDocumentCtxPool.Put(vdc)
 	}
 	return nil
 }
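
visitDocumentCtx bundles everything VisitDocument previously allocated per call (decode buffer, bytes.Reader, varint decoder, array-position scratch) and parks it in a sync.Pool between calls, which stays safe under concurrent VisitDocument callers since each goroutine takes its own context from the pool. A minimal sketch of the same pooling shape, with illustrative names and a plain byte reader standing in for the govarint decoder:

package main

import (
	"bytes"
	"fmt"
	"sync"
)

// visitCtx mirrors the shape of visitDocumentCtx: scratch state that
// is expensive to allocate per call but cheap to reset in place.
type visitCtx struct {
	buf    []byte       // scratch decode buffer
	reader bytes.Reader // reusable reader, re-pointed via Reset
}

var visitCtxPool = sync.Pool{
	New: func() interface{} {
		return &visitCtx{}
	},
}

func visit(data []byte) {
	ctx := visitCtxPool.Get().(*visitCtx)
	ctx.reader.Reset(data) // point the reusable reader at new input
	b, _ := ctx.reader.ReadByte()
	fmt.Printf("first byte: %d\n", b)
	visitCtxPool.Put(ctx) // return scratch state for the next caller
}

func main() {
	visit([]byte{42, 1, 2})
	visit([]byte{7})
}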

@@ -84,7 +84,7 @@ func TestOpen(t *testing.T) {
 		t.Fatal("got nil postings list, expected non-nil")
 	}
-	postingsItr := postingsList.Iterator()
+	postingsItr := postingsList.Iterator(true, true, true)
 	if postingsItr == nil {
 		t.Fatal("got nil iterator, expected non-nil")
 	}
@@ -130,7 +130,7 @@ func TestOpen(t *testing.T) {
 		t.Fatal("got nil postings list, expected non-nil")
 	}
-	postingsItr = postingsList.Iterator()
+	postingsItr = postingsList.Iterator(true, true, true)
 	if postingsItr == nil {
 		t.Fatal("got nil iterator, expected non-nil")
 	}
@@ -198,7 +198,7 @@ func TestOpen(t *testing.T) {
 		t.Fatal("got nil postings list, expected non-nil")
 	}
-	postingsItr = postingsList.Iterator()
+	postingsItr = postingsList.Iterator(true, true, true)
 	if postingsItr == nil {
 		t.Fatal("got nil iterator, expected non-nil")
 	}
@@ -267,7 +267,7 @@ func TestOpen(t *testing.T) {
 		t.Fatal("got nil postings list, expected non-nil")
 	}
-	postingsItr = postingsList.Iterator()
+	postingsItr = postingsList.Iterator(true, true, true)
 	if postingsItr == nil {
 		t.Fatal("got nil iterator, expected non-nil")
 	}
@@ -366,7 +366,7 @@ func TestOpenMulti(t *testing.T) {
 		t.Fatal("got nil postings list, expected non-nil")
 	}
-	postingsItr := postingsList.Iterator()
+	postingsItr := postingsList.Iterator(true, true, true)
 	if postingsItr == nil {
 		t.Fatal("got nil iterator, expected non-nil")
 	}
@@ -405,7 +405,7 @@ func TestOpenMulti(t *testing.T) {
 		t.Errorf("expected count from postings list to be 1, got %d", postingsListExcludingCount)
 	}
-	postingsItrExcluding := postingsListExcluding.Iterator()
+	postingsItrExcluding := postingsListExcluding.Iterator(true, true, true)
 	if postingsItr == nil {
 		t.Fatal("got nil iterator, expected non-nil")
 	}
@@ -466,7 +466,7 @@ func TestOpenMultiWithTwoChunks(t *testing.T) {
 		t.Fatal("got nil postings list, expected non-nil")
 	}
-	postingsItr := postingsList.Iterator()
+	postingsItr := postingsList.Iterator(true, true, true)
 	if postingsItr == nil {
 		t.Fatal("got nil iterator, expected non-nil")
 	}
@@ -500,7 +500,7 @@ func TestOpenMultiWithTwoChunks(t *testing.T) {
 		t.Fatal("got nil postings list, expected non-nil")
 	}
-	postingsItrExcluding := postingsListExcluding.Iterator()
+	postingsItrExcluding := postingsListExcluding.Iterator(true, true, true)
 	if postingsItr == nil {
 		t.Fatal("got nil iterator, expected non-nil")
 	}

@@ -394,7 +394,7 @@ func (i *IndexSnapshot) TermFieldReader(term []byte, field string, includeFreq,
 			return nil, err
 		}
 		rv.postings[i] = pl
-		rv.iterators[i] = pl.Iterator()
+		rv.iterators[i] = pl.Iterator(includeFreq, includeNorm, includeTermVectors)
 	}
 	atomic.AddUint64(&i.parent.stats.TotTermSearchersStarted, uint64(1))
 	return rv, nil

@@ -165,7 +165,7 @@ func (cfd *cachedFieldDocs) prepareFields(field string, ss *SegmentSnapshot) {
 		}
 
 		cfd.size += uint64(size.SizeOfUint64) /* map key */
-		postingsItr := postings.Iterator()
+		postingsItr := postings.Iterator(false, false, false)
 		nextPosting, err2 := postingsItr.Next()
 		for err2 == nil && nextPosting != nil {
 			docNum := nextPosting.Number()