scorch zap merge optimization to byte-copy storedDocs
The optimization to byte-copy all the storedDocs for a given segment during merging kicks in when the fields are the same across all segments and when there are no deletions for that given segment. This can happen, for example, during data loading or insert-only scenarios. As part of this commit, the Segment.copyStoredDocs() method was added, which uses a single Write() call to copy all the stored docs bytes of a segment to a writer in one shot. And, getDocStoredMetaAndCompressed() was refactored into a related helper function, getDocStoredOffsets(), which provides the storedDocs metadata (offsets & lengths) for a doc.
This commit is contained in:
parent
0b50a20cac
commit
ed4826b189
|
@ -437,6 +437,24 @@ func mergeStoredAndRemap(segments []*SegmentBase, drops []*roaring.Bitmap,
|
|||
for segI, segment := range segments {
|
||||
segNewDocNums := make([]uint64, segment.numDocs)
|
||||
|
||||
// optimize when the field mapping is the same across all
|
||||
// segments and there are no deletions, via byte-copying
|
||||
// of stored docs bytes directly to the writer
|
||||
if fieldsSame && (drops[segI] == nil || drops[segI].GetCardinality() == 0) {
|
||||
err := segment.copyStoredDocs(newDocNum, docNumOffsets, w)
|
||||
if err != nil {
|
||||
return 0, nil, err
|
||||
}
|
||||
|
||||
for i := uint64(0); i < segment.numDocs; i++ {
|
||||
segNewDocNums[i] = newDocNum
|
||||
newDocNum++
|
||||
}
|
||||
rv = append(rv, segNewDocNums)
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
// for each doc num
|
||||
for docNum := uint64(0); docNum < segment.numDocs; docNum++ {
|
||||
// TODO: roaring's API limits docNums to 32-bits?
|
||||
|
@ -528,6 +546,41 @@ func mergeStoredAndRemap(segments []*SegmentBase, drops []*roaring.Bitmap,
|
|||
return storedIndexOffset, rv, nil
|
||||
}
|
||||
|
||||
// copyStoredDocs writes out a segment's stored doc info, optimized by
|
||||
// using a single Write() call for the entire set of bytes. The
|
||||
// newDocNumOffsets is filled with the new offsets for each doc.
|
||||
func (s *SegmentBase) copyStoredDocs(newDocNum uint64, newDocNumOffsets []uint64,
|
||||
w *CountHashWriter) error {
|
||||
if s.numDocs <= 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
indexOffset0, storedOffset0, _, _, _ :=
|
||||
s.getDocStoredOffsets(0) // the segment's first doc
|
||||
|
||||
indexOffsetN, storedOffsetN, readN, metaLenN, dataLenN :=
|
||||
s.getDocStoredOffsets(s.numDocs - 1) // the segment's last doc
|
||||
|
||||
storedOffset0New := uint64(w.Count())
|
||||
|
||||
storedBytes := s.mem[storedOffset0 : storedOffsetN+readN+metaLenN+dataLenN]
|
||||
_, err := w.Write(storedBytes)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// remap the storedOffset's for the docs into new offsets relative
|
||||
// to storedOffset0New, filling the given docNumOffsetsOut array
|
||||
for indexOffset := indexOffset0; indexOffset <= indexOffsetN; indexOffset += 8 {
|
||||
storedOffset := binary.BigEndian.Uint64(s.mem[indexOffset : indexOffset+8])
|
||||
storedOffsetNew := storedOffset - storedOffset0 + storedOffset0New
|
||||
newDocNumOffsets[newDocNum] = storedOffsetNew
|
||||
newDocNum += 1
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// mergeFields builds a unified list of fields used across all the
|
||||
// input segments, and computes whether the fields are the same across
|
||||
// segments (which depends on fields to be sorted in the same way
|
||||
|
|
|
@ -398,6 +398,40 @@ func compareSegments(a, b *Segment) string {
|
|||
fieldName, next.Term, aloc, bloc))
|
||||
}
|
||||
}
|
||||
|
||||
if fieldName == "_id" {
|
||||
docId := next.Term
|
||||
docNumA := apitrn.Number()
|
||||
docNumB := bpitrn.Number()
|
||||
afields := map[string]interface{}{}
|
||||
err = a.VisitDocument(apitrn.Number(),
|
||||
func(field string, typ byte, value []byte, pos []uint64) bool {
|
||||
afields[field+"-typ"] = typ
|
||||
afields[field+"-value"] = value
|
||||
afields[field+"-pos"] = pos
|
||||
return true
|
||||
})
|
||||
if err != nil {
|
||||
rv = append(rv, fmt.Sprintf("a.VisitDocument err: %v", err))
|
||||
}
|
||||
bfields := map[string]interface{}{}
|
||||
err = b.VisitDocument(bpitrn.Number(),
|
||||
func(field string, typ byte, value []byte, pos []uint64) bool {
|
||||
bfields[field+"-typ"] = typ
|
||||
bfields[field+"-value"] = value
|
||||
bfields[field+"-pos"] = pos
|
||||
return true
|
||||
})
|
||||
if err != nil {
|
||||
rv = append(rv, fmt.Sprintf("b.VisitDocument err: %v", err))
|
||||
}
|
||||
if !reflect.DeepEqual(afields, bfields) {
|
||||
rv = append(rv, fmt.Sprintf("afields != bfields,"+
|
||||
" id: %s, docNumA: %d, docNumB: %d,"+
|
||||
" afields: %#v, bfields: %#v",
|
||||
docId, docNumA, docNumB, afields, bfields))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,15 +17,27 @@ package zap
|
|||
import "encoding/binary"
|
||||
|
||||
func (s *SegmentBase) getDocStoredMetaAndCompressed(docNum uint64) ([]byte, []byte) {
|
||||
docStoredStartAddr := s.storedIndexOffset + (8 * docNum)
|
||||
docStoredStart := binary.BigEndian.Uint64(s.mem[docStoredStartAddr : docStoredStartAddr+8])
|
||||
var n uint64
|
||||
metaLen, read := binary.Uvarint(s.mem[docStoredStart : docStoredStart+binary.MaxVarintLen64])
|
||||
n += uint64(read)
|
||||
var dataLen uint64
|
||||
dataLen, read = binary.Uvarint(s.mem[docStoredStart+n : docStoredStart+n+binary.MaxVarintLen64])
|
||||
n += uint64(read)
|
||||
meta := s.mem[docStoredStart+n : docStoredStart+n+metaLen]
|
||||
data := s.mem[docStoredStart+n+metaLen : docStoredStart+n+metaLen+dataLen]
|
||||
_, storedOffset, n, metaLen, dataLen := s.getDocStoredOffsets(docNum)
|
||||
|
||||
meta := s.mem[storedOffset+n : storedOffset+n+metaLen]
|
||||
data := s.mem[storedOffset+n+metaLen : storedOffset+n+metaLen+dataLen]
|
||||
|
||||
return meta, data
|
||||
}
|
||||
|
||||
func (s *SegmentBase) getDocStoredOffsets(docNum uint64) (
|
||||
uint64, uint64, uint64, uint64, uint64) {
|
||||
indexOffset := s.storedIndexOffset + (8 * docNum)
|
||||
|
||||
storedOffset := binary.BigEndian.Uint64(s.mem[indexOffset : indexOffset+8])
|
||||
|
||||
var n uint64
|
||||
|
||||
metaLen, read := binary.Uvarint(s.mem[storedOffset : storedOffset+binary.MaxVarintLen64])
|
||||
n += uint64(read)
|
||||
|
||||
dataLen, read := binary.Uvarint(s.mem[storedOffset+n : storedOffset+n+binary.MaxVarintLen64])
|
||||
n += uint64(read)
|
||||
|
||||
return indexOffset, storedOffset, n, metaLen, dataLen
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue