diff --git a/cmd/bleve/cmd/zap/docvalue.go b/cmd/bleve/cmd/zap/docvalue.go index 165829fd..74397495 100644 --- a/cmd/bleve/cmd/zap/docvalue.go +++ b/cmd/bleve/cmd/zap/docvalue.go @@ -165,7 +165,7 @@ var docvalueCmd = &cobra.Command{ /* TODO => dump all chunk headers?? if len(args) == 3 && args[2] == ">" { - dumpChunkDocIDs(data, ) + dumpChunkDocNums(data, ) }*/ } @@ -187,7 +187,7 @@ var docvalueCmd = &cobra.Command{ docInChunk := uint64(localDocNum) / uint64(segment.ChunkFactor()) if numChunks < docInChunk { - return fmt.Errorf("no chunk exists for chunk number: %d for docID: %d", docInChunk, localDocNum) + return fmt.Errorf("no chunk exists for chunk number: %d for localDocNum: %d", docInChunk, localDocNum) } destChunkDataLoc := fieldDvLoc + offset @@ -207,7 +207,7 @@ var docvalueCmd = &cobra.Command{ offset = uint64(0) curChunkHeader := make([]zap.MetaData, int(numDocs)) for i := 0; i < int(numDocs); i++ { - curChunkHeader[i].DocID, nread = binary.Uvarint(data[chunkMetaLoc+offset : chunkMetaLoc+offset+binary.MaxVarintLen64]) + curChunkHeader[i].DocNum, nread = binary.Uvarint(data[chunkMetaLoc+offset : chunkMetaLoc+offset+binary.MaxVarintLen64]) offset += uint64(nread) curChunkHeader[i].DocDvLoc, nread = binary.Uvarint(data[chunkMetaLoc+offset : chunkMetaLoc+offset+binary.MaxVarintLen64]) offset += uint64(nread) @@ -221,8 +221,8 @@ var docvalueCmd = &cobra.Command{ start, length := getDocValueLocs(uint64(localDocNum), curChunkHeader) if start == math.MaxUint64 || length == math.MaxUint64 { - fmt.Printf("no field values found for docID %d\n", localDocNum) - fmt.Printf("Try docIDs present in chunk: %s\n", assortDocID(curChunkHeader)) + fmt.Printf("no field values found for localDocNum: %d\n", localDocNum) + fmt.Printf("Try docNums present in chunk: %s\n", metaDataDocNums(curChunkHeader)) return nil } // uncompress the already loaded data @@ -234,7 +234,7 @@ var docvalueCmd = &cobra.Command{ var termSeparator byte = 0xff var termSeparatorSplitSlice = []byte{termSeparator} - // pick the terms for the given docID + // pick the terms for the given docNum uncompressed = uncompressed[start : start+length] for { i := bytes.Index(uncompressed, termSeparatorSplitSlice) @@ -250,23 +250,22 @@ var docvalueCmd = &cobra.Command{ }, } -func getDocValueLocs(docID uint64, metaHeader []zap.MetaData) (uint64, uint64) { +func getDocValueLocs(docNum uint64, metaHeader []zap.MetaData) (uint64, uint64) { i := sort.Search(len(metaHeader), func(i int) bool { - return metaHeader[i].DocID >= docID + return metaHeader[i].DocNum >= docNum }) - if i < len(metaHeader) && metaHeader[i].DocID == docID { + if i < len(metaHeader) && metaHeader[i].DocNum == docNum { return metaHeader[i].DocDvLoc, metaHeader[i].DocDvLen } return math.MaxUint64, math.MaxUint64 } -func assortDocID(metaHeader []zap.MetaData) string { - docIDs := "" +func metaDataDocNums(metaHeader []zap.MetaData) string { + docNums := "" for _, meta := range metaHeader { - id := fmt.Sprintf("%d", meta.DocID) - docIDs += id + ", " + docNums += fmt.Sprintf("%d", meta.DocNum) + ", " } - return docIDs + return docNums } func init() { diff --git a/index/scorch/scorch.go b/index/scorch/scorch.go index 31107765..08fffa25 100644 --- a/index/scorch/scorch.go +++ b/index/scorch/scorch.go @@ -310,17 +310,21 @@ func (s *Scorch) prepareSegment(newSegment segment.Segment, ids []string, introduction.persisted = make(chan error, 1) } - // get read lock, to optimistically prepare obsoleted info + // optimistically prepare obsoletes outside of rootLock s.rootLock.RLock() - for _, seg := range s.root.segment { + root := s.root + root.AddRef() + s.rootLock.RUnlock() + + for _, seg := range root.segment { delta, err := seg.segment.DocNumbers(ids) if err != nil { - s.rootLock.RUnlock() return err } introduction.obsoletes[seg.id] = delta } - s.rootLock.RUnlock() + + _ = root.DecRef() s.introductions <- introduction diff --git a/index/scorch/segment/zap/build.go b/index/scorch/segment/zap/build.go index 58f9faea..60d168e6 100644 --- a/index/scorch/segment/zap/build.go +++ b/index/scorch/segment/zap/build.go @@ -187,79 +187,42 @@ func persistBase(memSegment *mem.Segment, cr *CountHashWriter, chunkFactor uint3 } func persistStored(memSegment *mem.Segment, w *CountHashWriter) (uint64, error) { - var curr int var metaBuf bytes.Buffer var data, compressed []byte + metaEncoder := govarint.NewU64Base128Encoder(&metaBuf) + docNumOffsets := make(map[int]uint64, len(memSegment.Stored)) for docNum, storedValues := range memSegment.Stored { if docNum != 0 { // reset buffer if necessary + curr = 0 metaBuf.Reset() data = data[:0] compressed = compressed[:0] - curr = 0 } - metaEncoder := govarint.NewU64Base128Encoder(&metaBuf) - st := memSegment.StoredTypes[docNum] sp := memSegment.StoredPos[docNum] // encode fields in order for fieldID := range memSegment.FieldsInv { if storedFieldValues, ok := storedValues[uint16(fieldID)]; ok { - // has stored values for this field - num := len(storedFieldValues) - stf := st[uint16(fieldID)] spf := sp[uint16(fieldID)] - // process each value - for i := 0; i < num; i++ { - // encode field - _, err2 := metaEncoder.PutU64(uint64(fieldID)) - if err2 != nil { - return 0, err2 - } - // encode type - _, err2 = metaEncoder.PutU64(uint64(stf[i])) - if err2 != nil { - return 0, err2 - } - // encode start offset - _, err2 = metaEncoder.PutU64(uint64(curr)) - if err2 != nil { - return 0, err2 - } - // end len - _, err2 = metaEncoder.PutU64(uint64(len(storedFieldValues[i]))) - if err2 != nil { - return 0, err2 - } - // encode number of array pos - _, err2 = metaEncoder.PutU64(uint64(len(spf[i]))) - if err2 != nil { - return 0, err2 - } - // encode all array positions - for _, pos := range spf[i] { - _, err2 = metaEncoder.PutU64(pos) - if err2 != nil { - return 0, err2 - } - } - // append data - data = append(data, storedFieldValues[i]...) - // update curr - curr += len(storedFieldValues[i]) + var err2 error + curr, data, err2 = persistStoredFieldValues(fieldID, + storedFieldValues, stf, spf, curr, metaEncoder, data) + if err2 != nil { + return 0, err2 } } } - metaEncoder.Close() + metaEncoder.Close() metaBytes := metaBuf.Bytes() // compress the data @@ -299,6 +262,51 @@ func persistStored(memSegment *mem.Segment, w *CountHashWriter) (uint64, error) return rv, nil } +func persistStoredFieldValues(fieldID int, + storedFieldValues [][]byte, stf []byte, spf [][]uint64, + curr int, metaEncoder *govarint.Base128Encoder, data []byte) ( + int, []byte, error) { + for i := 0; i < len(storedFieldValues); i++ { + // encode field + _, err := metaEncoder.PutU64(uint64(fieldID)) + if err != nil { + return 0, nil, err + } + // encode type + _, err = metaEncoder.PutU64(uint64(stf[i])) + if err != nil { + return 0, nil, err + } + // encode start offset + _, err = metaEncoder.PutU64(uint64(curr)) + if err != nil { + return 0, nil, err + } + // end len + _, err = metaEncoder.PutU64(uint64(len(storedFieldValues[i]))) + if err != nil { + return 0, nil, err + } + // encode number of array pos + _, err = metaEncoder.PutU64(uint64(len(spf[i]))) + if err != nil { + return 0, nil, err + } + // encode all array positions + for _, pos := range spf[i] { + _, err = metaEncoder.PutU64(pos) + if err != nil { + return 0, nil, err + } + } + + data = append(data, storedFieldValues[i]...) + curr += len(storedFieldValues[i]) + } + + return curr, data, nil +} + func persistPostingDetails(memSegment *mem.Segment, w *CountHashWriter, chunkFactor uint32) ([]uint64, []uint64, error) { var freqOffsets, locOfffsets []uint64 tfEncoder := newChunkedIntCoder(uint64(chunkFactor), uint64(len(memSegment.Stored)-1)) @@ -580,7 +588,7 @@ func persistDocValues(memSegment *mem.Segment, w *CountHashWriter, if err != nil { return nil, err } - // resetting encoder for the next field + // reseting encoder for the next field fdvEncoder.Reset() } diff --git a/index/scorch/segment/zap/contentcoder.go b/index/scorch/segment/zap/contentcoder.go index b0394049..83457146 100644 --- a/index/scorch/segment/zap/contentcoder.go +++ b/index/scorch/segment/zap/contentcoder.go @@ -39,7 +39,7 @@ type chunkedContentCoder struct { // MetaData represents the data information inside a // chunk. type MetaData struct { - DocID uint64 // docid of the data inside the chunk + DocNum uint64 // docNum of the data inside the chunk DocDvLoc uint64 // starting offset for a given docid DocDvLen uint64 // length of data inside the chunk for the given docid } @@ -52,7 +52,7 @@ func newChunkedContentCoder(chunkSize uint64, rv := &chunkedContentCoder{ chunkSize: chunkSize, chunkLens: make([]uint64, total), - chunkMeta: []MetaData{}, + chunkMeta: make([]MetaData, 0, total), } return rv @@ -68,7 +68,7 @@ func (c *chunkedContentCoder) Reset() { for i := range c.chunkLens { c.chunkLens[i] = 0 } - c.chunkMeta = []MetaData{} + c.chunkMeta = c.chunkMeta[:0] } // Close indicates you are done calling Add() this allows @@ -88,7 +88,7 @@ func (c *chunkedContentCoder) flushContents() error { // write out the metaData slice for _, meta := range c.chunkMeta { - _, err := writeUvarints(&c.chunkMetaBuf, meta.DocID, meta.DocDvLoc, meta.DocDvLen) + _, err := writeUvarints(&c.chunkMetaBuf, meta.DocNum, meta.DocDvLoc, meta.DocDvLen) if err != nil { return err } @@ -118,7 +118,7 @@ func (c *chunkedContentCoder) Add(docNum uint64, vals []byte) error { // clearing the chunk specific meta for next chunk c.chunkBuf.Reset() c.chunkMetaBuf.Reset() - c.chunkMeta = []MetaData{} + c.chunkMeta = c.chunkMeta[:0] c.currChunk = chunk } @@ -130,7 +130,7 @@ func (c *chunkedContentCoder) Add(docNum uint64, vals []byte) error { } c.chunkMeta = append(c.chunkMeta, MetaData{ - DocID: docNum, + DocNum: docNum, DocDvLoc: uint64(dvOffset), DocDvLen: uint64(dvSize), }) diff --git a/index/scorch/segment/zap/docvalues.go b/index/scorch/segment/zap/docvalues.go index fb5b348a..0514bd30 100644 --- a/index/scorch/segment/zap/docvalues.go +++ b/index/scorch/segment/zap/docvalues.go @@ -99,7 +99,7 @@ func (s *SegmentBase) loadFieldDocValueIterator(field string, func (di *docValueIterator) loadDvChunk(chunkNumber, localDocNum uint64, s *SegmentBase) error { // advance to the chunk where the docValues - // reside for the given docID + // reside for the given docNum destChunkDataLoc := di.dvDataLoc for i := 0; i < int(chunkNumber); i++ { destChunkDataLoc += di.chunkLens[i] @@ -116,7 +116,7 @@ func (di *docValueIterator) loadDvChunk(chunkNumber, offset := uint64(0) di.curChunkHeader = make([]MetaData, int(numDocs)) for i := 0; i < int(numDocs); i++ { - di.curChunkHeader[i].DocID, read = binary.Uvarint(s.mem[chunkMetaLoc+offset : chunkMetaLoc+offset+binary.MaxVarintLen64]) + di.curChunkHeader[i].DocNum, read = binary.Uvarint(s.mem[chunkMetaLoc+offset : chunkMetaLoc+offset+binary.MaxVarintLen64]) offset += uint64(read) di.curChunkHeader[i].DocDvLoc, read = binary.Uvarint(s.mem[chunkMetaLoc+offset : chunkMetaLoc+offset+binary.MaxVarintLen64]) offset += uint64(read) @@ -131,10 +131,10 @@ func (di *docValueIterator) loadDvChunk(chunkNumber, return nil } -func (di *docValueIterator) visitDocValues(docID uint64, +func (di *docValueIterator) visitDocValues(docNum uint64, visitor index.DocumentFieldTermVisitor) error { - // binary search the term locations for the docID - start, length := di.getDocValueLocs(docID) + // binary search the term locations for the docNum + start, length := di.getDocValueLocs(docNum) if start == math.MaxUint64 || length == math.MaxUint64 { return nil } @@ -144,7 +144,7 @@ func (di *docValueIterator) visitDocValues(docID uint64, return err } - // pick the terms for the given docID + // pick the terms for the given docNum uncompressed = uncompressed[start : start+length] for { i := bytes.Index(uncompressed, termSeparatorSplitSlice) @@ -159,11 +159,11 @@ func (di *docValueIterator) visitDocValues(docID uint64, return nil } -func (di *docValueIterator) getDocValueLocs(docID uint64) (uint64, uint64) { +func (di *docValueIterator) getDocValueLocs(docNum uint64) (uint64, uint64) { i := sort.Search(len(di.curChunkHeader), func(i int) bool { - return di.curChunkHeader[i].DocID >= docID + return di.curChunkHeader[i].DocNum >= docNum }) - if i < len(di.curChunkHeader) && di.curChunkHeader[i].DocID == docID { + if i < len(di.curChunkHeader) && di.curChunkHeader[i].DocNum == docNum { return di.curChunkHeader[i].DocDvLoc, di.curChunkHeader[i].DocDvLen } return math.MaxUint64, math.MaxUint64 diff --git a/index/scorch/segment/zap/merge.go b/index/scorch/segment/zap/merge.go index cc348d72..327446c5 100644 --- a/index/scorch/segment/zap/merge.go +++ b/index/scorch/segment/zap/merge.go @@ -52,41 +52,15 @@ func Merge(segments []*Segment, drops []*roaring.Bitmap, path string, // wrap it for counting (tracking offsets) cr := NewCountHashWriter(br) - fieldsInv := mergeFields(segments) - fieldsMap := mapFields(fieldsInv) - - var newDocNums [][]uint64 - var storedIndexOffset uint64 - fieldDvLocsOffset := uint64(fieldNotUninverted) - var dictLocs []uint64 - - newSegDocCount := computeNewDocCount(segments, drops) - if newSegDocCount > 0 { - storedIndexOffset, newDocNums, err = mergeStoredAndRemap(segments, drops, - fieldsMap, fieldsInv, newSegDocCount, cr) - if err != nil { - cleanup() - return nil, err - } - - dictLocs, fieldDvLocsOffset, err = persistMergedRest(segments, drops, fieldsInv, fieldsMap, - newDocNums, newSegDocCount, chunkFactor, cr) - if err != nil { - cleanup() - return nil, err - } - } else { - dictLocs = make([]uint64, len(fieldsInv)) - } - - fieldsIndexOffset, err := persistFields(fieldsInv, cr, dictLocs) + newDocNums, numDocs, storedIndexOffset, fieldsIndexOffset, docValueOffset, err := + mergeToWriter(segments, drops, chunkFactor, cr) if err != nil { cleanup() return nil, err } - err = persistFooter(newSegDocCount, storedIndexOffset, - fieldsIndexOffset, fieldDvLocsOffset, chunkFactor, cr.Sum32(), cr) + err = persistFooter(numDocs, storedIndexOffset, fieldsIndexOffset, + docValueOffset, chunkFactor, cr.Sum32(), cr) if err != nil { cleanup() return nil, err @@ -113,6 +87,43 @@ func Merge(segments []*Segment, drops []*roaring.Bitmap, path string, return newDocNums, nil } +func mergeToWriter(segments []*Segment, drops []*roaring.Bitmap, + chunkFactor uint32, cr *CountHashWriter) ( + newDocNums [][]uint64, + numDocs, storedIndexOffset, fieldsIndexOffset, docValueOffset uint64, + err error) { + docValueOffset = uint64(fieldNotUninverted) + + var dictLocs []uint64 + + fieldsInv := mergeFields(segments) + fieldsMap := mapFields(fieldsInv) + + numDocs = computeNewDocCount(segments, drops) + if numDocs > 0 { + storedIndexOffset, newDocNums, err = mergeStoredAndRemap(segments, drops, + fieldsMap, fieldsInv, numDocs, cr) + if err != nil { + return nil, 0, 0, 0, 0, err + } + + dictLocs, docValueOffset, err = persistMergedRest(segments, drops, fieldsInv, fieldsMap, + newDocNums, numDocs, chunkFactor, cr) + if err != nil { + return nil, 0, 0, 0, 0, err + } + } else { + dictLocs = make([]uint64, len(fieldsInv)) + } + + fieldsIndexOffset, err = persistFields(fieldsInv, cr, dictLocs) + if err != nil { + return nil, 0, 0, 0, 0, err + } + + return newDocNums, numDocs, storedIndexOffset, fieldsIndexOffset, docValueOffset, nil +} + // mapFields takes the fieldsInv list and builds the map func mapFields(fields []string) map[string]uint16 { rv := make(map[string]uint16, len(fields)) @@ -453,47 +464,14 @@ func mergeStoredAndRemap(segments []*Segment, drops []*roaring.Bitmap, for fieldID := range fieldsInv { storedFieldValues := vals[int(fieldID)] - // has stored values for this field - num := len(storedFieldValues) + stf := typs[int(fieldID)] + spf := poss[int(fieldID)] - // process each value - for i := 0; i < num; i++ { - // encode field - _, err2 := metaEncoder.PutU64(uint64(fieldID)) - if err2 != nil { - return 0, nil, err2 - } - // encode type - _, err2 = metaEncoder.PutU64(uint64(typs[int(fieldID)][i])) - if err2 != nil { - return 0, nil, err2 - } - // encode start offset - _, err2 = metaEncoder.PutU64(uint64(curr)) - if err2 != nil { - return 0, nil, err2 - } - // end len - _, err2 = metaEncoder.PutU64(uint64(len(storedFieldValues[i]))) - if err2 != nil { - return 0, nil, err2 - } - // encode number of array pos - _, err2 = metaEncoder.PutU64(uint64(len(poss[int(fieldID)][i]))) - if err2 != nil { - return 0, nil, err2 - } - // encode all array positions - for j := 0; j < len(poss[int(fieldID)][i]); j++ { - _, err2 = metaEncoder.PutU64(poss[int(fieldID)][i][j]) - if err2 != nil { - return 0, nil, err2 - } - } - // append data - data = append(data, storedFieldValues[i]...) - // update curr - curr += len(storedFieldValues[i]) + var err2 error + curr, data, err2 = persistStoredFieldValues(fieldID, + storedFieldValues, stf, spf, curr, metaEncoder, data) + if err2 != nil { + return 0, nil, err2 } }