scorch zap merge & build share persistStoredFieldValues()

Refactored out a helper func, persistStoredFieldValues(), that both
the persistence and merge codepaths now share.
Steve Yen 2018-01-31 15:08:31 -08:00
parent 714f5321e0
commit eb21bf8315
2 changed files with 59 additions and 87 deletions
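
For context, a minimal, self-contained sketch of the shape this refactor gives both call sites: a helper that varint-encodes a small metadata record per value, appends the raw value bytes to a shared data slice, and returns the updated running offset and slice so each caller can thread them through. The names and the trimmed-down record below (no type byte, no array positions) are illustrative assumptions, not the real zap API; the actual helper and its govarint encoder appear in the diffs that follow.

package main

import (
    "bytes"
    "encoding/binary"
    "fmt"
)

// encodeStoredFieldValues mirrors the shape of persistStoredFieldValues:
// it varint-encodes a small metadata record per value into meta, appends
// the raw value bytes to data, and returns the updated (curr, data) so
// more than one codepath can share it. The record here is trimmed down
// to fieldID, start offset, and length.
func encodeStoredFieldValues(fieldID int, values [][]byte,
    curr int, meta *bytes.Buffer, data []byte) (int, []byte, error) {
    var tmp [binary.MaxVarintLen64]byte
    putUvarint := func(v uint64) {
        n := binary.PutUvarint(tmp[:], v)
        meta.Write(tmp[:n])
    }
    for _, v := range values {
        putUvarint(uint64(fieldID)) // which field this value belongs to
        putUvarint(uint64(curr))    // start offset within the data section
        putUvarint(uint64(len(v)))  // value length
        data = append(data, v...)
        curr += len(v)
    }
    return curr, data, nil
}

func main() {
    var meta bytes.Buffer
    var data []byte
    curr := 0

    // Two call sites sharing the helper, the way the persist and merge
    // codepaths do after this commit.
    var err error
    curr, data, err = encodeStoredFieldValues(0, [][]byte{[]byte("hello")}, curr, &meta, data)
    if err != nil {
        panic(err)
    }
    curr, data, err = encodeStoredFieldValues(1, [][]byte{[]byte("world")}, curr, &meta, data)
    if err != nil {
        panic(err)
    }
    fmt.Printf("meta=%d bytes, data=%q, next offset=%d\n", meta.Len(), data, curr)
}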

View File

@@ -187,79 +187,42 @@ func persistBase(memSegment *mem.Segment, cr *CountHashWriter, chunkFactor uint32
 }
 
 func persistStored(memSegment *mem.Segment, w *CountHashWriter) (uint64, error) {
     var curr int
 
     var metaBuf bytes.Buffer
     var data, compressed []byte
 
+    metaEncoder := govarint.NewU64Base128Encoder(&metaBuf)
+
     docNumOffsets := make(map[int]uint64, len(memSegment.Stored))
 
     for docNum, storedValues := range memSegment.Stored {
         if docNum != 0 {
             // reset buffer if necessary
+            curr = 0
             metaBuf.Reset()
             data = data[:0]
             compressed = compressed[:0]
-            curr = 0
         }
 
-        metaEncoder := govarint.NewU64Base128Encoder(&metaBuf)
-
         st := memSegment.StoredTypes[docNum]
         sp := memSegment.StoredPos[docNum]
 
         // encode fields in order
         for fieldID := range memSegment.FieldsInv {
             if storedFieldValues, ok := storedValues[uint16(fieldID)]; ok {
-                // has stored values for this field
-                num := len(storedFieldValues)
-
                 stf := st[uint16(fieldID)]
                 spf := sp[uint16(fieldID)]
 
-                // process each value
-                for i := 0; i < num; i++ {
-                    // encode field
-                    _, err2 := metaEncoder.PutU64(uint64(fieldID))
-                    if err2 != nil {
-                        return 0, err2
-                    }
-                    // encode type
-                    _, err2 = metaEncoder.PutU64(uint64(stf[i]))
-                    if err2 != nil {
-                        return 0, err2
-                    }
-                    // encode start offset
-                    _, err2 = metaEncoder.PutU64(uint64(curr))
-                    if err2 != nil {
-                        return 0, err2
-                    }
-                    // end len
-                    _, err2 = metaEncoder.PutU64(uint64(len(storedFieldValues[i])))
-                    if err2 != nil {
-                        return 0, err2
-                    }
-                    // encode number of array pos
-                    _, err2 = metaEncoder.PutU64(uint64(len(spf[i])))
-                    if err2 != nil {
-                        return 0, err2
-                    }
-                    // encode all array positions
-                    for _, pos := range spf[i] {
-                        _, err2 = metaEncoder.PutU64(pos)
-                        if err2 != nil {
-                            return 0, err2
-                        }
-                    }
-                    // append data
-                    data = append(data, storedFieldValues[i]...)
-                    // update curr
-                    curr += len(storedFieldValues[i])
-                }
+                var err2 error
+                curr, data, err2 = persistStoredFieldValues(fieldID,
+                    storedFieldValues, stf, spf, curr, metaEncoder, data)
+                if err2 != nil {
+                    return 0, err2
+                }
             }
         }
 
-        metaEncoder.Close()
+        metaEncoder.Close()
 
         metaBytes := metaBuf.Bytes()
 
         // compress the data
@@ -299,6 +262,51 @@ func persistStored(memSegment *mem.Segment, w *CountHashWriter) (uint64, error)
     return rv, nil
 }
 
+func persistStoredFieldValues(fieldID int,
+    storedFieldValues [][]byte, stf []byte, spf [][]uint64,
+    curr int, metaEncoder *govarint.Base128Encoder, data []byte) (
+    int, []byte, error) {
+    for i := 0; i < len(storedFieldValues); i++ {
+        // encode field
+        _, err := metaEncoder.PutU64(uint64(fieldID))
+        if err != nil {
+            return 0, nil, err
+        }
+        // encode type
+        _, err = metaEncoder.PutU64(uint64(stf[i]))
+        if err != nil {
+            return 0, nil, err
+        }
+        // encode start offset
+        _, err = metaEncoder.PutU64(uint64(curr))
+        if err != nil {
+            return 0, nil, err
+        }
+        // end len
+        _, err = metaEncoder.PutU64(uint64(len(storedFieldValues[i])))
+        if err != nil {
+            return 0, nil, err
+        }
+        // encode number of array pos
+        _, err = metaEncoder.PutU64(uint64(len(spf[i])))
+        if err != nil {
+            return 0, nil, err
+        }
+        // encode all array positions
+        for _, pos := range spf[i] {
+            _, err = metaEncoder.PutU64(pos)
+            if err != nil {
+                return 0, nil, err
+            }
+        }
+
+        data = append(data, storedFieldValues[i]...)
+        curr += len(storedFieldValues[i])
+    }
+
+    return curr, data, nil
+}
+
 func persistPostingDetails(memSegment *mem.Segment, w *CountHashWriter, chunkFactor uint32) ([]uint64, []uint64, error) {
     var freqOffsets, locOfffsets []uint64
     tfEncoder := newChunkedIntCoder(uint64(chunkFactor), uint64(len(memSegment.Stored)-1))

View File

@@ -453,50 +453,14 @@ func mergeStoredAndRemap(segments []*Segment, drops []*roaring.Bitmap,
             for fieldID := range fieldsInv {
                 storedFieldValues := vals[int(fieldID)]
 
-                // has stored values for this field
-                num := len(storedFieldValues)
-
                 stf := typs[int(fieldID)]
                 spf := poss[int(fieldID)]
 
-                // process each value
-                for i := 0; i < num; i++ {
-                    // encode field
-                    _, err2 := metaEncoder.PutU64(uint64(fieldID))
-                    if err2 != nil {
-                        return 0, nil, err2
-                    }
-                    // encode type
-                    _, err2 = metaEncoder.PutU64(uint64(stf[i]))
-                    if err2 != nil {
-                        return 0, nil, err2
-                    }
-                    // encode start offset
-                    _, err2 = metaEncoder.PutU64(uint64(curr))
-                    if err2 != nil {
-                        return 0, nil, err2
-                    }
-                    // end len
-                    _, err2 = metaEncoder.PutU64(uint64(len(storedFieldValues[i])))
-                    if err2 != nil {
-                        return 0, nil, err2
-                    }
-                    // encode number of array pos
-                    _, err2 = metaEncoder.PutU64(uint64(len(spf[i])))
-                    if err2 != nil {
-                        return 0, nil, err2
-                    }
-                    // encode all array positions
-                    for _, pos := range spf[i] {
-                        _, err2 = metaEncoder.PutU64(pos)
-                        if err2 != nil {
-                            return 0, nil, err2
-                        }
-                    }
-                    // append data
-                    data = append(data, storedFieldValues[i]...)
-                    // update curr
-                    curr += len(storedFieldValues[i])
-                }
+                var err2 error
+                curr, data, err2 = persistStoredFieldValues(fieldID,
+                    storedFieldValues, stf, spf, curr, metaEncoder, data)
+                if err2 != nil {
+                    return 0, nil, err2
+                }
             }
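
For reference, the per-value metadata record that persistStoredFieldValues writes is, in order: field ID, value type, start offset within the data section, value length, number of array positions, then the positions themselves. The sketch below reads such a record back with standard uvarints, assuming govarint's U64Base128 encoding is the usual base-128 (LEB128) varint; it is an illustration of the layout, not the actual zap stored-field reader.

package main

import (
    "bytes"
    "encoding/binary"
    "fmt"
)

// storedMeta is one per-value metadata record, in the order the helper
// encodes it: fieldID, type, start offset, length, array positions.
type storedMeta struct {
    fieldID  uint64
    typ      uint64
    start    uint64 // offset of the value bytes within the data section
    length   uint64
    arrayPos []uint64
}

// readStoredMeta decodes a single record from r using standard uvarints.
func readStoredMeta(r *bytes.Reader) (m storedMeta, err error) {
    read := func() (v uint64) {
        if err == nil {
            v, err = binary.ReadUvarint(r)
        }
        return v
    }
    m.fieldID = read()
    m.typ = read()
    m.start = read()
    m.length = read()
    numPos := read()
    for i := uint64(0); i < numPos && err == nil; i++ {
        m.arrayPos = append(m.arrayPos, read())
    }
    return m, err
}

func main() {
    // Hand-encode a sample record (fieldID=3, type=1, start=0, len=5,
    // two array positions 0 and 1), then decode it.
    var buf bytes.Buffer
    var tmp [binary.MaxVarintLen64]byte
    for _, v := range []uint64{3, 1, 0, 5, 2, 0, 1} {
        n := binary.PutUvarint(tmp[:], v)
        buf.Write(tmp[:n])
    }
    m, err := readStoredMeta(bytes.NewReader(buf.Bytes()))
    if err != nil {
        panic(err)
    }
    fmt.Printf("%+v\n", m)
}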