0
0
Fork 0

Merge pull request #748 from steveyen/master

scorch zap merge related refactorings / optimizations
This commit is contained in:
Steve Yen 2018-02-06 07:52:17 -08:00 committed by GitHub
commit eb1d269521
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 139 additions and 150 deletions

View File

@ -165,7 +165,7 @@ var docvalueCmd = &cobra.Command{
/* /*
TODO => dump all chunk headers?? TODO => dump all chunk headers??
if len(args) == 3 && args[2] == ">" { if len(args) == 3 && args[2] == ">" {
dumpChunkDocIDs(data, ) dumpChunkDocNums(data, )
}*/ }*/
} }
@ -187,7 +187,7 @@ var docvalueCmd = &cobra.Command{
docInChunk := uint64(localDocNum) / uint64(segment.ChunkFactor()) docInChunk := uint64(localDocNum) / uint64(segment.ChunkFactor())
if numChunks < docInChunk { if numChunks < docInChunk {
return fmt.Errorf("no chunk exists for chunk number: %d for docID: %d", docInChunk, localDocNum) return fmt.Errorf("no chunk exists for chunk number: %d for localDocNum: %d", docInChunk, localDocNum)
} }
destChunkDataLoc := fieldDvLoc + offset destChunkDataLoc := fieldDvLoc + offset
@ -207,7 +207,7 @@ var docvalueCmd = &cobra.Command{
offset = uint64(0) offset = uint64(0)
curChunkHeader := make([]zap.MetaData, int(numDocs)) curChunkHeader := make([]zap.MetaData, int(numDocs))
for i := 0; i < int(numDocs); i++ { for i := 0; i < int(numDocs); i++ {
curChunkHeader[i].DocID, nread = binary.Uvarint(data[chunkMetaLoc+offset : chunkMetaLoc+offset+binary.MaxVarintLen64]) curChunkHeader[i].DocNum, nread = binary.Uvarint(data[chunkMetaLoc+offset : chunkMetaLoc+offset+binary.MaxVarintLen64])
offset += uint64(nread) offset += uint64(nread)
curChunkHeader[i].DocDvLoc, nread = binary.Uvarint(data[chunkMetaLoc+offset : chunkMetaLoc+offset+binary.MaxVarintLen64]) curChunkHeader[i].DocDvLoc, nread = binary.Uvarint(data[chunkMetaLoc+offset : chunkMetaLoc+offset+binary.MaxVarintLen64])
offset += uint64(nread) offset += uint64(nread)
@ -221,8 +221,8 @@ var docvalueCmd = &cobra.Command{
start, length := getDocValueLocs(uint64(localDocNum), curChunkHeader) start, length := getDocValueLocs(uint64(localDocNum), curChunkHeader)
if start == math.MaxUint64 || length == math.MaxUint64 { if start == math.MaxUint64 || length == math.MaxUint64 {
fmt.Printf("no field values found for docID %d\n", localDocNum) fmt.Printf("no field values found for localDocNum: %d\n", localDocNum)
fmt.Printf("Try docIDs present in chunk: %s\n", assortDocID(curChunkHeader)) fmt.Printf("Try docNums present in chunk: %s\n", metaDataDocNums(curChunkHeader))
return nil return nil
} }
// uncompress the already loaded data // uncompress the already loaded data
@ -234,7 +234,7 @@ var docvalueCmd = &cobra.Command{
var termSeparator byte = 0xff var termSeparator byte = 0xff
var termSeparatorSplitSlice = []byte{termSeparator} var termSeparatorSplitSlice = []byte{termSeparator}
// pick the terms for the given docID // pick the terms for the given docNum
uncompressed = uncompressed[start : start+length] uncompressed = uncompressed[start : start+length]
for { for {
i := bytes.Index(uncompressed, termSeparatorSplitSlice) i := bytes.Index(uncompressed, termSeparatorSplitSlice)
@ -250,23 +250,22 @@ var docvalueCmd = &cobra.Command{
}, },
} }
func getDocValueLocs(docID uint64, metaHeader []zap.MetaData) (uint64, uint64) { func getDocValueLocs(docNum uint64, metaHeader []zap.MetaData) (uint64, uint64) {
i := sort.Search(len(metaHeader), func(i int) bool { i := sort.Search(len(metaHeader), func(i int) bool {
return metaHeader[i].DocID >= docID return metaHeader[i].DocNum >= docNum
}) })
if i < len(metaHeader) && metaHeader[i].DocID == docID { if i < len(metaHeader) && metaHeader[i].DocNum == docNum {
return metaHeader[i].DocDvLoc, metaHeader[i].DocDvLen return metaHeader[i].DocDvLoc, metaHeader[i].DocDvLen
} }
return math.MaxUint64, math.MaxUint64 return math.MaxUint64, math.MaxUint64
} }
func assortDocID(metaHeader []zap.MetaData) string { func metaDataDocNums(metaHeader []zap.MetaData) string {
docIDs := "" docNums := ""
for _, meta := range metaHeader { for _, meta := range metaHeader {
id := fmt.Sprintf("%d", meta.DocID) docNums += fmt.Sprintf("%d", meta.DocNum) + ", "
docIDs += id + ", "
} }
return docIDs return docNums
} }
func init() { func init() {

View File

@ -310,17 +310,21 @@ func (s *Scorch) prepareSegment(newSegment segment.Segment, ids []string,
introduction.persisted = make(chan error, 1) introduction.persisted = make(chan error, 1)
} }
// get read lock, to optimistically prepare obsoleted info // optimistically prepare obsoletes outside of rootLock
s.rootLock.RLock() s.rootLock.RLock()
for _, seg := range s.root.segment { root := s.root
root.AddRef()
s.rootLock.RUnlock()
for _, seg := range root.segment {
delta, err := seg.segment.DocNumbers(ids) delta, err := seg.segment.DocNumbers(ids)
if err != nil { if err != nil {
s.rootLock.RUnlock()
return err return err
} }
introduction.obsoletes[seg.id] = delta introduction.obsoletes[seg.id] = delta
} }
s.rootLock.RUnlock()
_ = root.DecRef()
s.introductions <- introduction s.introductions <- introduction

View File

@ -187,79 +187,42 @@ func persistBase(memSegment *mem.Segment, cr *CountHashWriter, chunkFactor uint3
} }
func persistStored(memSegment *mem.Segment, w *CountHashWriter) (uint64, error) { func persistStored(memSegment *mem.Segment, w *CountHashWriter) (uint64, error) {
var curr int var curr int
var metaBuf bytes.Buffer var metaBuf bytes.Buffer
var data, compressed []byte var data, compressed []byte
metaEncoder := govarint.NewU64Base128Encoder(&metaBuf)
docNumOffsets := make(map[int]uint64, len(memSegment.Stored)) docNumOffsets := make(map[int]uint64, len(memSegment.Stored))
for docNum, storedValues := range memSegment.Stored { for docNum, storedValues := range memSegment.Stored {
if docNum != 0 { if docNum != 0 {
// reset buffer if necessary // reset buffer if necessary
curr = 0
metaBuf.Reset() metaBuf.Reset()
data = data[:0] data = data[:0]
compressed = compressed[:0] compressed = compressed[:0]
curr = 0
} }
metaEncoder := govarint.NewU64Base128Encoder(&metaBuf)
st := memSegment.StoredTypes[docNum] st := memSegment.StoredTypes[docNum]
sp := memSegment.StoredPos[docNum] sp := memSegment.StoredPos[docNum]
// encode fields in order // encode fields in order
for fieldID := range memSegment.FieldsInv { for fieldID := range memSegment.FieldsInv {
if storedFieldValues, ok := storedValues[uint16(fieldID)]; ok { if storedFieldValues, ok := storedValues[uint16(fieldID)]; ok {
// has stored values for this field
num := len(storedFieldValues)
stf := st[uint16(fieldID)] stf := st[uint16(fieldID)]
spf := sp[uint16(fieldID)] spf := sp[uint16(fieldID)]
// process each value var err2 error
for i := 0; i < num; i++ { curr, data, err2 = persistStoredFieldValues(fieldID,
// encode field storedFieldValues, stf, spf, curr, metaEncoder, data)
_, err2 := metaEncoder.PutU64(uint64(fieldID)) if err2 != nil {
if err2 != nil { return 0, err2
return 0, err2
}
// encode type
_, err2 = metaEncoder.PutU64(uint64(stf[i]))
if err2 != nil {
return 0, err2
}
// encode start offset
_, err2 = metaEncoder.PutU64(uint64(curr))
if err2 != nil {
return 0, err2
}
// end len
_, err2 = metaEncoder.PutU64(uint64(len(storedFieldValues[i])))
if err2 != nil {
return 0, err2
}
// encode number of array pos
_, err2 = metaEncoder.PutU64(uint64(len(spf[i])))
if err2 != nil {
return 0, err2
}
// encode all array positions
for _, pos := range spf[i] {
_, err2 = metaEncoder.PutU64(pos)
if err2 != nil {
return 0, err2
}
}
// append data
data = append(data, storedFieldValues[i]...)
// update curr
curr += len(storedFieldValues[i])
} }
} }
} }
metaEncoder.Close()
metaEncoder.Close()
metaBytes := metaBuf.Bytes() metaBytes := metaBuf.Bytes()
// compress the data // compress the data
@ -299,6 +262,51 @@ func persistStored(memSegment *mem.Segment, w *CountHashWriter) (uint64, error)
return rv, nil return rv, nil
} }
func persistStoredFieldValues(fieldID int,
storedFieldValues [][]byte, stf []byte, spf [][]uint64,
curr int, metaEncoder *govarint.Base128Encoder, data []byte) (
int, []byte, error) {
for i := 0; i < len(storedFieldValues); i++ {
// encode field
_, err := metaEncoder.PutU64(uint64(fieldID))
if err != nil {
return 0, nil, err
}
// encode type
_, err = metaEncoder.PutU64(uint64(stf[i]))
if err != nil {
return 0, nil, err
}
// encode start offset
_, err = metaEncoder.PutU64(uint64(curr))
if err != nil {
return 0, nil, err
}
// end len
_, err = metaEncoder.PutU64(uint64(len(storedFieldValues[i])))
if err != nil {
return 0, nil, err
}
// encode number of array pos
_, err = metaEncoder.PutU64(uint64(len(spf[i])))
if err != nil {
return 0, nil, err
}
// encode all array positions
for _, pos := range spf[i] {
_, err = metaEncoder.PutU64(pos)
if err != nil {
return 0, nil, err
}
}
data = append(data, storedFieldValues[i]...)
curr += len(storedFieldValues[i])
}
return curr, data, nil
}
func persistPostingDetails(memSegment *mem.Segment, w *CountHashWriter, chunkFactor uint32) ([]uint64, []uint64, error) { func persistPostingDetails(memSegment *mem.Segment, w *CountHashWriter, chunkFactor uint32) ([]uint64, []uint64, error) {
var freqOffsets, locOfffsets []uint64 var freqOffsets, locOfffsets []uint64
tfEncoder := newChunkedIntCoder(uint64(chunkFactor), uint64(len(memSegment.Stored)-1)) tfEncoder := newChunkedIntCoder(uint64(chunkFactor), uint64(len(memSegment.Stored)-1))
@ -580,7 +588,7 @@ func persistDocValues(memSegment *mem.Segment, w *CountHashWriter,
if err != nil { if err != nil {
return nil, err return nil, err
} }
// resetting encoder for the next field // reseting encoder for the next field
fdvEncoder.Reset() fdvEncoder.Reset()
} }

View File

@ -39,7 +39,7 @@ type chunkedContentCoder struct {
// MetaData represents the data information inside a // MetaData represents the data information inside a
// chunk. // chunk.
type MetaData struct { type MetaData struct {
DocID uint64 // docid of the data inside the chunk DocNum uint64 // docNum of the data inside the chunk
DocDvLoc uint64 // starting offset for a given docid DocDvLoc uint64 // starting offset for a given docid
DocDvLen uint64 // length of data inside the chunk for the given docid DocDvLen uint64 // length of data inside the chunk for the given docid
} }
@ -52,7 +52,7 @@ func newChunkedContentCoder(chunkSize uint64,
rv := &chunkedContentCoder{ rv := &chunkedContentCoder{
chunkSize: chunkSize, chunkSize: chunkSize,
chunkLens: make([]uint64, total), chunkLens: make([]uint64, total),
chunkMeta: []MetaData{}, chunkMeta: make([]MetaData, 0, total),
} }
return rv return rv
@ -68,7 +68,7 @@ func (c *chunkedContentCoder) Reset() {
for i := range c.chunkLens { for i := range c.chunkLens {
c.chunkLens[i] = 0 c.chunkLens[i] = 0
} }
c.chunkMeta = []MetaData{} c.chunkMeta = c.chunkMeta[:0]
} }
// Close indicates you are done calling Add() this allows // Close indicates you are done calling Add() this allows
@ -88,7 +88,7 @@ func (c *chunkedContentCoder) flushContents() error {
// write out the metaData slice // write out the metaData slice
for _, meta := range c.chunkMeta { for _, meta := range c.chunkMeta {
_, err := writeUvarints(&c.chunkMetaBuf, meta.DocID, meta.DocDvLoc, meta.DocDvLen) _, err := writeUvarints(&c.chunkMetaBuf, meta.DocNum, meta.DocDvLoc, meta.DocDvLen)
if err != nil { if err != nil {
return err return err
} }
@ -118,7 +118,7 @@ func (c *chunkedContentCoder) Add(docNum uint64, vals []byte) error {
// clearing the chunk specific meta for next chunk // clearing the chunk specific meta for next chunk
c.chunkBuf.Reset() c.chunkBuf.Reset()
c.chunkMetaBuf.Reset() c.chunkMetaBuf.Reset()
c.chunkMeta = []MetaData{} c.chunkMeta = c.chunkMeta[:0]
c.currChunk = chunk c.currChunk = chunk
} }
@ -130,7 +130,7 @@ func (c *chunkedContentCoder) Add(docNum uint64, vals []byte) error {
} }
c.chunkMeta = append(c.chunkMeta, MetaData{ c.chunkMeta = append(c.chunkMeta, MetaData{
DocID: docNum, DocNum: docNum,
DocDvLoc: uint64(dvOffset), DocDvLoc: uint64(dvOffset),
DocDvLen: uint64(dvSize), DocDvLen: uint64(dvSize),
}) })

View File

@ -99,7 +99,7 @@ func (s *SegmentBase) loadFieldDocValueIterator(field string,
func (di *docValueIterator) loadDvChunk(chunkNumber, func (di *docValueIterator) loadDvChunk(chunkNumber,
localDocNum uint64, s *SegmentBase) error { localDocNum uint64, s *SegmentBase) error {
// advance to the chunk where the docValues // advance to the chunk where the docValues
// reside for the given docID // reside for the given docNum
destChunkDataLoc := di.dvDataLoc destChunkDataLoc := di.dvDataLoc
for i := 0; i < int(chunkNumber); i++ { for i := 0; i < int(chunkNumber); i++ {
destChunkDataLoc += di.chunkLens[i] destChunkDataLoc += di.chunkLens[i]
@ -116,7 +116,7 @@ func (di *docValueIterator) loadDvChunk(chunkNumber,
offset := uint64(0) offset := uint64(0)
di.curChunkHeader = make([]MetaData, int(numDocs)) di.curChunkHeader = make([]MetaData, int(numDocs))
for i := 0; i < int(numDocs); i++ { for i := 0; i < int(numDocs); i++ {
di.curChunkHeader[i].DocID, read = binary.Uvarint(s.mem[chunkMetaLoc+offset : chunkMetaLoc+offset+binary.MaxVarintLen64]) di.curChunkHeader[i].DocNum, read = binary.Uvarint(s.mem[chunkMetaLoc+offset : chunkMetaLoc+offset+binary.MaxVarintLen64])
offset += uint64(read) offset += uint64(read)
di.curChunkHeader[i].DocDvLoc, read = binary.Uvarint(s.mem[chunkMetaLoc+offset : chunkMetaLoc+offset+binary.MaxVarintLen64]) di.curChunkHeader[i].DocDvLoc, read = binary.Uvarint(s.mem[chunkMetaLoc+offset : chunkMetaLoc+offset+binary.MaxVarintLen64])
offset += uint64(read) offset += uint64(read)
@ -131,10 +131,10 @@ func (di *docValueIterator) loadDvChunk(chunkNumber,
return nil return nil
} }
func (di *docValueIterator) visitDocValues(docID uint64, func (di *docValueIterator) visitDocValues(docNum uint64,
visitor index.DocumentFieldTermVisitor) error { visitor index.DocumentFieldTermVisitor) error {
// binary search the term locations for the docID // binary search the term locations for the docNum
start, length := di.getDocValueLocs(docID) start, length := di.getDocValueLocs(docNum)
if start == math.MaxUint64 || length == math.MaxUint64 { if start == math.MaxUint64 || length == math.MaxUint64 {
return nil return nil
} }
@ -144,7 +144,7 @@ func (di *docValueIterator) visitDocValues(docID uint64,
return err return err
} }
// pick the terms for the given docID // pick the terms for the given docNum
uncompressed = uncompressed[start : start+length] uncompressed = uncompressed[start : start+length]
for { for {
i := bytes.Index(uncompressed, termSeparatorSplitSlice) i := bytes.Index(uncompressed, termSeparatorSplitSlice)
@ -159,11 +159,11 @@ func (di *docValueIterator) visitDocValues(docID uint64,
return nil return nil
} }
func (di *docValueIterator) getDocValueLocs(docID uint64) (uint64, uint64) { func (di *docValueIterator) getDocValueLocs(docNum uint64) (uint64, uint64) {
i := sort.Search(len(di.curChunkHeader), func(i int) bool { i := sort.Search(len(di.curChunkHeader), func(i int) bool {
return di.curChunkHeader[i].DocID >= docID return di.curChunkHeader[i].DocNum >= docNum
}) })
if i < len(di.curChunkHeader) && di.curChunkHeader[i].DocID == docID { if i < len(di.curChunkHeader) && di.curChunkHeader[i].DocNum == docNum {
return di.curChunkHeader[i].DocDvLoc, di.curChunkHeader[i].DocDvLen return di.curChunkHeader[i].DocDvLoc, di.curChunkHeader[i].DocDvLen
} }
return math.MaxUint64, math.MaxUint64 return math.MaxUint64, math.MaxUint64

View File

@ -52,41 +52,15 @@ func Merge(segments []*Segment, drops []*roaring.Bitmap, path string,
// wrap it for counting (tracking offsets) // wrap it for counting (tracking offsets)
cr := NewCountHashWriter(br) cr := NewCountHashWriter(br)
fieldsInv := mergeFields(segments) newDocNums, numDocs, storedIndexOffset, fieldsIndexOffset, docValueOffset, err :=
fieldsMap := mapFields(fieldsInv) mergeToWriter(segments, drops, chunkFactor, cr)
var newDocNums [][]uint64
var storedIndexOffset uint64
fieldDvLocsOffset := uint64(fieldNotUninverted)
var dictLocs []uint64
newSegDocCount := computeNewDocCount(segments, drops)
if newSegDocCount > 0 {
storedIndexOffset, newDocNums, err = mergeStoredAndRemap(segments, drops,
fieldsMap, fieldsInv, newSegDocCount, cr)
if err != nil {
cleanup()
return nil, err
}
dictLocs, fieldDvLocsOffset, err = persistMergedRest(segments, drops, fieldsInv, fieldsMap,
newDocNums, newSegDocCount, chunkFactor, cr)
if err != nil {
cleanup()
return nil, err
}
} else {
dictLocs = make([]uint64, len(fieldsInv))
}
fieldsIndexOffset, err := persistFields(fieldsInv, cr, dictLocs)
if err != nil { if err != nil {
cleanup() cleanup()
return nil, err return nil, err
} }
err = persistFooter(newSegDocCount, storedIndexOffset, err = persistFooter(numDocs, storedIndexOffset, fieldsIndexOffset,
fieldsIndexOffset, fieldDvLocsOffset, chunkFactor, cr.Sum32(), cr) docValueOffset, chunkFactor, cr.Sum32(), cr)
if err != nil { if err != nil {
cleanup() cleanup()
return nil, err return nil, err
@ -113,6 +87,43 @@ func Merge(segments []*Segment, drops []*roaring.Bitmap, path string,
return newDocNums, nil return newDocNums, nil
} }
func mergeToWriter(segments []*Segment, drops []*roaring.Bitmap,
chunkFactor uint32, cr *CountHashWriter) (
newDocNums [][]uint64,
numDocs, storedIndexOffset, fieldsIndexOffset, docValueOffset uint64,
err error) {
docValueOffset = uint64(fieldNotUninverted)
var dictLocs []uint64
fieldsInv := mergeFields(segments)
fieldsMap := mapFields(fieldsInv)
numDocs = computeNewDocCount(segments, drops)
if numDocs > 0 {
storedIndexOffset, newDocNums, err = mergeStoredAndRemap(segments, drops,
fieldsMap, fieldsInv, numDocs, cr)
if err != nil {
return nil, 0, 0, 0, 0, err
}
dictLocs, docValueOffset, err = persistMergedRest(segments, drops, fieldsInv, fieldsMap,
newDocNums, numDocs, chunkFactor, cr)
if err != nil {
return nil, 0, 0, 0, 0, err
}
} else {
dictLocs = make([]uint64, len(fieldsInv))
}
fieldsIndexOffset, err = persistFields(fieldsInv, cr, dictLocs)
if err != nil {
return nil, 0, 0, 0, 0, err
}
return newDocNums, numDocs, storedIndexOffset, fieldsIndexOffset, docValueOffset, nil
}
// mapFields takes the fieldsInv list and builds the map // mapFields takes the fieldsInv list and builds the map
func mapFields(fields []string) map[string]uint16 { func mapFields(fields []string) map[string]uint16 {
rv := make(map[string]uint16, len(fields)) rv := make(map[string]uint16, len(fields))
@ -453,47 +464,14 @@ func mergeStoredAndRemap(segments []*Segment, drops []*roaring.Bitmap,
for fieldID := range fieldsInv { for fieldID := range fieldsInv {
storedFieldValues := vals[int(fieldID)] storedFieldValues := vals[int(fieldID)]
// has stored values for this field stf := typs[int(fieldID)]
num := len(storedFieldValues) spf := poss[int(fieldID)]
// process each value var err2 error
for i := 0; i < num; i++ { curr, data, err2 = persistStoredFieldValues(fieldID,
// encode field storedFieldValues, stf, spf, curr, metaEncoder, data)
_, err2 := metaEncoder.PutU64(uint64(fieldID)) if err2 != nil {
if err2 != nil { return 0, nil, err2
return 0, nil, err2
}
// encode type
_, err2 = metaEncoder.PutU64(uint64(typs[int(fieldID)][i]))
if err2 != nil {
return 0, nil, err2
}
// encode start offset
_, err2 = metaEncoder.PutU64(uint64(curr))
if err2 != nil {
return 0, nil, err2
}
// end len
_, err2 = metaEncoder.PutU64(uint64(len(storedFieldValues[i])))
if err2 != nil {
return 0, nil, err2
}
// encode number of array pos
_, err2 = metaEncoder.PutU64(uint64(len(poss[int(fieldID)][i])))
if err2 != nil {
return 0, nil, err2
}
// encode all array positions
for j := 0; j < len(poss[int(fieldID)][i]); j++ {
_, err2 = metaEncoder.PutU64(poss[int(fieldID)][i][j])
if err2 != nil {
return 0, nil, err2
}
}
// append data
data = append(data, storedFieldValues[i]...)
// update curr
curr += len(storedFieldValues[i])
} }
} }