From eb21bf83154344c183df51c35f67159df1f2383b Mon Sep 17 00:00:00 2001
From: Steve Yen
Date: Wed, 31 Jan 2018 15:08:31 -0800
Subject: [PATCH] scorch zap merge & build share persistStoredFieldValues()

Refactored out a helper func, persistStoredFieldValues(), that both
the persistence and merge codepaths now share.
---
 index/scorch/segment/zap/build.go | 100 ++++++++++++++++--------------
 index/scorch/segment/zap/merge.go |  46 ++------------
 2 files changed, 59 insertions(+), 87 deletions(-)

diff --git a/index/scorch/segment/zap/build.go b/index/scorch/segment/zap/build.go
index 58f9faea..e6625528 100644
--- a/index/scorch/segment/zap/build.go
+++ b/index/scorch/segment/zap/build.go
@@ -187,79 +187,42 @@ func persistBase(memSegment *mem.Segment, cr *CountHashWriter, chunkFactor uint3
 }
 
 func persistStored(memSegment *mem.Segment, w *CountHashWriter) (uint64, error) {
-
 	var curr int
 
 	var metaBuf bytes.Buffer
 	var data, compressed []byte
 
+	metaEncoder := govarint.NewU64Base128Encoder(&metaBuf)
+
 	docNumOffsets := make(map[int]uint64, len(memSegment.Stored))
 
 	for docNum, storedValues := range memSegment.Stored {
 		if docNum != 0 {
 			// reset buffer if necessary
+			curr = 0
 			metaBuf.Reset()
 			data = data[:0]
 			compressed = compressed[:0]
-			curr = 0
 		}
 
-		metaEncoder := govarint.NewU64Base128Encoder(&metaBuf)
-
 		st := memSegment.StoredTypes[docNum]
 		sp := memSegment.StoredPos[docNum]
 
 		// encode fields in order
 		for fieldID := range memSegment.FieldsInv {
 			if storedFieldValues, ok := storedValues[uint16(fieldID)]; ok {
-				// has stored values for this field
-				num := len(storedFieldValues)
-
 				stf := st[uint16(fieldID)]
 				spf := sp[uint16(fieldID)]
 
-				// process each value
-				for i := 0; i < num; i++ {
-					// encode field
-					_, err2 := metaEncoder.PutU64(uint64(fieldID))
-					if err2 != nil {
-						return 0, err2
-					}
-					// encode type
-					_, err2 = metaEncoder.PutU64(uint64(stf[i]))
-					if err2 != nil {
-						return 0, err2
-					}
-					// encode start offset
-					_, err2 = metaEncoder.PutU64(uint64(curr))
-					if err2 != nil {
-						return 0, err2
-					}
-					// end len
-					_, err2 = metaEncoder.PutU64(uint64(len(storedFieldValues[i])))
-					if err2 != nil {
-						return 0, err2
-					}
-					// encode number of array pos
-					_, err2 = metaEncoder.PutU64(uint64(len(spf[i])))
-					if err2 != nil {
-						return 0, err2
-					}
-					// encode all array positions
-					for _, pos := range spf[i] {
-						_, err2 = metaEncoder.PutU64(pos)
-						if err2 != nil {
-							return 0, err2
-						}
-					}
-					// append data
-					data = append(data, storedFieldValues[i]...)
-					// update curr
-					curr += len(storedFieldValues[i])
+				var err2 error
+				curr, data, err2 = persistStoredFieldValues(fieldID,
+					storedFieldValues, stf, spf, curr, metaEncoder, data)
+				if err2 != nil {
+					return 0, err2
 				}
 			}
 		}
 
-		metaEncoder.Close()
+		metaEncoder.Close()
 		metaBytes := metaBuf.Bytes()
 
 		// compress the data
@@ -299,6 +262,51 @@ func persistStored(memSegment *mem.Segment, w *CountHashWriter) (uint64, error)
 	return rv, nil
 }
 
+func persistStoredFieldValues(fieldID int,
+	storedFieldValues [][]byte, stf []byte, spf [][]uint64,
+	curr int, metaEncoder *govarint.Base128Encoder, data []byte) (
+	int, []byte, error) {
+	for i := 0; i < len(storedFieldValues); i++ {
+		// encode field
+		_, err := metaEncoder.PutU64(uint64(fieldID))
+		if err != nil {
+			return 0, nil, err
+		}
+		// encode type
+		_, err = metaEncoder.PutU64(uint64(stf[i]))
+		if err != nil {
+			return 0, nil, err
+		}
+		// encode start offset
+		_, err = metaEncoder.PutU64(uint64(curr))
+		if err != nil {
+			return 0, nil, err
+		}
+		// end len
+		_, err = metaEncoder.PutU64(uint64(len(storedFieldValues[i])))
+		if err != nil {
+			return 0, nil, err
+		}
+		// encode number of array pos
+		_, err = metaEncoder.PutU64(uint64(len(spf[i])))
+		if err != nil {
+			return 0, nil, err
+		}
+		// encode all array positions
+		for _, pos := range spf[i] {
+			_, err = metaEncoder.PutU64(pos)
+			if err != nil {
+				return 0, nil, err
+			}
+		}
+
+		data = append(data, storedFieldValues[i]...)
+		curr += len(storedFieldValues[i])
+	}
+
+	return curr, data, nil
+}
+
 func persistPostingDetails(memSegment *mem.Segment, w *CountHashWriter, chunkFactor uint32) ([]uint64, []uint64, error) {
 	var freqOffsets, locOfffsets []uint64
 	tfEncoder := newChunkedIntCoder(uint64(chunkFactor), uint64(len(memSegment.Stored)-1))
diff --git a/index/scorch/segment/zap/merge.go b/index/scorch/segment/zap/merge.go
index 1afe99f4..8fdb07af 100644
--- a/index/scorch/segment/zap/merge.go
+++ b/index/scorch/segment/zap/merge.go
@@ -453,50 +453,14 @@ func mergeStoredAndRemap(segments []*Segment, drops []*roaring.Bitmap,
 			for fieldID := range fieldsInv {
 				storedFieldValues := vals[int(fieldID)]
 
-				// has stored values for this field
-				num := len(storedFieldValues)
-
 				stf := typs[int(fieldID)]
 				spf := poss[int(fieldID)]
 
-				// process each value
-				for i := 0; i < num; i++ {
-					// encode field
-					_, err2 := metaEncoder.PutU64(uint64(fieldID))
-					if err2 != nil {
-						return 0, nil, err2
-					}
-					// encode type
-					_, err2 = metaEncoder.PutU64(uint64(stf[i]))
-					if err2 != nil {
-						return 0, nil, err2
-					}
-					// encode start offset
-					_, err2 = metaEncoder.PutU64(uint64(curr))
-					if err2 != nil {
-						return 0, nil, err2
-					}
-					// end len
-					_, err2 = metaEncoder.PutU64(uint64(len(storedFieldValues[i])))
-					if err2 != nil {
-						return 0, nil, err2
-					}
-					// encode number of array pos
-					_, err2 = metaEncoder.PutU64(uint64(len(spf[i])))
-					if err2 != nil {
-						return 0, nil, err2
-					}
-					// encode all array positions
-					for _, pos := range spf[i] {
-						_, err2 = metaEncoder.PutU64(pos)
-						if err2 != nil {
-							return 0, nil, err2
-						}
-					}
-					// append data
-					data = append(data, storedFieldValues[i]...)
-					// update curr
-					curr += len(storedFieldValues[i])
+				var err2 error
+				curr, data, err2 = persistStoredFieldValues(fieldID,
+					storedFieldValues, stf, spf, curr, metaEncoder, data)
+				if err2 != nil {
+					return 0, nil, err2
 				}
 			}
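
Illustrative note (not part of the patch): both call sites thread the running byte offset curr and the accumulated data slice through persistStoredFieldValues() and reassign the returned values, which is why the helper returns (int, []byte, error). The standalone sketch below shows only the varint metadata step the helper performs for a single stored field value; it assumes the github.com/Smerity/govarint package that supplies the Base128Encoder seen in the diff, and the concrete metadata values are made up.

// sketch.go -- hypothetical, minimal demo of the per-value metadata encoding
// done by persistStoredFieldValues(); assumes github.com/Smerity/govarint
// provides the Base128Encoder used by the zap code above.
package main

import (
	"bytes"
	"fmt"

	"github.com/Smerity/govarint"
)

func main() {
	var metaBuf bytes.Buffer
	metaEncoder := govarint.NewU64Base128Encoder(&metaBuf)

	// One stored field value's metadata, in the order the helper writes it:
	// fieldID, type, start offset (curr), value length, number of array
	// positions, then each array position. These values are made up.
	ints := []uint64{2, 't', 0, 11, 2, 0, 3}
	for _, v := range ints {
		if _, err := metaEncoder.PutU64(v); err != nil {
			panic(err)
		}
	}
	metaEncoder.Close() // flush any buffered bytes into metaBuf

	fmt.Printf("%d metadata ints -> %d bytes\n", len(ints), metaBuf.Len())
}

After writing the metadata, the helper appends the raw value bytes to data and advances curr by the value's length, so the start offset recorded for the next value stays consistent across both the build and merge codepaths.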