From 714f5321e0dec8e1ff3b48ea0f7411775de9dcc7 Mon Sep 17 00:00:00 2001 From: Steve Yen Date: Wed, 31 Jan 2018 14:46:28 -0800 Subject: [PATCH 1/5] scorch zap merge storedFieldVals inner loop optimization --- index/scorch/segment/zap/merge.go | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/index/scorch/segment/zap/merge.go b/index/scorch/segment/zap/merge.go index cc348d72..1afe99f4 100644 --- a/index/scorch/segment/zap/merge.go +++ b/index/scorch/segment/zap/merge.go @@ -456,6 +456,9 @@ func mergeStoredAndRemap(segments []*Segment, drops []*roaring.Bitmap, // has stored values for this field num := len(storedFieldValues) + stf := typs[int(fieldID)] + spf := poss[int(fieldID)] + // process each value for i := 0; i < num; i++ { // encode field @@ -464,7 +467,7 @@ func mergeStoredAndRemap(segments []*Segment, drops []*roaring.Bitmap, return 0, nil, err2 } // encode type - _, err2 = metaEncoder.PutU64(uint64(typs[int(fieldID)][i])) + _, err2 = metaEncoder.PutU64(uint64(stf[i])) if err2 != nil { return 0, nil, err2 } @@ -479,13 +482,13 @@ func mergeStoredAndRemap(segments []*Segment, drops []*roaring.Bitmap, return 0, nil, err2 } // encode number of array pos - _, err2 = metaEncoder.PutU64(uint64(len(poss[int(fieldID)][i]))) + _, err2 = metaEncoder.PutU64(uint64(len(spf[i]))) if err2 != nil { return 0, nil, err2 } // encode all array positions - for j := 0; j < len(poss[int(fieldID)][i]); j++ { - _, err2 = metaEncoder.PutU64(poss[int(fieldID)][i][j]) + for _, pos := range spf[i] { + _, err2 = metaEncoder.PutU64(pos) if err2 != nil { return 0, nil, err2 } From eb21bf83154344c183df51c35f67159df1f2383b Mon Sep 17 00:00:00 2001 From: Steve Yen Date: Wed, 31 Jan 2018 15:08:31 -0800 Subject: [PATCH 2/5] scorch zap merge & build share persistStoredFieldValues() Refactored out a helper func, persistStoredFieldValues(), that both the persistence and merge codepaths now share. 
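As a standalone illustration (not part of the patch itself), here is a minimal sketch of the per-value metadata layout the new helper writes — field ID, value type, start offset, value length, array-position count, then the positions — using the same govarint base-128 encoder the zap code uses. The sample field, values, types, and positions below are invented, and the import path is assumed from bleve's vendoring of the era:

    package main

    import (
    	"bytes"
    	"fmt"

    	"github.com/Smerity/govarint" // assumed import path, per bleve's vendoring at the time
    )

    func main() {
    	// invented sample: one field with two stored values
    	fieldID := 1
    	storedFieldValues := [][]byte{[]byte("hello"), []byte("world")}
    	stf := []byte{'t', 't'}       // one stored-field type byte per value
    	spf := [][]uint64{{}, {0, 1}} // array positions per value

    	var metaBuf bytes.Buffer
    	var data []byte
    	curr := 0 // running offset into data

    	metaEncoder := govarint.NewU64Base128Encoder(&metaBuf)
    	for i := range storedFieldValues {
    		// the same varint sequence the helper emits per value
    		// (error returns elided for brevity in this sketch)
    		metaEncoder.PutU64(uint64(fieldID))
    		metaEncoder.PutU64(uint64(stf[i]))
    		metaEncoder.PutU64(uint64(curr))
    		metaEncoder.PutU64(uint64(len(storedFieldValues[i])))
    		metaEncoder.PutU64(uint64(len(spf[i])))
    		for _, pos := range spf[i] {
    			metaEncoder.PutU64(pos)
    		}
    		data = append(data, storedFieldValues[i]...)
    		curr += len(storedFieldValues[i])
    	}
    	metaEncoder.Close()

    	fmt.Printf("meta: %d bytes, data: %q\n", metaBuf.Len(), data)
    }

Note how stf and spf are plain locals here, mirroring the hoisting that patch 1 above applies to the merge loop.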
--- index/scorch/segment/zap/build.go | 100 ++++++++++++++++-------------- index/scorch/segment/zap/merge.go | 46 ++------------ 2 files changed, 59 insertions(+), 87 deletions(-) diff --git a/index/scorch/segment/zap/build.go b/index/scorch/segment/zap/build.go index 58f9faea..e6625528 100644 --- a/index/scorch/segment/zap/build.go +++ b/index/scorch/segment/zap/build.go @@ -187,79 +187,42 @@ func persistBase(memSegment *mem.Segment, cr *CountHashWriter, chunkFactor uint3 } func persistStored(memSegment *mem.Segment, w *CountHashWriter) (uint64, error) { - var curr int var metaBuf bytes.Buffer var data, compressed []byte + metaEncoder := govarint.NewU64Base128Encoder(&metaBuf) + docNumOffsets := make(map[int]uint64, len(memSegment.Stored)) for docNum, storedValues := range memSegment.Stored { if docNum != 0 { // reset buffer if necessary + curr = 0 metaBuf.Reset() data = data[:0] compressed = compressed[:0] - curr = 0 } - metaEncoder := govarint.NewU64Base128Encoder(&metaBuf) - st := memSegment.StoredTypes[docNum] sp := memSegment.StoredPos[docNum] // encode fields in order for fieldID := range memSegment.FieldsInv { if storedFieldValues, ok := storedValues[uint16(fieldID)]; ok { - // has stored values for this field - num := len(storedFieldValues) - stf := st[uint16(fieldID)] spf := sp[uint16(fieldID)] - // process each value - for i := 0; i < num; i++ { - // encode field - _, err2 := metaEncoder.PutU64(uint64(fieldID)) - if err2 != nil { - return 0, err2 - } - // encode type - _, err2 = metaEncoder.PutU64(uint64(stf[i])) - if err2 != nil { - return 0, err2 - } - // encode start offset - _, err2 = metaEncoder.PutU64(uint64(curr)) - if err2 != nil { - return 0, err2 - } - // end len - _, err2 = metaEncoder.PutU64(uint64(len(storedFieldValues[i]))) - if err2 != nil { - return 0, err2 - } - // encode number of array pos - _, err2 = metaEncoder.PutU64(uint64(len(spf[i]))) - if err2 != nil { - return 0, err2 - } - // encode all array positions - for _, pos := range spf[i] { - _, err2 = metaEncoder.PutU64(pos) - if err2 != nil { - return 0, err2 - } - } - // append data - data = append(data, storedFieldValues[i]...) 
- // update curr - curr += len(storedFieldValues[i]) + var err2 error + curr, data, err2 = persistStoredFieldValues(fieldID, + storedFieldValues, stf, spf, curr, metaEncoder, data) + if err2 != nil { + return 0, err2 } } } - metaEncoder.Close() + metaEncoder.Close() metaBytes := metaBuf.Bytes() // compress the data @@ -299,6 +262,51 @@ func persistStored(memSegment *mem.Segment, w *CountHashWriter) (uint64, error) return rv, nil } +func persistStoredFieldValues(fieldID int, + storedFieldValues [][]byte, stf []byte, spf [][]uint64, + curr int, metaEncoder *govarint.Base128Encoder, data []byte) ( + int, []byte, error) { + for i := 0; i < len(storedFieldValues); i++ { + // encode field + _, err := metaEncoder.PutU64(uint64(fieldID)) + if err != nil { + return 0, nil, err + } + // encode type + _, err = metaEncoder.PutU64(uint64(stf[i])) + if err != nil { + return 0, nil, err + } + // encode start offset + _, err = metaEncoder.PutU64(uint64(curr)) + if err != nil { + return 0, nil, err + } + // end len + _, err = metaEncoder.PutU64(uint64(len(storedFieldValues[i]))) + if err != nil { + return 0, nil, err + } + // encode number of array pos + _, err = metaEncoder.PutU64(uint64(len(spf[i]))) + if err != nil { + return 0, nil, err + } + // encode all array positions + for _, pos := range spf[i] { + _, err = metaEncoder.PutU64(pos) + if err != nil { + return 0, nil, err + } + } + + data = append(data, storedFieldValues[i]...) + curr += len(storedFieldValues[i]) + } + + return curr, data, nil +} + func persistPostingDetails(memSegment *mem.Segment, w *CountHashWriter, chunkFactor uint32) ([]uint64, []uint64, error) { var freqOffsets, locOfffsets []uint64 tfEncoder := newChunkedIntCoder(uint64(chunkFactor), uint64(len(memSegment.Stored)-1)) diff --git a/index/scorch/segment/zap/merge.go b/index/scorch/segment/zap/merge.go index 1afe99f4..8fdb07af 100644 --- a/index/scorch/segment/zap/merge.go +++ b/index/scorch/segment/zap/merge.go @@ -453,50 +453,14 @@ func mergeStoredAndRemap(segments []*Segment, drops []*roaring.Bitmap, for fieldID := range fieldsInv { storedFieldValues := vals[int(fieldID)] - // has stored values for this field - num := len(storedFieldValues) - stf := typs[int(fieldID)] spf := poss[int(fieldID)] - // process each value - for i := 0; i < num; i++ { - // encode field - _, err2 := metaEncoder.PutU64(uint64(fieldID)) - if err2 != nil { - return 0, nil, err2 - } - // encode type - _, err2 = metaEncoder.PutU64(uint64(stf[i])) - if err2 != nil { - return 0, nil, err2 - } - // encode start offset - _, err2 = metaEncoder.PutU64(uint64(curr)) - if err2 != nil { - return 0, nil, err2 - } - // end len - _, err2 = metaEncoder.PutU64(uint64(len(storedFieldValues[i]))) - if err2 != nil { - return 0, nil, err2 - } - // encode number of array pos - _, err2 = metaEncoder.PutU64(uint64(len(spf[i]))) - if err2 != nil { - return 0, nil, err2 - } - // encode all array positions - for _, pos := range spf[i] { - _, err2 = metaEncoder.PutU64(pos) - if err2 != nil { - return 0, nil, err2 - } - } - // append data - data = append(data, storedFieldValues[i]...) 
- // update curr - curr += len(storedFieldValues[i]) + var err2 error + curr, data, err2 = persistStoredFieldValues(fieldID, + storedFieldValues, stf, spf, curr, metaEncoder, data) + if err2 != nil { + return 0, nil, err2 } } From 65786557584031cb74749681939f673245842b42 Mon Sep 17 00:00:00 2001 From: Steve Yen Date: Wed, 31 Jan 2018 15:48:03 -0800 Subject: [PATCH 3/5] scorch zap refactored out mergeToWriter() func This is a step towards supporting in-memory zap segment merging. --- index/scorch/segment/zap/merge.go | 71 ++++++++++++++++++------------- 1 file changed, 41 insertions(+), 30 deletions(-) diff --git a/index/scorch/segment/zap/merge.go b/index/scorch/segment/zap/merge.go index 8fdb07af..327446c5 100644 --- a/index/scorch/segment/zap/merge.go +++ b/index/scorch/segment/zap/merge.go @@ -52,41 +52,15 @@ func Merge(segments []*Segment, drops []*roaring.Bitmap, path string, // wrap it for counting (tracking offsets) cr := NewCountHashWriter(br) - fieldsInv := mergeFields(segments) - fieldsMap := mapFields(fieldsInv) - - var newDocNums [][]uint64 - var storedIndexOffset uint64 - fieldDvLocsOffset := uint64(fieldNotUninverted) - var dictLocs []uint64 - - newSegDocCount := computeNewDocCount(segments, drops) - if newSegDocCount > 0 { - storedIndexOffset, newDocNums, err = mergeStoredAndRemap(segments, drops, - fieldsMap, fieldsInv, newSegDocCount, cr) - if err != nil { - cleanup() - return nil, err - } - - dictLocs, fieldDvLocsOffset, err = persistMergedRest(segments, drops, fieldsInv, fieldsMap, - newDocNums, newSegDocCount, chunkFactor, cr) - if err != nil { - cleanup() - return nil, err - } - } else { - dictLocs = make([]uint64, len(fieldsInv)) - } - - fieldsIndexOffset, err := persistFields(fieldsInv, cr, dictLocs) + newDocNums, numDocs, storedIndexOffset, fieldsIndexOffset, docValueOffset, err := + mergeToWriter(segments, drops, chunkFactor, cr) if err != nil { cleanup() return nil, err } - err = persistFooter(newSegDocCount, storedIndexOffset, - fieldsIndexOffset, fieldDvLocsOffset, chunkFactor, cr.Sum32(), cr) + err = persistFooter(numDocs, storedIndexOffset, fieldsIndexOffset, + docValueOffset, chunkFactor, cr.Sum32(), cr) if err != nil { cleanup() return nil, err @@ -113,6 +87,43 @@ func Merge(segments []*Segment, drops []*roaring.Bitmap, path string, return newDocNums, nil } +func mergeToWriter(segments []*Segment, drops []*roaring.Bitmap, + chunkFactor uint32, cr *CountHashWriter) ( + newDocNums [][]uint64, + numDocs, storedIndexOffset, fieldsIndexOffset, docValueOffset uint64, + err error) { + docValueOffset = uint64(fieldNotUninverted) + + var dictLocs []uint64 + + fieldsInv := mergeFields(segments) + fieldsMap := mapFields(fieldsInv) + + numDocs = computeNewDocCount(segments, drops) + if numDocs > 0 { + storedIndexOffset, newDocNums, err = mergeStoredAndRemap(segments, drops, + fieldsMap, fieldsInv, numDocs, cr) + if err != nil { + return nil, 0, 0, 0, 0, err + } + + dictLocs, docValueOffset, err = persistMergedRest(segments, drops, fieldsInv, fieldsMap, + newDocNums, numDocs, chunkFactor, cr) + if err != nil { + return nil, 0, 0, 0, 0, err + } + } else { + dictLocs = make([]uint64, len(fieldsInv)) + } + + fieldsIndexOffset, err = persistFields(fieldsInv, cr, dictLocs) + if err != nil { + return nil, 0, 0, 0, 0, err + } + + return newDocNums, numDocs, storedIndexOffset, fieldsIndexOffset, docValueOffset, nil +} + // mapFields takes the fieldsInv list and builds the map func mapFields(fields []string) map[string]uint16 { rv := make(map[string]uint16, len(fields)) From 
3da191852de9b0aa860532de7ec39256008d8252 Mon Sep 17 00:00:00 2001 From: Steve Yen Date: Thu, 1 Feb 2018 16:59:59 -0800 Subject: [PATCH 4/5] scorch zap tighten up prepareSegment()'s lock area --- index/scorch/scorch.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/index/scorch/scorch.go b/index/scorch/scorch.go index 31107765..08fffa25 100644 --- a/index/scorch/scorch.go +++ b/index/scorch/scorch.go @@ -310,17 +310,21 @@ func (s *Scorch) prepareSegment(newSegment segment.Segment, ids []string, introduction.persisted = make(chan error, 1) } - // get read lock, to optimistically prepare obsoleted info + // optimistically prepare obsoletes outside of rootLock s.rootLock.RLock() - for _, seg := range s.root.segment { + root := s.root + root.AddRef() + s.rootLock.RUnlock() + + for _, seg := range root.segment { delta, err := seg.segment.DocNumbers(ids) if err != nil { - s.rootLock.RUnlock() return err } introduction.obsoletes[seg.id] = delta } - s.rootLock.RUnlock() + + _ = root.DecRef() s.introductions <- introduction From c09e2a08cadda4973bd062baf9a19fcc07b86a5e Mon Sep 17 00:00:00 2001 From: Steve Yen Date: Sat, 3 Feb 2018 10:51:24 -0800 Subject: [PATCH 5/5] scorch zap chunkedContentCoder reuses chunk metadata slice memory And, renamed the chunk MetaData.DocID field to DocNum for naming correctness, where much of this commit is the mechanical effect of that rename. --- cmd/bleve/cmd/zap/docvalue.go | 27 ++++++++++++------------ index/scorch/segment/zap/build.go | 2 +- index/scorch/segment/zap/contentcoder.go | 12 +++++------ index/scorch/segment/zap/docvalues.go | 18 ++++++++-------- 4 files changed, 29 insertions(+), 30 deletions(-) diff --git a/cmd/bleve/cmd/zap/docvalue.go b/cmd/bleve/cmd/zap/docvalue.go index 165829fd..74397495 100644 --- a/cmd/bleve/cmd/zap/docvalue.go +++ b/cmd/bleve/cmd/zap/docvalue.go @@ -165,7 +165,7 @@ var docvalueCmd = &cobra.Command{ /* TODO => dump all chunk headers?? 
if len(args) == 3 && args[2] == ">" {
- dumpChunkDocIDs(data, )
+ dumpChunkDocNums(data, )
}*/
}
@@ -187,7 +187,7 @@ var docvalueCmd = &cobra.Command{
docInChunk := uint64(localDocNum) / uint64(segment.ChunkFactor())
if numChunks < docInChunk {
- return fmt.Errorf("no chunk exists for chunk number: %d for docID: %d", docInChunk, localDocNum)
+ return fmt.Errorf("no chunk exists for chunk number: %d for localDocNum: %d", docInChunk, localDocNum)
}
destChunkDataLoc := fieldDvLoc + offset
@@ -207,7 +207,7 @@ var docvalueCmd = &cobra.Command{
offset = uint64(0)
curChunkHeader := make([]zap.MetaData, int(numDocs))
for i := 0; i < int(numDocs); i++ {
- curChunkHeader[i].DocID, nread = binary.Uvarint(data[chunkMetaLoc+offset : chunkMetaLoc+offset+binary.MaxVarintLen64])
+ curChunkHeader[i].DocNum, nread = binary.Uvarint(data[chunkMetaLoc+offset : chunkMetaLoc+offset+binary.MaxVarintLen64])
offset += uint64(nread)
curChunkHeader[i].DocDvLoc, nread = binary.Uvarint(data[chunkMetaLoc+offset : chunkMetaLoc+offset+binary.MaxVarintLen64])
offset += uint64(nread)
@@ -221,8 +221,8 @@ var docvalueCmd = &cobra.Command{
start, length := getDocValueLocs(uint64(localDocNum), curChunkHeader)
if start == math.MaxUint64 || length == math.MaxUint64 {
- fmt.Printf("no field values found for docID %d\n", localDocNum)
- fmt.Printf("Try docIDs present in chunk: %s\n", assortDocID(curChunkHeader))
+ fmt.Printf("no field values found for localDocNum: %d\n", localDocNum)
+ fmt.Printf("Try docNums present in chunk: %s\n", metaDataDocNums(curChunkHeader))
return nil
}
// uncompress the already loaded data
@@ -234,7 +234,7 @@ var docvalueCmd = &cobra.Command{
var termSeparator byte = 0xff
var termSeparatorSplitSlice = []byte{termSeparator}
- // pick the terms for the given docID
+ // pick the terms for the given docNum
uncompressed = uncompressed[start : start+length]
for {
i := bytes.Index(uncompressed, termSeparatorSplitSlice)
if i < 0 {
break
}
fmt.Printf("%s ", uncompressed[0:i])
uncompressed = uncompressed[i+1:]
}
fmt.Printf(" \n ")
return nil
},
}
-func getDocValueLocs(docID uint64, metaHeader []zap.MetaData) (uint64, uint64) {
+func getDocValueLocs(docNum uint64, metaHeader []zap.MetaData) (uint64, uint64) {
i := sort.Search(len(metaHeader), func(i int) bool {
- return metaHeader[i].DocID >= docID
+ return metaHeader[i].DocNum >= docNum
})
- if i < len(metaHeader) && metaHeader[i].DocID == docID {
+ if i < len(metaHeader) && metaHeader[i].DocNum == docNum {
return metaHeader[i].DocDvLoc, metaHeader[i].DocDvLen
}
return math.MaxUint64, math.MaxUint64
}
-func assortDocID(metaHeader []zap.MetaData) string {
- docIDs := ""
+func metaDataDocNums(metaHeader []zap.MetaData) string {
+ docNums := ""
for _, meta := range metaHeader {
- id := fmt.Sprintf("%d", meta.DocID)
- docIDs += id + ", "
+ docNums += fmt.Sprintf("%d", meta.DocNum) + ", "
}
- return docIDs
+ return docNums
}
func init() {
diff --git a/index/scorch/segment/zap/build.go b/index/scorch/segment/zap/build.go
index e6625528..60d168e6 100644
--- a/index/scorch/segment/zap/build.go
+++ b/index/scorch/segment/zap/build.go
@@ -588,7 +588,7 @@ func persistDocValues(memSegment *mem.Segment, w *CountHashWriter,
if err != nil {
return nil, err
}
- // reseting encoder for the next field
+ // resetting encoder for the next field
fdvEncoder.Reset()
}
diff --git a/index/scorch/segment/zap/contentcoder.go b/index/scorch/segment/zap/contentcoder.go
index b0394049..83457146 100644
--- a/index/scorch/segment/zap/contentcoder.go
+++ b/index/scorch/segment/zap/contentcoder.go
@@ -39,7 +39,7 @@ type chunkedContentCoder struct {
// MetaData
represents the data information inside a // chunk. type MetaData struct { - DocID uint64 // docid of the data inside the chunk + DocNum uint64 // docNum of the data inside the chunk DocDvLoc uint64 // starting offset for a given docid DocDvLen uint64 // length of data inside the chunk for the given docid } @@ -52,7 +52,7 @@ func newChunkedContentCoder(chunkSize uint64, rv := &chunkedContentCoder{ chunkSize: chunkSize, chunkLens: make([]uint64, total), - chunkMeta: []MetaData{}, + chunkMeta: make([]MetaData, 0, total), } return rv @@ -68,7 +68,7 @@ func (c *chunkedContentCoder) Reset() { for i := range c.chunkLens { c.chunkLens[i] = 0 } - c.chunkMeta = []MetaData{} + c.chunkMeta = c.chunkMeta[:0] } // Close indicates you are done calling Add() this allows @@ -88,7 +88,7 @@ func (c *chunkedContentCoder) flushContents() error { // write out the metaData slice for _, meta := range c.chunkMeta { - _, err := writeUvarints(&c.chunkMetaBuf, meta.DocID, meta.DocDvLoc, meta.DocDvLen) + _, err := writeUvarints(&c.chunkMetaBuf, meta.DocNum, meta.DocDvLoc, meta.DocDvLen) if err != nil { return err } @@ -118,7 +118,7 @@ func (c *chunkedContentCoder) Add(docNum uint64, vals []byte) error { // clearing the chunk specific meta for next chunk c.chunkBuf.Reset() c.chunkMetaBuf.Reset() - c.chunkMeta = []MetaData{} + c.chunkMeta = c.chunkMeta[:0] c.currChunk = chunk } @@ -130,7 +130,7 @@ func (c *chunkedContentCoder) Add(docNum uint64, vals []byte) error { } c.chunkMeta = append(c.chunkMeta, MetaData{ - DocID: docNum, + DocNum: docNum, DocDvLoc: uint64(dvOffset), DocDvLen: uint64(dvSize), }) diff --git a/index/scorch/segment/zap/docvalues.go b/index/scorch/segment/zap/docvalues.go index fb5b348a..0514bd30 100644 --- a/index/scorch/segment/zap/docvalues.go +++ b/index/scorch/segment/zap/docvalues.go @@ -99,7 +99,7 @@ func (s *SegmentBase) loadFieldDocValueIterator(field string, func (di *docValueIterator) loadDvChunk(chunkNumber, localDocNum uint64, s *SegmentBase) error { // advance to the chunk where the docValues - // reside for the given docID + // reside for the given docNum destChunkDataLoc := di.dvDataLoc for i := 0; i < int(chunkNumber); i++ { destChunkDataLoc += di.chunkLens[i] @@ -116,7 +116,7 @@ func (di *docValueIterator) loadDvChunk(chunkNumber, offset := uint64(0) di.curChunkHeader = make([]MetaData, int(numDocs)) for i := 0; i < int(numDocs); i++ { - di.curChunkHeader[i].DocID, read = binary.Uvarint(s.mem[chunkMetaLoc+offset : chunkMetaLoc+offset+binary.MaxVarintLen64]) + di.curChunkHeader[i].DocNum, read = binary.Uvarint(s.mem[chunkMetaLoc+offset : chunkMetaLoc+offset+binary.MaxVarintLen64]) offset += uint64(read) di.curChunkHeader[i].DocDvLoc, read = binary.Uvarint(s.mem[chunkMetaLoc+offset : chunkMetaLoc+offset+binary.MaxVarintLen64]) offset += uint64(read) @@ -131,10 +131,10 @@ func (di *docValueIterator) loadDvChunk(chunkNumber, return nil } -func (di *docValueIterator) visitDocValues(docID uint64, +func (di *docValueIterator) visitDocValues(docNum uint64, visitor index.DocumentFieldTermVisitor) error { - // binary search the term locations for the docID - start, length := di.getDocValueLocs(docID) + // binary search the term locations for the docNum + start, length := di.getDocValueLocs(docNum) if start == math.MaxUint64 || length == math.MaxUint64 { return nil } @@ -144,7 +144,7 @@ func (di *docValueIterator) visitDocValues(docID uint64, return err } - // pick the terms for the given docID + // pick the terms for the given docNum uncompressed = uncompressed[start : start+length] for { i := 
bytes.Index(uncompressed, termSeparatorSplitSlice) @@ -159,11 +159,11 @@ func (di *docValueIterator) visitDocValues(docID uint64, return nil } -func (di *docValueIterator) getDocValueLocs(docID uint64) (uint64, uint64) { +func (di *docValueIterator) getDocValueLocs(docNum uint64) (uint64, uint64) { i := sort.Search(len(di.curChunkHeader), func(i int) bool { - return di.curChunkHeader[i].DocID >= docID + return di.curChunkHeader[i].DocNum >= docNum }) - if i < len(di.curChunkHeader) && di.curChunkHeader[i].DocID == docID { + if i < len(di.curChunkHeader) && di.curChunkHeader[i].DocNum == docNum { return di.curChunkHeader[i].DocDvLoc, di.curChunkHeader[i].DocDvLen } return math.MaxUint64, math.MaxUint64
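A standalone sketch (not part of the patch series) of the lookup that the rename touches: the chunk header is kept sorted by DocNum, so sort.Search finds a document's values in O(log n), with math.MaxUint64 doubling as the not-found sentinel. The header values below are invented:

    package main

    import (
    	"fmt"
    	"math"
    	"sort"
    )

    // MetaData mirrors the renamed zap chunk-header entry: DocNum (formerly
    // DocID), plus where that doc's values start and how long they are.
    type MetaData struct {
    	DocNum   uint64
    	DocDvLoc uint64
    	DocDvLen uint64
    }

    // getDocValueLocs binary-searches a header sorted by DocNum; a miss is
    // signaled by (math.MaxUint64, math.MaxUint64).
    func getDocValueLocs(docNum uint64, metaHeader []MetaData) (uint64, uint64) {
    	i := sort.Search(len(metaHeader), func(i int) bool {
    		return metaHeader[i].DocNum >= docNum
    	})
    	if i < len(metaHeader) && metaHeader[i].DocNum == docNum {
    		return metaHeader[i].DocDvLoc, metaHeader[i].DocDvLen
    	}
    	return math.MaxUint64, math.MaxUint64
    }

    func main() {
    	header := []MetaData{ // invented sample chunk header
    		{DocNum: 2, DocDvLoc: 0, DocDvLen: 5},
    		{DocNum: 7, DocDvLoc: 5, DocDvLen: 3},
    	}
    	fmt.Println(getDocValueLocs(7, header)) // 5 3
    	fmt.Println(getDocValueLocs(3, header)) // miss: MaxUint64 MaxUint64
    }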