From fa5de8e09aae8ac706182333a2d9488111663936 Mon Sep 17 00:00:00 2001 From: Sreekanth Sivasankaran Date: Tue, 6 Mar 2018 16:22:11 +0530 Subject: [PATCH 01/20] making NumSnapshotsToKeep configurable --- index/scorch/persister.go | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/index/scorch/persister.go b/index/scorch/persister.go index f1a372e7..cab2d035 100644 --- a/index/scorch/persister.go +++ b/index/scorch/persister.go @@ -633,14 +633,19 @@ func (s *Scorch) removeOldBoltSnapshots() (numRemoved int, err error) { return 0, err } - if len(persistedEpochs) <= NumSnapshotsToKeep { + numSnapshotsToKeep := NumSnapshotsToKeep + if val, ok := s.config["numSnapshotsToKeep"].(float64); ok && val > 0 { + numSnapshotsToKeep = int(val) + } + + if len(persistedEpochs) <= numSnapshotsToKeep { // we need to keep everything return 0, nil } // make a map of epochs to protect from deletion - protectedEpochs := make(map[uint64]struct{}, NumSnapshotsToKeep) - for _, epoch := range persistedEpochs[0:NumSnapshotsToKeep] { + protectedEpochs := make(map[uint64]struct{}, numSnapshotsToKeep) + for _, epoch := range persistedEpochs[0:numSnapshotsToKeep] { protectedEpochs[epoch] = struct{}{} } From b04909d3ee67e964e3decb4db745ce63106a7280 Mon Sep 17 00:00:00 2001 From: Sreekanth Sivasankaran Date: Wed, 7 Mar 2018 13:12:06 +0530 Subject: [PATCH 02/20] adding the integer parser utility --- index/scorch/persister.go | 11 +++------- index/scorch/scorch.go | 43 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 46 insertions(+), 8 deletions(-) diff --git a/index/scorch/persister.go b/index/scorch/persister.go index cab2d035..ccb0c1f2 100644 --- a/index/scorch/persister.go +++ b/index/scorch/persister.go @@ -633,19 +633,14 @@ func (s *Scorch) removeOldBoltSnapshots() (numRemoved int, err error) { return 0, err } - numSnapshotsToKeep := NumSnapshotsToKeep - if val, ok := s.config["numSnapshotsToKeep"].(float64); ok && val > 0 { - numSnapshotsToKeep = int(val) - 
} - - if len(persistedEpochs) <= numSnapshotsToKeep { + if len(persistedEpochs) <= s.numSnapshotsToKeep { // we need to keep everything return 0, nil } // make a map of epochs to protect from deletion - protectedEpochs := make(map[uint64]struct{}, numSnapshotsToKeep) - for _, epoch := range persistedEpochs[0:numSnapshotsToKeep] { + protectedEpochs := make(map[uint64]struct{}, s.numSnapshotsToKeep) + for _, epoch := range persistedEpochs[0:s.numSnapshotsToKeep] { protectedEpochs[epoch] = struct{}{} } diff --git a/index/scorch/scorch.go b/index/scorch/scorch.go index 87372a32..7a33fb7f 100644 --- a/index/scorch/scorch.go +++ b/index/scorch/scorch.go @@ -58,6 +58,7 @@ type Scorch struct { nextSnapshotEpoch uint64 eligibleForRemoval []uint64 // Index snapshot epochs that are safe to GC. ineligibleForRemoval map[string]bool // Filenames that should not be GC'ed yet. + numSnapshotsToKeep int closeCh chan struct{} introductions chan *segmentIntroduction @@ -191,6 +192,17 @@ func (s *Scorch) openBolt() error { } } + s.numSnapshotsToKeep = NumSnapshotsToKeep + if v, ok := s.config["numSnapshotsToKeep"]; ok { + var t int + if t, err = parseToInteger(v); err != nil { + return fmt.Errorf("numSnapshotsToKeep parse err: %v", err) + } + if t > 0 { + s.numSnapshotsToKeep = t + } + } + return nil } @@ -503,3 +515,34 @@ func (s *Scorch) unmarkIneligibleForRemoval(filename string) { func init() { registry.RegisterIndexType(Name, NewScorch) } + +func parseToInteger(v interface{}) (int, error) { + switch v.(type) { + case float32: + return int(v.(float32)), nil + case float64: + return int(v.(float64)), nil + case int: + return v.(int), nil + case int8: + return int(v.(int8)), nil + case int16: + return int(v.(int16)), nil + case int32: + return int(v.(int32)), nil + case int64: + return int(v.(int64)), nil + case uint: + return int(v.(uint)), nil + case uint8: + return int(v.(uint8)), nil + case uint16: + return int(v.(uint16)), nil + case uint32: + return int(v.(uint32)), nil + case 
uint64: + return int(v.(uint64)), nil + default: + return 0, fmt.Errorf("expects a numeric value") + } +} From d6522e7e17812bcb6795751abb1cad7acd7af6b1 Mon Sep 17 00:00:00 2001 From: Sreekanth Sivasankaran Date: Fri, 9 Mar 2018 16:01:37 +0530 Subject: [PATCH 03/20] minor optimisation to loadChunk method --- index/scorch/segment/zap/contentcoder.go | 7 +- index/scorch/segment/zap/docvalues.go | 23 ++- index/scorch/segment/zap/intcoder.go | 46 ++++- index/scorch/segment/zap/intcoder_test.go | 212 ++++++++++++++++++++++ index/scorch/segment/zap/posting.go | 44 ++--- 5 files changed, 294 insertions(+), 38 deletions(-) diff --git a/index/scorch/segment/zap/contentcoder.go b/index/scorch/segment/zap/contentcoder.go index 933f10a1..c731f52c 100644 --- a/index/scorch/segment/zap/contentcoder.go +++ b/index/scorch/segment/zap/contentcoder.go @@ -156,7 +156,12 @@ func (c *chunkedContentCoder) Write(w io.Writer) (int, error) { if err != nil { return tw, err } - // write out the chunk lens + + if len(c.chunkLens) > 1 { + chunkLengthsToOffsets(c.chunkLens) + } + + // write out the chunk starting offsets for _, chunkLen := range c.chunkLens { n := binary.PutUvarint(buf, uint64(chunkLen)) nw, err = w.Write(buf[:n]) diff --git a/index/scorch/segment/zap/docvalues.go b/index/scorch/segment/zap/docvalues.go index 13635c57..882ff43d 100644 --- a/index/scorch/segment/zap/docvalues.go +++ b/index/scorch/segment/zap/docvalues.go @@ -38,7 +38,7 @@ type docValueIterator struct { field string curChunkNum uint64 numChunks uint64 - chunkLens []uint64 + chunkOffsets []uint64 dvDataLoc uint64 curChunkHeader []MetaData curChunkData []byte // compressed data cache @@ -47,7 +47,7 @@ type docValueIterator struct { func (di *docValueIterator) size() int { return reflectStaticSizedocValueIterator + size.SizeOfPtr + len(di.field) + - len(di.chunkLens)*size.SizeOfUint64 + + len(di.chunkOffsets)*size.SizeOfUint64 + len(di.curChunkHeader)*reflectStaticSizeMetaData + len(di.curChunkData) } @@ -78,16 
+78,16 @@ func (s *SegmentBase) loadFieldDocValueIterator(field string, offset += uint64(read) fdvIter := &docValueIterator{ - curChunkNum: math.MaxUint64, - field: field, - chunkLens: make([]uint64, int(numChunks)), + curChunkNum: math.MaxUint64, + field: field, + chunkOffsets: make([]uint64, int(numChunks)), } for i := 0; i < int(numChunks); i++ { clen, read = binary.Uvarint(s.mem[fieldDvLoc+offset : fieldDvLoc+offset+binary.MaxVarintLen64]) if read <= 0 { return nil, fmt.Errorf("corrupted chunk length during segment load") } - fdvIter.chunkLens[i] = clen + fdvIter.chunkOffsets[i] = clen offset += uint64(read) } @@ -99,12 +99,11 @@ func (di *docValueIterator) loadDvChunk(chunkNumber, localDocNum uint64, s *SegmentBase) error { // advance to the chunk where the docValues // reside for the given docNum - destChunkDataLoc := di.dvDataLoc - for i := 0; i < int(chunkNumber); i++ { - destChunkDataLoc += di.chunkLens[i] - } + destChunkDataLoc, curChunkEnd := di.dvDataLoc, di.dvDataLoc + start, end := readChunkBoundary(int(chunkNumber), di.chunkOffsets) + destChunkDataLoc += start + curChunkEnd += end - curChunkSize := di.chunkLens[chunkNumber] // read the number of docs reside in the chunk numDocs, read := binary.Uvarint(s.mem[destChunkDataLoc : destChunkDataLoc+binary.MaxVarintLen64]) if read <= 0 { @@ -124,7 +123,7 @@ func (di *docValueIterator) loadDvChunk(chunkNumber, } compressedDataLoc := chunkMetaLoc + offset - dataLength := destChunkDataLoc + curChunkSize - compressedDataLoc + dataLength := curChunkEnd - compressedDataLoc di.curChunkData = s.mem[compressedDataLoc : compressedDataLoc+dataLength] di.curChunkNum = chunkNumber return nil diff --git a/index/scorch/segment/zap/intcoder.go b/index/scorch/segment/zap/intcoder.go index 6680e608..79fe5156 100644 --- a/index/scorch/segment/zap/intcoder.go +++ b/index/scorch/segment/zap/intcoder.go @@ -111,7 +111,12 @@ func (c *chunkedIntCoder) Write(w io.Writer) (int, error) { } buf := c.buf - // write out the number of 
chunks & each chunkLen + // convert the chunk lengths into starting chunk offsets + if len(c.chunkLens) > 1 { + chunkLengthsToOffsets(c.chunkLens) + } + + // write out the number of chunks & each chunk starting offsets n := binary.PutUvarint(buf, uint64(len(c.chunkLens))) for _, chunkLen := range c.chunkLens { n += binary.PutUvarint(buf[n:], uint64(chunkLen)) @@ -134,3 +139,42 @@ func (c *chunkedIntCoder) Write(w io.Writer) (int, error) { func (c *chunkedIntCoder) FinalSize() int { return len(c.final) } + +// chunkLengthsToOffsets converts the chunk length array +// to a chunk starting offset array. The readChunkBoundary +// will figure out the start and end of every chunk from +// these offsets. The starting offset of the first/single +// array element will always be zero and this position is +// used for storing the size of the current last item in +// the array at any given point. +// For eg: +// Lens -> 5 5 5 5 => 5 5 10 15 +// Lens -> 0 5 0 5 => 5 0 5 5 +// Lens -> 0 0 0 5 => 5 0 0 0 +// Lens -> 5 0 0 0 => 0 5 5 5 +// Lens -> 0 5 0 0 => 0 0 5 5 +// Lens -> 0 0 5 0 => 0 0 0 5 +func chunkLengthsToOffsets(lengths []uint64) { + lengths[1], lengths[0] = lengths[0], lengths[1] + for i := 2; i < len(lengths); i++ { + cur := lengths[i] + lengths[i] = lengths[i-1] + lengths[0] + lengths[0] = cur + } +} + +func readChunkBoundary(chunk int, offsets []uint64) (uint64, uint64) { + var start, end uint64 + if chunk > 0 { + start = offsets[chunk] + } + // single element case + if chunk == 0 && len(offsets) == 1 { + end = offsets[chunk] + } else if chunk < len(offsets)-1 { + end = offsets[chunk+1] + } else { // for last element + end = start + offsets[0] + } + return start, end +} diff --git a/index/scorch/segment/zap/intcoder_test.go b/index/scorch/segment/zap/intcoder_test.go index 85d2c5a7..8c77eab6 100644 --- a/index/scorch/segment/zap/intcoder_test.go +++ b/index/scorch/segment/zap/intcoder_test.go @@ -71,3 +71,215 @@ func TestChunkIntCoder(t *testing.T) { } } } + +func 
TestChunkLengthToOffsets(t *testing.T) { + + tests := []struct { + lengths []uint64 + expectedOffsets []uint64 + }{ + { + lengths: []uint64{5, 5, 5, 5, 5}, + expectedOffsets: []uint64{5, 5, 10, 15, 20}, + }, + { + lengths: []uint64{0, 5, 0, 5, 0}, + expectedOffsets: []uint64{0, 0, 5, 5, 10}, + }, + { + lengths: []uint64{0, 0, 0, 0, 5}, + expectedOffsets: []uint64{5, 0, 0, 0, 0}, + }, + { + lengths: []uint64{5, 0, 0, 0, 0}, + expectedOffsets: []uint64{0, 5, 5, 5, 5}, + }, + { + lengths: []uint64{0, 5, 0, 0, 0}, + expectedOffsets: []uint64{0, 0, 5, 5, 5}, + }, + { + lengths: []uint64{0, 0, 0, 5, 0}, + expectedOffsets: []uint64{0, 0, 0, 0, 5}, + }, + { + lengths: []uint64{0, 0, 0, 5, 5}, + expectedOffsets: []uint64{5, 0, 0, 0, 5}, + }, + { + lengths: []uint64{5, 5, 5, 0, 0}, + expectedOffsets: []uint64{0, 5, 10, 15, 15}, + }, + } + + for i, test := range tests { + chunkLengthsToOffsets(test.lengths) + if !reflect.DeepEqual(test.expectedOffsets, test.lengths) { + t.Errorf("Test: %d failed, got %+v, expected %+v", i, test.lengths, test.expectedOffsets) + } + } +} + +func TestChunkReadBoundaryFromOffsets(t *testing.T) { + + tests := []struct { + chunkNumber int + offsets []uint64 + expectedStart uint64 + expectedEnd uint64 + }{ + { + offsets: []uint64{5, 5, 10, 15, 20}, + chunkNumber: 4, + expectedStart: 20, + expectedEnd: 25, + }, + { + offsets: []uint64{5, 5, 10, 15, 20}, + chunkNumber: 0, + expectedStart: 0, + expectedEnd: 5, + }, + { + offsets: []uint64{5, 5, 10, 15, 20}, + chunkNumber: 2, + expectedStart: 10, + expectedEnd: 15, + }, + { + offsets: []uint64{0, 0, 5, 5, 10}, + chunkNumber: 4, + expectedStart: 10, + expectedEnd: 10, + }, + { + offsets: []uint64{0, 0, 5, 5, 10}, + chunkNumber: 1, + expectedStart: 0, + expectedEnd: 5, + }, + { + offsets: []uint64{5, 0, 0, 0, 0}, + chunkNumber: 0, + expectedStart: 0, + expectedEnd: 0, + }, + { + offsets: []uint64{5, 0, 0, 0, 0}, + chunkNumber: 4, + expectedStart: 0, + expectedEnd: 5, + }, + { + offsets: []uint64{5, 0, 0, 
0, 0}, + chunkNumber: 1, + expectedStart: 0, + expectedEnd: 0, + }, + { + offsets: []uint64{0, 5, 5, 5, 5}, + chunkNumber: 1, + expectedStart: 5, + expectedEnd: 5, + }, + { + offsets: []uint64{0, 5, 5, 5, 5}, + chunkNumber: 0, + expectedStart: 0, + expectedEnd: 5, + }, + { + offsets: []uint64{0, 0, 5, 5, 5}, + chunkNumber: 2, + expectedStart: 5, + expectedEnd: 5, + }, + { + offsets: []uint64{0, 0, 5, 5, 5}, + chunkNumber: 1, + expectedStart: 0, + expectedEnd: 5, + }, + { + offsets: []uint64{0, 0, 0, 0, 5}, + chunkNumber: 4, + expectedStart: 5, + expectedEnd: 5, + }, + { + offsets: []uint64{0, 0, 0, 0, 5}, + chunkNumber: 3, + expectedStart: 0, + expectedEnd: 5, + }, + { + offsets: []uint64{0, 0, 0, 0, 5}, + chunkNumber: 2, + expectedStart: 0, + expectedEnd: 0, + }, + { + offsets: []uint64{5, 0, 0, 0, 5}, + chunkNumber: 0, + expectedStart: 0, + expectedEnd: 0, + }, + { + offsets: []uint64{5, 0, 0, 0, 5}, + chunkNumber: 1, + expectedStart: 0, + expectedEnd: 0, + }, + { + offsets: []uint64{5, 0, 0, 0, 5}, + chunkNumber: 3, + expectedStart: 0, + expectedEnd: 5, + }, + { + offsets: []uint64{5, 0, 0, 0, 5}, + chunkNumber: 4, + expectedStart: 5, + expectedEnd: 10, + }, + { + offsets: []uint64{0, 5, 10, 15, 15}, + chunkNumber: 0, + expectedStart: 0, + expectedEnd: 5, + }, + { + offsets: []uint64{0, 5, 10, 15, 15}, + chunkNumber: 1, + expectedStart: 5, + expectedEnd: 10, + }, + { + offsets: []uint64{0, 5, 10, 15, 15}, + chunkNumber: 2, + expectedStart: 10, + expectedEnd: 15, + }, + { + offsets: []uint64{0, 5, 10, 15, 15}, + chunkNumber: 3, + expectedStart: 15, + expectedEnd: 15, + }, + { + offsets: []uint64{0, 5, 10, 15, 15}, + chunkNumber: 4, + expectedStart: 15, + expectedEnd: 15, + }, + } + + for i, test := range tests { + s, e := readChunkBoundary(test.chunkNumber, test.offsets) + if test.expectedStart != s || test.expectedEnd != e { + t.Errorf("Test: %d failed for chunkNumber: %d got start: %d end: %d,"+ + " expected start: %d end: %d", i, test.chunkNumber, s, e, + 
test.expectedStart, test.expectedEnd) + } + } +} diff --git a/index/scorch/segment/zap/posting.go b/index/scorch/segment/zap/posting.go index 7ae36120..c47648cd 100644 --- a/index/scorch/segment/zap/posting.go +++ b/index/scorch/segment/zap/posting.go @@ -163,9 +163,9 @@ func (p *PostingsList) iterator(rv *PostingsIterator) *PostingsIterator { var numFreqChunks uint64 numFreqChunks, read = binary.Uvarint(p.sb.mem[p.freqOffset+n : p.freqOffset+n+binary.MaxVarintLen64]) n += uint64(read) - rv.freqChunkLens = make([]uint64, int(numFreqChunks)) + rv.freqChunkOffsets = make([]uint64, int(numFreqChunks)) for i := 0; i < int(numFreqChunks); i++ { - rv.freqChunkLens[i], read = binary.Uvarint(p.sb.mem[p.freqOffset+n : p.freqOffset+n+binary.MaxVarintLen64]) + rv.freqChunkOffsets[i], read = binary.Uvarint(p.sb.mem[p.freqOffset+n : p.freqOffset+n+binary.MaxVarintLen64]) n += uint64(read) } rv.freqChunkStart = p.freqOffset + n @@ -175,9 +175,9 @@ func (p *PostingsList) iterator(rv *PostingsIterator) *PostingsIterator { var numLocChunks uint64 numLocChunks, read = binary.Uvarint(p.sb.mem[p.locOffset+n : p.locOffset+n+binary.MaxVarintLen64]) n += uint64(read) - rv.locChunkLens = make([]uint64, int(numLocChunks)) + rv.locChunkOffsets = make([]uint64, int(numLocChunks)) for i := 0; i < int(numLocChunks); i++ { - rv.locChunkLens[i], read = binary.Uvarint(p.sb.mem[p.locOffset+n : p.locOffset+n+binary.MaxVarintLen64]) + rv.locChunkOffsets[i], read = binary.Uvarint(p.sb.mem[p.locOffset+n : p.locOffset+n+binary.MaxVarintLen64]) n += uint64(read) } rv.locChunkStart = p.locOffset + n @@ -297,11 +297,11 @@ type PostingsIterator struct { locDecoder *govarint.Base128Decoder locReader *bytes.Reader - freqChunkLens []uint64 - freqChunkStart uint64 + freqChunkOffsets []uint64 + freqChunkStart uint64 - locChunkLens []uint64 - locChunkStart uint64 + locChunkOffsets []uint64 + locChunkStart uint64 locBitmap *roaring.Bitmap @@ -317,8 +317,8 @@ func (i *PostingsIterator) Size() int { sizeInBytes := 
reflectStaticSizePostingsIterator + size.SizeOfPtr + len(i.currChunkFreqNorm) + len(i.currChunkLoc) + - len(i.freqChunkLens)*size.SizeOfUint64 + - len(i.locChunkLens)*size.SizeOfUint64 + + len(i.freqChunkOffsets)*size.SizeOfUint64 + + len(i.locChunkOffsets)*size.SizeOfUint64 + i.next.Size() if i.locBitmap != nil { @@ -333,16 +333,14 @@ func (i *PostingsIterator) Size() int { } func (i *PostingsIterator) loadChunk(chunk int) error { - if chunk >= len(i.freqChunkLens) || chunk >= len(i.locChunkLens) { - return fmt.Errorf("tried to load chunk that doesn't exist %d/(%d %d)", chunk, len(i.freqChunkLens), len(i.locChunkLens)) + if chunk >= len(i.freqChunkOffsets) || chunk >= len(i.locChunkOffsets) { + return fmt.Errorf("tried to load chunk that doesn't exist %d/(%d %d)", chunk, len(i.freqChunkOffsets), len(i.locChunkOffsets)) } - // load freq chunk bytes - start := i.freqChunkStart - for j := 0; j < chunk; j++ { - start += i.freqChunkLens[j] - } - end := start + i.freqChunkLens[chunk] + end, start := i.freqChunkStart, i.freqChunkStart + s, e := readChunkBoundary(chunk, i.freqChunkOffsets) + start += s + end += e i.currChunkFreqNorm = i.postings.sb.mem[start:end] if i.freqNormReader == nil { i.freqNormReader = bytes.NewReader(i.currChunkFreqNorm) @@ -351,12 +349,10 @@ func (i *PostingsIterator) loadChunk(chunk int) error { i.freqNormReader.Reset(i.currChunkFreqNorm) } - // load loc chunk bytes - start = i.locChunkStart - for j := 0; j < chunk; j++ { - start += i.locChunkLens[j] - } - end = start + i.locChunkLens[chunk] + end, start = i.locChunkStart, i.locChunkStart + s, e = readChunkBoundary(chunk, i.locChunkOffsets) + start += s + end += e i.currChunkLoc = i.postings.sb.mem[start:end] if i.locReader == nil { i.locReader = bytes.NewReader(i.currChunkLoc) From 3884cf4d1216457fe46fbe617cf4e73c3a516250 Mon Sep 17 00:00:00 2001 From: Steve Yen Date: Thu, 8 Mar 2018 21:36:19 -0800 Subject: [PATCH 04/20] scorch zap writePostings() helper func refactored out --- 
index/scorch/segment/zap/merge.go | 120 ++++++++++++++++++------------ 1 file changed, 71 insertions(+), 49 deletions(-) diff --git a/index/scorch/segment/zap/merge.go b/index/scorch/segment/zap/merge.go index 383fedbf..a934dfc3 100644 --- a/index/scorch/segment/zap/merge.go +++ b/index/scorch/segment/zap/merge.go @@ -248,56 +248,15 @@ func persistMergedRest(segments []*SegmentBase, dropsIn []*roaring.Bitmap, tfEncoder.Close() locEncoder.Close() - termCardinality := newRoaring.GetCardinality() + postingsOffset, err := writePostings( + newRoaring, newRoaringLocs, tfEncoder, locEncoder, + use1HitEncoding, w, bufMaxVarintLen64) + if err != nil { + return err + } - encodeAs1Hit, docNum1Hit, normBits1Hit := use1HitEncoding(termCardinality) - if encodeAs1Hit { - err = newVellum.Insert(term, FSTValEncode1Hit(docNum1Hit, normBits1Hit)) - if err != nil { - return err - } - } else if termCardinality > 0 { - // this field/term has hits in the new segment - freqOffset := uint64(w.Count()) - _, err := tfEncoder.Write(w) - if err != nil { - return err - } - locOffset := uint64(w.Count()) - _, err = locEncoder.Write(w) - if err != nil { - return err - } - postingLocOffset := uint64(w.Count()) - _, err = writeRoaringWithLen(newRoaringLocs, w, bufMaxVarintLen64) - if err != nil { - return err - } - postingOffset := uint64(w.Count()) - // write out the start of the term info - n := binary.PutUvarint(bufMaxVarintLen64, freqOffset) - _, err = w.Write(bufMaxVarintLen64[:n]) - if err != nil { - return err - } - // write out the start of the loc info - n = binary.PutUvarint(bufMaxVarintLen64, locOffset) - _, err = w.Write(bufMaxVarintLen64[:n]) - if err != nil { - return err - } - // write out the start of the posting locs - n = binary.PutUvarint(bufMaxVarintLen64, postingLocOffset) - _, err = w.Write(bufMaxVarintLen64[:n]) - if err != nil { - return err - } - _, err = writeRoaringWithLen(newRoaring, w, bufMaxVarintLen64) - if err != nil { - return err - } - - err = 
newVellum.Insert(term, postingOffset) + if postingsOffset > 0 { + err = newVellum.Insert(term, postingsOffset) if err != nil { return err } @@ -460,6 +419,69 @@ func persistMergedRest(segments []*SegmentBase, dropsIn []*roaring.Bitmap, return rv, fieldDvLocsOffset, nil } +func writePostings(postings, postingLocs *roaring.Bitmap, + tfEncoder, locEncoder *chunkedIntCoder, + use1HitEncoding func(uint64) (bool, uint64, uint64), + w *CountHashWriter, bufMaxVarintLen64 []byte) ( + offset uint64, err error) { + termCardinality := postings.GetCardinality() + if termCardinality <= 0 { + return 0, nil + } + + if use1HitEncoding != nil { + encodeAs1Hit, docNum1Hit, normBits1Hit := use1HitEncoding(termCardinality) + if encodeAs1Hit { + return FSTValEncode1Hit(docNum1Hit, normBits1Hit), nil + } + } + + tfOffset := uint64(w.Count()) + _, err = tfEncoder.Write(w) + if err != nil { + return 0, err + } + + locOffset := uint64(w.Count()) + _, err = locEncoder.Write(w) + if err != nil { + return 0, err + } + + postingLocsOffset := uint64(w.Count()) + _, err = writeRoaringWithLen(postingLocs, w, bufMaxVarintLen64) + if err != nil { + return 0, err + } + + postingsOffset := uint64(w.Count()) + + n := binary.PutUvarint(bufMaxVarintLen64, tfOffset) + _, err = w.Write(bufMaxVarintLen64[:n]) + if err != nil { + return 0, err + } + + n = binary.PutUvarint(bufMaxVarintLen64, locOffset) + _, err = w.Write(bufMaxVarintLen64[:n]) + if err != nil { + return 0, err + } + + n = binary.PutUvarint(bufMaxVarintLen64, postingLocsOffset) + _, err = w.Write(bufMaxVarintLen64[:n]) + if err != nil { + return 0, err + } + + _, err = writeRoaringWithLen(postings, w, bufMaxVarintLen64) + if err != nil { + return 0, err + } + + return postingsOffset, nil +} + func mergeStoredAndRemap(segments []*SegmentBase, drops []*roaring.Bitmap, fieldsMap map[string]uint16, fieldsInv []string, fieldsSame bool, newSegDocCount uint64, w *CountHashWriter) (uint64, [][]uint64, error) { From 
e82774ad20cef894996733f144a7f8c09c3ae7f1 Mon Sep 17 00:00:00 2001 From: Steve Yen Date: Fri, 9 Mar 2018 00:16:28 -0800 Subject: [PATCH 05/20] scorch zap AnalysisResultsToSegmentBase() AnalysisResultsToSegmentBase() allows analysis results to be directly converted into a zap-encoded SegmentBase, which can then be introduced onto the root, avoiding the creation of mem.Segment data structures. This leads to some reduction of garbage memory allocations. The grouping and sorting and shaping of the postings list information is taken from the mem.Segment codepaths. The encoding of stored fields reuses functions from zap's merger, which has the largest savings of garbage memory avoidance. And, the encoding of tf/loc chunks, postings & dictionary information also follows the approach used by zap's merger, which also has some savings of garbage memory avoidance. In future changes, the mem.Segment dependencies will be removed from zap, which should result in a smaller codebase. --- index/scorch/scorch.go | 3 +- index/scorch/segment/zap/new.go | 659 ++++++++++++++++++++++++++++++++ 2 files changed, 660 insertions(+), 2 deletions(-) create mode 100644 index/scorch/segment/zap/new.go diff --git a/index/scorch/scorch.go b/index/scorch/scorch.go index 6d0bcd1e..e16a146f 100644 --- a/index/scorch/scorch.go +++ b/index/scorch/scorch.go @@ -28,7 +28,6 @@ import ( "github.com/blevesearch/bleve/document" "github.com/blevesearch/bleve/index" "github.com/blevesearch/bleve/index/scorch/segment" - "github.com/blevesearch/bleve/index/scorch/segment/mem" "github.com/blevesearch/bleve/index/scorch/segment/zap" "github.com/blevesearch/bleve/index/store" "github.com/blevesearch/bleve/registry" @@ -289,7 +288,7 @@ func (s *Scorch) Batch(batch *index.Batch) (err error) { var newSegment segment.Segment if len(analysisResults) > 0 { - newSegment, err = zap.NewSegmentBase(mem.NewFromAnalyzedDocs(analysisResults), DefaultChunkFactor) + newSegment, err = 
zap.AnalysisResultsToSegmentBase(analysisResults, DefaultChunkFactor) if err != nil { return err } diff --git a/index/scorch/segment/zap/new.go b/index/scorch/segment/zap/new.go new file mode 100644 index 00000000..3a8b2012 --- /dev/null +++ b/index/scorch/segment/zap/new.go @@ -0,0 +1,659 @@ +// Copyright (c) 2018 Couchbase, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package zap + +import ( + "bytes" + "encoding/binary" + "math" + "sort" + + "github.com/RoaringBitmap/roaring" + "github.com/Smerity/govarint" + "github.com/blevesearch/bleve/analysis" + "github.com/blevesearch/bleve/document" + "github.com/blevesearch/bleve/index" + "github.com/couchbase/vellum" + "github.com/golang/snappy" +) + +// AnalysisResultsToSegmentBase produces an in-memory zap-encoded +// SegmentBase from analysis results +func AnalysisResultsToSegmentBase(results []*index.AnalysisResult, + chunkFactor uint32) (*SegmentBase, error) { + var br bytes.Buffer + + s := interim{ + results: results, + chunkFactor: chunkFactor, + w: NewCountHashWriter(&br), + FieldsMap: map[string]uint16{}, + } + + storedIndexOffset, fieldsIndexOffset, fdvIndexOffset, dictOffsets, + err := s.convert() + if err != nil { + return nil, err + } + + sb, err := InitSegmentBase(br.Bytes(), s.w.Sum32(), chunkFactor, + s.FieldsMap, s.FieldsInv, uint64(len(results)), + storedIndexOffset, fieldsIndexOffset, fdvIndexOffset, dictOffsets) + + return sb, err +} + +// interim holds temporary working data used 
while converting from +// analysis results to a zap-encoded segment +type interim struct { + results []*index.AnalysisResult + + chunkFactor uint32 + + w *CountHashWriter + + // FieldsMap adds 1 to field id to avoid zero value issues + // name -> field id + 1 + FieldsMap map[string]uint16 + + // FieldsInv is the inverse of FieldsMap + // field id -> name + FieldsInv []string + + // Term dictionaries for each field + // field id -> term -> postings list id + 1 + Dicts []map[string]uint64 + + // Terms for each field, where terms are sorted ascending + // field id -> []term + DictKeys [][]string + + // Fields whose IncludeDocValues is true + // field id -> bool + IncludeDocValues []bool + + // postings id -> bitmap of docNums + Postings []*roaring.Bitmap + + // postings id -> bitmap of docNums that have locations + PostingsLocs []*roaring.Bitmap + + // postings id -> freq/norm's, one for each docNum in postings + FreqNorms [][]interimFreqNorm + + // postings id -> locs, one for each freq + Locs [][]interimLoc + + buf0 bytes.Buffer + tmp0 []byte + tmp1 []byte +} + +func (s *interim) grabBuf(size int) []byte { + buf := s.tmp0 + if cap(buf) < size { + buf = make([]byte, size) + s.tmp0 = buf + } + return buf[0:size] +} + +type interimStoredField struct { + vals [][]byte + typs []byte + arrayposs [][]uint64 // array positions +} + +type interimFreqNorm struct { + freq uint64 + norm float32 +} + +type interimLoc struct { + fieldID uint16 + pos uint64 + start uint64 + end uint64 + arrayposs []uint64 +} + +func (s *interim) convert() (uint64, uint64, uint64, []uint64, error) { + s.getOrDefineField("_id") // _id field is fieldID 0 + + for _, result := range s.results { + for _, field := range result.Document.CompositeFields { + s.getOrDefineField(field.Name()) + } + for _, field := range result.Document.Fields { + s.getOrDefineField(field.Name()) + } + } + + sort.Strings(s.FieldsInv[1:]) // keep _id as first field + + s.FieldsMap = make(map[string]uint16, len(s.FieldsInv)) + 
for fieldID, fieldName := range s.FieldsInv { + s.FieldsMap[fieldName] = uint16(fieldID + 1) + } + + s.IncludeDocValues = make([]bool, len(s.FieldsInv)) + + s.prepareDicts() + + for _, dict := range s.DictKeys { + sort.Strings(dict) + } + + s.processDocuments() + + storedIndexOffset, err := s.writeStoredFields() + if err != nil { + return 0, 0, 0, nil, err + } + + var fdvIndexOffset uint64 + var dictOffsets []uint64 + + if len(s.results) > 0 { + fdvIndexOffset, dictOffsets, err = s.writeDicts() + if err != nil { + return 0, 0, 0, nil, err + } + } else { + dictOffsets = make([]uint64, len(s.FieldsInv)) + } + + fieldsIndexOffset, err := persistFields(s.FieldsInv, s.w, dictOffsets) + if err != nil { + return 0, 0, 0, nil, err + } + + return storedIndexOffset, fieldsIndexOffset, fdvIndexOffset, dictOffsets, nil +} + +func (s *interim) getOrDefineField(fieldName string) int { + fieldIDPlus1, exists := s.FieldsMap[fieldName] + if !exists { + fieldIDPlus1 = uint16(len(s.FieldsInv) + 1) + s.FieldsMap[fieldName] = fieldIDPlus1 + s.FieldsInv = append(s.FieldsInv, fieldName) + s.Dicts = append(s.Dicts, make(map[string]uint64)) + s.DictKeys = append(s.DictKeys, make([]string, 0)) + } + return int(fieldIDPlus1 - 1) +} + +// fill Dicts and DictKeys from analysis results +func (s *interim) prepareDicts() { + var pidNext int + + numTermsPerPostingsList := make([]int, 0, 64) // key is postings list id + numLocsPerPostingsList := make([]int, 0, 64) // key is postings list id + + var totTFs int + var totLocs int + + visitField := func(fieldID uint16, tfs analysis.TokenFrequencies) { + dict := s.Dicts[fieldID] + dictKeys := s.DictKeys[fieldID] + + for term, tf := range tfs { + pidPlus1, exists := dict[term] + if !exists { + pidNext++ + pidPlus1 = uint64(pidNext) + + dict[term] = pidPlus1 + dictKeys = append(dictKeys, term) + + numTermsPerPostingsList = append(numTermsPerPostingsList, 0) + numLocsPerPostingsList = append(numLocsPerPostingsList, 0) + } + + pid := pidPlus1 - 1 + + 
numTermsPerPostingsList[pid] += 1 + numLocsPerPostingsList[pid] += len(tf.Locations) + + totLocs += len(tf.Locations) + } + + totTFs += len(tfs) + + s.DictKeys[fieldID] = dictKeys + } + + for _, result := range s.results { + // walk each composite field + for _, field := range result.Document.CompositeFields { + fieldID := uint16(s.getOrDefineField(field.Name())) + _, tf := field.Analyze() + visitField(fieldID, tf) + } + + // walk each field + for i, field := range result.Document.Fields { + fieldID := uint16(s.getOrDefineField(field.Name())) + tf := result.Analyzed[i] + visitField(fieldID, tf) + } + } + + numPostingsLists := pidNext + + s.Postings = make([]*roaring.Bitmap, numPostingsLists) + for i := 0; i < numPostingsLists; i++ { + s.Postings[i] = roaring.New() + } + + s.PostingsLocs = make([]*roaring.Bitmap, numPostingsLists) + for i := 0; i < numPostingsLists; i++ { + s.PostingsLocs[i] = roaring.New() + } + + s.FreqNorms = make([][]interimFreqNorm, numPostingsLists) + + freqNormsBacking := make([]interimFreqNorm, totTFs) + for pid, numTerms := range numTermsPerPostingsList { + s.FreqNorms[pid] = freqNormsBacking[0:0] + freqNormsBacking = freqNormsBacking[numTerms:] + } + + s.Locs = make([][]interimLoc, numPostingsLists) + + locsBacking := make([]interimLoc, totLocs) + for pid, numLocs := range numLocsPerPostingsList { + s.Locs[pid] = locsBacking[0:0] + locsBacking = locsBacking[numLocs:] + } +} + +func (s *interim) processDocuments() { + numFields := len(s.FieldsInv) + reuseFieldLens := make([]int, numFields) + reuseFieldTFs := make([]analysis.TokenFrequencies, numFields) + + for docNum, result := range s.results { + for i := 0; i < numFields; i++ { // clear these for reuse + reuseFieldLens[i] = 0 + reuseFieldTFs[i] = nil + } + + s.processDocument(uint64(docNum), result, + reuseFieldLens, reuseFieldTFs) + } +} + +func (s *interim) processDocument(docNum uint64, + result *index.AnalysisResult, + fieldLens []int, fieldTFs []analysis.TokenFrequencies) { + 
visitField := func(fieldID uint16, fieldName string, + ln int, tf analysis.TokenFrequencies) { + fieldLens[fieldID] += ln + + existingFreqs := fieldTFs[fieldID] + if existingFreqs != nil { + existingFreqs.MergeAll(fieldName, tf) + } else { + fieldTFs[fieldID] = tf + } + } + + // walk each composite field + for _, field := range result.Document.CompositeFields { + fieldID := uint16(s.getOrDefineField(field.Name())) + ln, tf := field.Analyze() + visitField(fieldID, field.Name(), ln, tf) + } + + // walk each field + for i, field := range result.Document.Fields { + fieldID := uint16(s.getOrDefineField(field.Name())) + ln := result.Length[i] + tf := result.Analyzed[i] + visitField(fieldID, field.Name(), ln, tf) + } + + // now that it's been rolled up into fieldTFs, walk that + for fieldID, tfs := range fieldTFs { + dict := s.Dicts[fieldID] + norm := float32(1.0 / math.Sqrt(float64(fieldLens[fieldID]))) + + for term, tf := range tfs { + pid := dict[term] - 1 + bs := s.Postings[pid] + bs.AddInt(int(docNum)) + + s.FreqNorms[pid] = append(s.FreqNorms[pid], + interimFreqNorm{ + freq: uint64(tf.Frequency()), + norm: norm, + }) + + if len(tf.Locations) > 0 { + locBS := s.PostingsLocs[pid] + locBS.AddInt(int(docNum)) + + locs := s.Locs[pid] + + for _, loc := range tf.Locations { + var locf = uint16(fieldID) + if loc.Field != "" { + locf = uint16(s.getOrDefineField(loc.Field)) + } + var arrayposs []uint64 + if len(loc.ArrayPositions) > 0 { + arrayposs = loc.ArrayPositions + } + locs = append(locs, interimLoc{ + fieldID: locf, + pos: uint64(loc.Position), + start: uint64(loc.Start), + end: uint64(loc.End), + arrayposs: arrayposs, + }) + } + + s.Locs[pid] = locs + } + } + } +} + +func (s *interim) writeStoredFields() ( + storedIndexOffset uint64, err error) { + metaBuf := &s.buf0 + metaEncoder := govarint.NewU64Base128Encoder(metaBuf) + + data, compressed := s.tmp0[:0], s.tmp1[:0] + defer func() { s.tmp0, s.tmp1 = data, compressed }() + + // keyed by docNum + docStoredOffsets := 
make([]uint64, len(s.results)) + + // keyed by fieldID, for the current doc in the loop + docStoredFields := map[uint16]interimStoredField{} + + for docNum, result := range s.results { + for fieldID := range docStoredFields { // reset for next doc + delete(docStoredFields, fieldID) + } + + for _, field := range result.Document.Fields { + fieldID := uint16(s.getOrDefineField(field.Name())) + + opts := field.Options() + + if opts.IsStored() { + isf := docStoredFields[fieldID] + isf.vals = append(isf.vals, field.Value()) + isf.typs = append(isf.typs, encodeFieldType(field)) + isf.arrayposs = append(isf.arrayposs, field.ArrayPositions()) + docStoredFields[fieldID] = isf + } + + if opts.IncludeDocValues() { + s.IncludeDocValues[fieldID] = true + } + } + + var curr int + + metaBuf.Reset() + data = data[:0] + compressed = compressed[:0] + + for fieldID := range s.FieldsInv { + isf, exists := docStoredFields[uint16(fieldID)] + if exists { + curr, data, err = persistStoredFieldValues( + fieldID, isf.vals, isf.typs, isf.arrayposs, + curr, metaEncoder, data) + if err != nil { + return 0, err + } + } + } + + metaEncoder.Close() + metaBytes := metaBuf.Bytes() + + compressed = snappy.Encode(compressed, data) + + docStoredOffsets[docNum] = uint64(s.w.Count()) + + _, err := writeUvarints(s.w, + uint64(len(metaBytes)), + uint64(len(compressed))) + if err != nil { + return 0, err + } + + _, err = s.w.Write(metaBytes) + if err != nil { + return 0, err + } + + _, err = s.w.Write(compressed) + if err != nil { + return 0, err + } + } + + storedIndexOffset = uint64(s.w.Count()) + + for _, docStoredOffset := range docStoredOffsets { + err = binary.Write(s.w, binary.BigEndian, docStoredOffset) + if err != nil { + return 0, err + } + } + + return storedIndexOffset, nil +} + +func (s *interim) writeDicts() (uint64, []uint64, error) { + dictOffsets := make([]uint64, len(s.FieldsInv)) + + fdvOffsets := make([]uint64, len(s.FieldsInv)) + + buf := s.grabBuf(binary.MaxVarintLen64) + + tfEncoder 
:= newChunkedIntCoder(uint64(s.chunkFactor), uint64(len(s.results)-1)) + locEncoder := newChunkedIntCoder(uint64(s.chunkFactor), uint64(len(s.results)-1)) + fdvEncoder := newChunkedContentCoder(uint64(s.chunkFactor), uint64(len(s.results)-1)) + + var docTermMap [][]byte + + s.buf0.Reset() + builder, err := vellum.New(&s.buf0, nil) + if err != nil { + return 0, nil, err + } + + for fieldID, terms := range s.DictKeys { + if cap(docTermMap) < len(s.results) { + docTermMap = make([][]byte, len(s.results)) + } else { + docTermMap = docTermMap[0:len(s.results)] + for docNum := range docTermMap { // reset the docTermMap + docTermMap[docNum] = docTermMap[docNum][:0] + } + } + + dict := s.Dicts[fieldID] + + for _, term := range terms { // terms are already sorted + pid := dict[term] - 1 + + postingsBS := s.Postings[pid] + postingsLocsBS := s.PostingsLocs[pid] + + freqNorms := s.FreqNorms[pid] + freqNormOffset := 0 + + locs := s.Locs[pid] + locOffset := 0 + + postingsItr := postingsBS.Iterator() + for postingsItr.HasNext() { + docNum := uint64(postingsItr.Next()) + + freqNorm := freqNorms[freqNormOffset] + + err = tfEncoder.Add(docNum, freqNorm.freq, + uint64(math.Float32bits(freqNorm.norm))) + if err != nil { + return 0, nil, err + } + + for i := uint64(0); i < freqNorm.freq; i++ { + if len(locs) > 0 { + loc := locs[locOffset] + + err = locEncoder.Add(docNum, uint64(loc.fieldID), + loc.pos, loc.start, loc.end, + uint64(len(loc.arrayposs))) + if err != nil { + return 0, nil, err + } + + err = locEncoder.Add(docNum, loc.arrayposs...) 
+ if err != nil { + return 0, nil, err + } + } + + locOffset++ + } + + freqNormOffset++ + + docTermMap[docNum] = append( + append(docTermMap[docNum], term...), + termSeparator) + } + + tfEncoder.Close() + locEncoder.Close() + + postingsOffset, err := writePostings( + postingsBS, postingsLocsBS, tfEncoder, locEncoder, + nil, s.w, buf) + if err != nil { + return 0, nil, err + } + + if postingsOffset > uint64(0) { + err = builder.Insert([]byte(term), postingsOffset) + if err != nil { + return 0, nil, err + } + } + + tfEncoder.Reset() + locEncoder.Reset() + } + + err = builder.Close() + if err != nil { + return 0, nil, err + } + + // record where this dictionary starts + dictOffsets[fieldID] = uint64(s.w.Count()) + + vellumData := s.buf0.Bytes() + + // write out the length of the vellum data + n := binary.PutUvarint(buf, uint64(len(vellumData))) + _, err = s.w.Write(buf[:n]) + if err != nil { + return 0, nil, err + } + + // write this vellum to disk + _, err = s.w.Write(vellumData) + if err != nil { + return 0, nil, err + } + + // reset vellum for reuse + s.buf0.Reset() + + err = builder.Reset(&s.buf0) + if err != nil { + return 0, nil, err + } + + // write the field doc values + if s.IncludeDocValues[fieldID] { + for docNum, docTerms := range docTermMap { + if len(docTerms) > 0 { + err = fdvEncoder.Add(uint64(docNum), docTerms) + if err != nil { + return 0, nil, err + } + } + } + err = fdvEncoder.Close() + if err != nil { + return 0, nil, err + } + + fdvOffsets[fieldID] = uint64(s.w.Count()) + + _, err = fdvEncoder.Write(s.w) + if err != nil { + return 0, nil, err + } + + fdvEncoder.Reset() + } else { + fdvOffsets[fieldID] = fieldNotUninverted + } + } + + fdvIndexOffset := uint64(s.w.Count()) + + for _, fdvOffset := range fdvOffsets { + n := binary.PutUvarint(buf, fdvOffset) + _, err := s.w.Write(buf[:n]) + if err != nil { + return 0, nil, err + } + } + + return fdvIndexOffset, dictOffsets, nil +} + +func encodeFieldType(f document.Field) byte { + fieldType := 
byte('x') + switch f.(type) { + case *document.TextField: + fieldType = 't' + case *document.NumericField: + fieldType = 'n' + case *document.DateTimeField: + fieldType = 'd' + case *document.BooleanField: + fieldType = 'b' + case *document.GeoPointField: + fieldType = 'g' + case *document.CompositeField: + fieldType = 'c' + } + return fieldType +} From eade78be2f2026b82a66e4eff98ca1567c3d88c3 Mon Sep 17 00:00:00 2001 From: Steve Yen Date: Fri, 9 Mar 2018 14:05:17 -0800 Subject: [PATCH 06/20] scorch zap unit tests no longer use mem.Segment --- index/scorch/segment/zap/build_test.go | 37 +++++++++++++------ index/scorch/segment/zap/dict_test.go | 11 ++---- index/scorch/segment/zap/merge_test.go | 47 +++++++++++------------- index/scorch/segment/zap/segment_test.go | 20 +++++----- 4 files changed, 61 insertions(+), 54 deletions(-) diff --git a/index/scorch/segment/zap/build_test.go b/index/scorch/segment/zap/build_test.go index 9063980b..65de7931 100644 --- a/index/scorch/segment/zap/build_test.go +++ b/index/scorch/segment/zap/build_test.go @@ -21,20 +21,22 @@ import ( "github.com/blevesearch/bleve/analysis" "github.com/blevesearch/bleve/document" "github.com/blevesearch/bleve/index" - "github.com/blevesearch/bleve/index/scorch/segment/mem" ) func TestBuild(t *testing.T) { _ = os.RemoveAll("/tmp/scorch.zap") - memSegment := buildMemSegment() - err := PersistSegment(memSegment, "/tmp/scorch.zap", 1024) + sb, err := buildTestSegment() + if err != nil { + t.Fatal(err) + } + err = PersistSegmentBase(sb, "/tmp/scorch.zap") if err != nil { t.Fatal(err) } } -func buildMemSegment() *mem.Segment { +func buildTestSegment() (*SegmentBase, error) { doc := &document.Document{ ID: "a", Fields: []document.Field{ @@ -120,11 +122,22 @@ func buildMemSegment() *mem.Segment { } } - return mem.NewFromAnalyzedDocs(results) + return AnalysisResultsToSegmentBase(results, 1024) } -func buildMemSegmentMulti() *mem.Segment { +func buildTestSegmentMulti() (*SegmentBase, error) { + results := 
buildTestAnalysisResultsMulti() + return AnalysisResultsToSegmentBase(results, 1024) +} + +func buildTestSegmentMultiWithChunkFactor(chunkFactor uint32) (*SegmentBase, error) { + results := buildTestAnalysisResultsMulti() + + return AnalysisResultsToSegmentBase(results, chunkFactor) +} + +func buildTestAnalysisResultsMulti() []*index.AnalysisResult { doc := &document.Document{ ID: "a", Fields: []document.Field{ @@ -282,13 +295,11 @@ func buildMemSegmentMulti() *mem.Segment { } } - segment := mem.NewFromAnalyzedDocs(results) - - return segment + return results } -func buildMemSegmentWithDefaultFieldMapping() (*mem.Segment, []string) { - +func buildTestSegmentWithDefaultFieldMapping(chunkFactor uint32) ( + *SegmentBase, []string, error) { doc := &document.Document{ ID: "a", Fields: []document.Field{ @@ -371,5 +382,7 @@ func buildMemSegmentWithDefaultFieldMapping() (*mem.Segment, []string) { } } - return mem.NewFromAnalyzedDocs(results), fields + sb, err := AnalysisResultsToSegmentBase(results, chunkFactor) + + return sb, fields, err } diff --git a/index/scorch/segment/zap/dict_test.go b/index/scorch/segment/zap/dict_test.go index 336fb37c..b70f2adf 100644 --- a/index/scorch/segment/zap/dict_test.go +++ b/index/scorch/segment/zap/dict_test.go @@ -22,10 +22,9 @@ import ( "github.com/blevesearch/bleve/analysis" "github.com/blevesearch/bleve/document" "github.com/blevesearch/bleve/index" - "github.com/blevesearch/bleve/index/scorch/segment/mem" ) -func buildMemSegmentForDict() *mem.Segment { +func buildTestSegmentForDict() (*SegmentBase, error) { doc := &document.Document{ ID: "a", Fields: []document.Field{ @@ -99,17 +98,15 @@ func buildMemSegmentForDict() *mem.Segment { }, } - segment := mem.NewFromAnalyzedDocs(results) - - return segment + return AnalysisResultsToSegmentBase(results, 1024) } func TestDictionary(t *testing.T) { _ = os.RemoveAll("/tmp/scorch.zap") - memSegment := buildMemSegmentForDict() - err := PersistSegment(memSegment, "/tmp/scorch.zap", 1024) + 
testSeg, _ := buildTestSegmentForDict() + err := PersistSegmentBase(testSeg, "/tmp/scorch.zap") if err != nil { t.Fatalf("error persisting segment: %v", err) } diff --git a/index/scorch/segment/zap/merge_test.go b/index/scorch/segment/zap/merge_test.go index d80b2608..d931f6c2 100644 --- a/index/scorch/segment/zap/merge_test.go +++ b/index/scorch/segment/zap/merge_test.go @@ -26,7 +26,6 @@ import ( "github.com/blevesearch/bleve/analysis" "github.com/blevesearch/bleve/document" "github.com/blevesearch/bleve/index" - "github.com/blevesearch/bleve/index/scorch/segment/mem" ) func TestMerge(t *testing.T) { @@ -34,14 +33,14 @@ func TestMerge(t *testing.T) { _ = os.RemoveAll("/tmp/scorch2.zap") _ = os.RemoveAll("/tmp/scorch3.zap") - memSegment := buildMemSegmentMulti() - err := PersistSegment(memSegment, "/tmp/scorch.zap", 1024) + testSeg, _ := buildTestSegmentMulti() + err := PersistSegmentBase(testSeg, "/tmp/scorch.zap") if err != nil { t.Fatal(err) } - memSegment2 := buildMemSegmentMulti2() - err = PersistSegment(memSegment2, "/tmp/scorch2.zap", 1024) + testSeg2, _ := buildTestSegmentMulti2() + err = PersistSegmentBase(testSeg2, "/tmp/scorch2.zap") if err != nil { t.Fatal(err) } @@ -121,8 +120,8 @@ func TestMergeWithEmptySegmentsFirst(t *testing.T) { func testMergeWithEmptySegments(t *testing.T, before bool, numEmptySegments int) { _ = os.RemoveAll("/tmp/scorch.zap") - memSegment := buildMemSegmentMulti() - err := PersistSegment(memSegment, "/tmp/scorch.zap", 1024) + testSeg, _ := buildTestSegmentMulti() + err := PersistSegmentBase(testSeg, "/tmp/scorch.zap") if err != nil { t.Fatal(err) } @@ -148,8 +147,8 @@ func testMergeWithEmptySegments(t *testing.T, before bool, numEmptySegments int) _ = os.RemoveAll("/tmp/" + fname) - emptySegment := mem.NewFromAnalyzedDocs([]*index.AnalysisResult{}) - err = PersistSegment(emptySegment, "/tmp/"+fname, 1024) + emptySegment, _ := AnalysisResultsToSegmentBase([]*index.AnalysisResult{}, 1024) + err = PersistSegmentBase(emptySegment, 
"/tmp/"+fname) if err != nil { t.Fatal(err) } @@ -462,8 +461,8 @@ func testMergeAndDrop(t *testing.T, docsToDrop []*roaring.Bitmap) { _ = os.RemoveAll("/tmp/scorch.zap") _ = os.RemoveAll("/tmp/scorch2.zap") - memSegment := buildMemSegmentMulti() - err := PersistSegment(memSegment, "/tmp/scorch.zap", 1024) + testSeg, _ := buildTestSegmentMulti() + err := PersistSegmentBase(testSeg, "/tmp/scorch.zap") if err != nil { t.Fatal(err) } @@ -478,8 +477,8 @@ func testMergeAndDrop(t *testing.T, docsToDrop []*roaring.Bitmap) { } }() - memSegment2 := buildMemSegmentMulti2() - err = PersistSegment(memSegment2, "/tmp/scorch2.zap", 1024) + testSeg2, _ := buildTestSegmentMulti2() + err = PersistSegmentBase(testSeg2, "/tmp/scorch2.zap") if err != nil { t.Fatal(err) } @@ -565,8 +564,8 @@ func testMergeWithUpdates(t *testing.T, segmentDocIds [][]string, docsToDrop []* _ = os.RemoveAll("/tmp/" + fname) - memSegment := buildMemSegmentMultiHelper(docIds) - err := PersistSegment(memSegment, "/tmp/"+fname, 1024) + testSeg, _ := buildTestSegmentMultiHelper(docIds) + err := PersistSegmentBase(testSeg, "/tmp/"+fname) if err != nil { t.Fatal(err) } @@ -616,11 +615,11 @@ func testMergeAndDropSegments(t *testing.T, segsToMerge []*Segment, docsToDrop [ testMergeWithSelf(t, segm.(*Segment), expectedNumDocs) } -func buildMemSegmentMulti2() *mem.Segment { - return buildMemSegmentMultiHelper([]string{"c", "d"}) +func buildTestSegmentMulti2() (*SegmentBase, error) { + return buildTestSegmentMultiHelper([]string{"c", "d"}) } -func buildMemSegmentMultiHelper(docIds []string) *mem.Segment { +func buildTestSegmentMultiHelper(docIds []string) (*SegmentBase, error) { doc := &document.Document{ ID: "c", Fields: []document.Field{ @@ -778,9 +777,7 @@ func buildMemSegmentMultiHelper(docIds []string) *mem.Segment { } } - segment := mem.NewFromAnalyzedDocs(results) - - return segment + return AnalysisResultsToSegmentBase(results, 1024) } func TestMergeBytesWritten(t *testing.T) { @@ -788,14 +785,14 @@ func 
TestMergeBytesWritten(t *testing.T) { _ = os.RemoveAll("/tmp/scorch2.zap") _ = os.RemoveAll("/tmp/scorch3.zap") - memSegment := buildMemSegmentMulti() - err := PersistSegment(memSegment, "/tmp/scorch.zap", 1024) + testSeg, _ := buildTestSegmentMulti() + err := PersistSegmentBase(testSeg, "/tmp/scorch.zap") if err != nil { t.Fatal(err) } - memSegment2 := buildMemSegmentMulti2() - err = PersistSegment(memSegment2, "/tmp/scorch2.zap", 1024) + testSeg2, _ := buildTestSegmentMulti2() + err = PersistSegmentBase(testSeg2, "/tmp/scorch2.zap") if err != nil { t.Fatal(err) } diff --git a/index/scorch/segment/zap/segment_test.go b/index/scorch/segment/zap/segment_test.go index 9ce354ce..50d5dbd7 100644 --- a/index/scorch/segment/zap/segment_test.go +++ b/index/scorch/segment/zap/segment_test.go @@ -28,8 +28,8 @@ import ( func TestOpen(t *testing.T) { _ = os.RemoveAll("/tmp/scorch.zap") - memSegment := buildMemSegment() - err := PersistSegment(memSegment, "/tmp/scorch.zap", 1024) + testSeg, _ := buildTestSegment() + err := PersistSegmentBase(testSeg, "/tmp/scorch.zap") if err != nil { t.Fatalf("error persisting segment: %v", err) } @@ -328,8 +328,8 @@ func TestOpen(t *testing.T) { func TestOpenMulti(t *testing.T) { _ = os.RemoveAll("/tmp/scorch.zap") - memSegment := buildMemSegmentMulti() - err := PersistSegment(memSegment, "/tmp/scorch.zap", 1024) + testSeg, _ := buildTestSegmentMulti() + err := PersistSegmentBase(testSeg, "/tmp/scorch.zap") if err != nil { t.Fatalf("error persisting segment: %v", err) } @@ -428,8 +428,8 @@ func TestOpenMulti(t *testing.T) { func TestOpenMultiWithTwoChunks(t *testing.T) { _ = os.RemoveAll("/tmp/scorch.zap") - memSegment := buildMemSegmentMulti() - err := PersistSegment(memSegment, "/tmp/scorch.zap", 1) + testSeg, _ := buildTestSegmentMultiWithChunkFactor(1) + err := PersistSegmentBase(testSeg, "/tmp/scorch.zap") if err != nil { t.Fatalf("error persisting segment: %v", err) } @@ -523,8 +523,8 @@ func TestOpenMultiWithTwoChunks(t *testing.T) { 
func TestSegmentVisitableDocValueFieldsList(t *testing.T) { _ = os.RemoveAll("/tmp/scorch.zap") - memSegment := buildMemSegmentMulti() - err := PersistSegment(memSegment, "/tmp/scorch.zap", 1) + testSeg, _ := buildTestSegmentMultiWithChunkFactor(1) + err := PersistSegmentBase(testSeg, "/tmp/scorch.zap") if err != nil { t.Fatalf("error persisting segment: %v", err) } @@ -551,8 +551,8 @@ func TestSegmentVisitableDocValueFieldsList(t *testing.T) { } _ = os.RemoveAll("/tmp/scorch.zap") - memSegment, expectedFields := buildMemSegmentWithDefaultFieldMapping() - err = PersistSegment(memSegment, "/tmp/scorch.zap", 1) + testSeg, expectedFields, _ := buildTestSegmentWithDefaultFieldMapping(1) + err = PersistSegmentBase(testSeg, "/tmp/scorch.zap") if err != nil { t.Fatalf("error persisting segment: %v", err) } From 5abf7b7a19071d24123e1add6443ebd91005fc68 Mon Sep 17 00:00:00 2001 From: Steve Yen Date: Fri, 9 Mar 2018 14:11:42 -0800 Subject: [PATCH 07/20] scorch zap remove mem.Segment usage from persist / build.go --- index/scorch/segment/zap/build.go | 488 ------------------------------ 1 file changed, 488 deletions(-) diff --git a/index/scorch/segment/zap/build.go b/index/scorch/segment/zap/build.go index 8ec61095..30ae8d77 100644 --- a/index/scorch/segment/zap/build.go +++ b/index/scorch/segment/zap/build.go @@ -16,16 +16,10 @@ package zap import ( "bufio" - "bytes" - "encoding/binary" "math" "os" - "sort" "github.com/Smerity/govarint" - "github.com/blevesearch/bleve/index/scorch/segment/mem" - "github.com/couchbase/vellum" - "github.com/golang/snappy" ) const version uint32 = 4 @@ -82,186 +76,6 @@ func PersistSegmentBase(sb *SegmentBase, path string) error { return nil } -// PersistSegment takes the in-memory segment and persists it to -// the specified path in the zap file format. 
-func PersistSegment(memSegment *mem.Segment, path string, chunkFactor uint32) error { - flag := os.O_RDWR | os.O_CREATE - - f, err := os.OpenFile(path, flag, 0600) - if err != nil { - return err - } - - cleanup := func() { - _ = f.Close() - _ = os.Remove(path) - } - - // buffer the output - br := bufio.NewWriter(f) - - // wrap it for counting (tracking offsets) - cr := NewCountHashWriter(br) - - numDocs, storedIndexOffset, fieldsIndexOffset, docValueOffset, _, err := - persistBase(memSegment, cr, chunkFactor) - if err != nil { - cleanup() - return err - } - - err = persistFooter(numDocs, storedIndexOffset, fieldsIndexOffset, docValueOffset, - chunkFactor, cr.Sum32(), cr) - if err != nil { - cleanup() - return err - } - - err = br.Flush() - if err != nil { - cleanup() - return err - } - - err = f.Sync() - if err != nil { - cleanup() - return err - } - - err = f.Close() - if err != nil { - cleanup() - return err - } - - return nil -} - -func persistBase(memSegment *mem.Segment, cr *CountHashWriter, chunkFactor uint32) ( - numDocs, storedIndexOffset, fieldsIndexOffset, docValueOffset uint64, - dictLocs []uint64, err error) { - docValueOffset = uint64(fieldNotUninverted) - - if len(memSegment.Stored) > 0 { - storedIndexOffset, err = persistStored(memSegment, cr) - if err != nil { - return 0, 0, 0, 0, nil, err - } - - freqOffsets, locOffsets, err := persistPostingDetails(memSegment, cr, chunkFactor) - if err != nil { - return 0, 0, 0, 0, nil, err - } - - postingsListLocs, err := persistPostingsLocs(memSegment, cr) - if err != nil { - return 0, 0, 0, 0, nil, err - } - - postingsLocs, err := persistPostingsLists(memSegment, cr, postingsListLocs, freqOffsets, locOffsets) - if err != nil { - return 0, 0, 0, 0, nil, err - } - - dictLocs, err = persistDictionary(memSegment, cr, postingsLocs) - if err != nil { - return 0, 0, 0, 0, nil, err - } - - docValueOffset, err = persistFieldDocValues(memSegment, cr, chunkFactor) - if err != nil { - return 0, 0, 0, 0, nil, err - } - } 
else { - dictLocs = make([]uint64, len(memSegment.FieldsInv)) - } - - fieldsIndexOffset, err = persistFields(memSegment.FieldsInv, cr, dictLocs) - if err != nil { - return 0, 0, 0, 0, nil, err - } - - return uint64(len(memSegment.Stored)), storedIndexOffset, fieldsIndexOffset, docValueOffset, - dictLocs, nil -} - -func persistStored(memSegment *mem.Segment, w *CountHashWriter) (uint64, error) { - var curr int - var metaBuf bytes.Buffer - var data, compressed []byte - - metaEncoder := govarint.NewU64Base128Encoder(&metaBuf) - - docNumOffsets := make(map[int]uint64, len(memSegment.Stored)) - - for docNum, storedValues := range memSegment.Stored { - if docNum != 0 { - // reset buffer if necessary - curr = 0 - metaBuf.Reset() - data = data[:0] - compressed = compressed[:0] - } - - st := memSegment.StoredTypes[docNum] - sp := memSegment.StoredPos[docNum] - - // encode fields in order - for fieldID := range memSegment.FieldsInv { - if storedFieldValues, ok := storedValues[uint16(fieldID)]; ok { - stf := st[uint16(fieldID)] - spf := sp[uint16(fieldID)] - - var err2 error - curr, data, err2 = persistStoredFieldValues(fieldID, - storedFieldValues, stf, spf, curr, metaEncoder, data) - if err2 != nil { - return 0, err2 - } - } - } - - metaEncoder.Close() - metaBytes := metaBuf.Bytes() - - // compress the data - compressed = snappy.Encode(compressed, data) - - // record where we're about to start writing - docNumOffsets[docNum] = uint64(w.Count()) - - // write out the meta len and compressed data len - _, err := writeUvarints(w, uint64(len(metaBytes)), uint64(len(compressed))) - if err != nil { - return 0, err - } - - // now write the meta - _, err = w.Write(metaBytes) - if err != nil { - return 0, err - } - // now write the compressed data - _, err = w.Write(compressed) - if err != nil { - return 0, err - } - } - - // return value is the start of the stored index - rv := uint64(w.Count()) - // now write out the stored doc index - for docNum := range memSegment.Stored { - err 
:= binary.Write(w, binary.BigEndian, docNumOffsets[docNum]) - if err != nil { - return 0, err - } - } - - return rv, nil -} - func persistStoredFieldValues(fieldID int, storedFieldValues [][]byte, stf []byte, spf [][]uint64, curr int, metaEncoder *govarint.Base128Encoder, data []byte) ( @@ -307,308 +121,6 @@ func persistStoredFieldValues(fieldID int, return curr, data, nil } -func persistPostingDetails(memSegment *mem.Segment, w *CountHashWriter, chunkFactor uint32) ([]uint64, []uint64, error) { - freqOffsets := make([]uint64, 0, len(memSegment.Postings)) - tfEncoder := newChunkedIntCoder(uint64(chunkFactor), uint64(len(memSegment.Stored)-1)) - for postingID := range memSegment.Postings { - if postingID != 0 { - tfEncoder.Reset() - } - freqs := memSegment.Freqs[postingID] - norms := memSegment.Norms[postingID] - postingsListItr := memSegment.Postings[postingID].Iterator() - var offset int - for postingsListItr.HasNext() { - docNum := uint64(postingsListItr.Next()) - - // put freq & norm - err := tfEncoder.Add(docNum, freqs[offset], uint64(math.Float32bits(norms[offset]))) - if err != nil { - return nil, nil, err - } - - offset++ - } - - // record where this postings freq info starts - freqOffsets = append(freqOffsets, uint64(w.Count())) - - tfEncoder.Close() - _, err := tfEncoder.Write(w) - if err != nil { - return nil, nil, err - } - } - - // now do it again for the locations - locOffsets := make([]uint64, 0, len(memSegment.Postings)) - locEncoder := newChunkedIntCoder(uint64(chunkFactor), uint64(len(memSegment.Stored)-1)) - for postingID := range memSegment.Postings { - if postingID != 0 { - locEncoder.Reset() - } - freqs := memSegment.Freqs[postingID] - locfields := memSegment.Locfields[postingID] - locpos := memSegment.Locpos[postingID] - locstarts := memSegment.Locstarts[postingID] - locends := memSegment.Locends[postingID] - locarraypos := memSegment.Locarraypos[postingID] - postingsListItr := memSegment.Postings[postingID].Iterator() - var offset int - var 
locOffset int - for postingsListItr.HasNext() { - docNum := uint64(postingsListItr.Next()) - n := int(freqs[offset]) - for i := 0; i < n; i++ { - if len(locfields) > 0 { - err := locEncoder.Add(docNum, uint64(locfields[locOffset]), - locpos[locOffset], locstarts[locOffset], locends[locOffset], - uint64(len(locarraypos[locOffset]))) - if err != nil { - return nil, nil, err - } - - // put each array position - err = locEncoder.Add(docNum, locarraypos[locOffset]...) - if err != nil { - return nil, nil, err - } - } - locOffset++ - } - offset++ - } - - // record where this postings loc info starts - locOffsets = append(locOffsets, uint64(w.Count())) - - locEncoder.Close() - _, err := locEncoder.Write(w) - if err != nil { - return nil, nil, err - } - } - - return freqOffsets, locOffsets, nil -} - -func persistPostingsLocs(memSegment *mem.Segment, w *CountHashWriter) (rv []uint64, err error) { - rv = make([]uint64, 0, len(memSegment.PostingsLocs)) - reuseBufVarint := make([]byte, binary.MaxVarintLen64) - for postingID := range memSegment.PostingsLocs { - // record where we start this posting loc - rv = append(rv, uint64(w.Count())) - // write out the length and bitmap - _, err = writeRoaringWithLen(memSegment.PostingsLocs[postingID], w, reuseBufVarint) - if err != nil { - return nil, err - } - } - return rv, nil -} - -func persistPostingsLists(memSegment *mem.Segment, w *CountHashWriter, - postingsListLocs, freqOffsets, locOffsets []uint64) (rv []uint64, err error) { - rv = make([]uint64, 0, len(memSegment.Postings)) - reuseBufVarint := make([]byte, binary.MaxVarintLen64) - for postingID := range memSegment.Postings { - // record where we start this posting list - rv = append(rv, uint64(w.Count())) - - // write out the term info, loc info, and loc posting list offset - _, err = writeUvarints(w, freqOffsets[postingID], - locOffsets[postingID], postingsListLocs[postingID]) - if err != nil { - return nil, err - } - - // write out the length and bitmap - _, err = 
writeRoaringWithLen(memSegment.Postings[postingID], w, reuseBufVarint) - if err != nil { - return nil, err - } - } - return rv, nil -} - -func persistDictionary(memSegment *mem.Segment, w *CountHashWriter, postingsLocs []uint64) ([]uint64, error) { - rv := make([]uint64, 0, len(memSegment.DictKeys)) - - varintBuf := make([]byte, binary.MaxVarintLen64) - - var buffer bytes.Buffer - builder, err := vellum.New(&buffer, nil) - if err != nil { - return nil, err - } - for fieldID, fieldTerms := range memSegment.DictKeys { - - dict := memSegment.Dicts[fieldID] - // now walk the dictionary in order of fieldTerms (already sorted) - for _, fieldTerm := range fieldTerms { - postingID := dict[fieldTerm] - 1 - postingsAddr := postingsLocs[postingID] - err = builder.Insert([]byte(fieldTerm), postingsAddr) - if err != nil { - return nil, err - } - } - err = builder.Close() - if err != nil { - return nil, err - } - - // record where this dictionary starts - rv = append(rv, uint64(w.Count())) - - vellumData := buffer.Bytes() - - // write out the length of the vellum data - n := binary.PutUvarint(varintBuf, uint64(len(vellumData))) - _, err = w.Write(varintBuf[:n]) - if err != nil { - return nil, err - } - - // write this vellum to disk - _, err = w.Write(vellumData) - if err != nil { - return nil, err - } - - // reset buffer and vellum builder - buffer.Reset() - err = builder.Reset(&buffer) - if err != nil { - return nil, err - } - } - - return rv, nil -} - -type docIDRange []uint64 - -func (a docIDRange) Len() int { return len(a) } -func (a docIDRange) Swap(i, j int) { a[i], a[j] = a[j], a[i] } -func (a docIDRange) Less(i, j int) bool { return a[i] < a[j] } - -func persistDocValues(memSegment *mem.Segment, w *CountHashWriter, - chunkFactor uint32) (map[uint16]uint64, error) { - fieldChunkOffsets := make(map[uint16]uint64, len(memSegment.FieldsInv)) - fdvEncoder := newChunkedContentCoder(uint64(chunkFactor), uint64(len(memSegment.Stored)-1)) - - var postings *mem.PostingsList - var 
postingsItr *mem.PostingsIterator - - for fieldID := range memSegment.DocValueFields { - field := memSegment.FieldsInv[fieldID] - docTermMap := make(map[uint64][]byte, 0) - dict, err := memSegment.Dictionary(field) - if err != nil { - return nil, err - } - - dictItr := dict.Iterator() - next, err := dictItr.Next() - for err == nil && next != nil { - var err1 error - postings, err1 = dict.(*mem.Dictionary).InitPostingsList(next.Term, nil, postings) - if err1 != nil { - return nil, err1 - } - - postingsItr = postings.InitIterator(postingsItr) - nextPosting, err2 := postingsItr.Next() - for err2 == nil && nextPosting != nil { - docNum := nextPosting.Number() - docTermMap[docNum] = append(append(docTermMap[docNum], []byte(next.Term)...), termSeparator) - nextPosting, err2 = postingsItr.Next() - } - if err2 != nil { - return nil, err2 - } - - next, err = dictItr.Next() - } - if err != nil { - return nil, err - } - - // sort wrt to docIDs - docNumbers := make(docIDRange, 0, len(docTermMap)) - for k := range docTermMap { - docNumbers = append(docNumbers, k) - } - sort.Sort(docNumbers) - - for _, docNum := range docNumbers { - err = fdvEncoder.Add(docNum, docTermMap[docNum]) - if err != nil { - return nil, err - } - } - - fieldChunkOffsets[fieldID] = uint64(w.Count()) - err = fdvEncoder.Close() - if err != nil { - return nil, err - } - // persist the doc value details for this field - _, err = fdvEncoder.Write(w) - if err != nil { - return nil, err - } - // reseting encoder for the next field - fdvEncoder.Reset() - } - - return fieldChunkOffsets, nil -} - -func persistFieldDocValues(memSegment *mem.Segment, w *CountHashWriter, - chunkFactor uint32) (uint64, error) { - fieldDvOffsets, err := persistDocValues(memSegment, w, chunkFactor) - if err != nil { - return 0, err - } - - fieldDocValuesOffset := uint64(w.Count()) - buf := make([]byte, binary.MaxVarintLen64) - offset := uint64(0) - ok := true - for fieldID := range memSegment.FieldsInv { - // if the field isn't 
configured for docValue, then mark - // the offset accordingly - if offset, ok = fieldDvOffsets[uint16(fieldID)]; !ok { - offset = fieldNotUninverted - } - n := binary.PutUvarint(buf, uint64(offset)) - _, err := w.Write(buf[:n]) - if err != nil { - return 0, err - } - } - - return fieldDocValuesOffset, nil -} - -func NewSegmentBase(memSegment *mem.Segment, chunkFactor uint32) (*SegmentBase, error) { - var br bytes.Buffer - - cr := NewCountHashWriter(&br) - - numDocs, storedIndexOffset, fieldsIndexOffset, docValueOffset, dictLocs, err := - persistBase(memSegment, cr, chunkFactor) - if err != nil { - return nil, err - } - - return InitSegmentBase(br.Bytes(), cr.Sum32(), chunkFactor, - memSegment.FieldsMap, memSegment.FieldsInv, numDocs, - storedIndexOffset, fieldsIndexOffset, docValueOffset, dictLocs) -} - func InitSegmentBase(mem []byte, memCRC uint32, chunkFactor uint32, fieldsMap map[string]uint16, fieldsInv []string, numDocs uint64, storedIndexOffset uint64, fieldsIndexOffset uint64, docValueOffset uint64, From 2a20a36e154e58b35b37904d59a372eed20ee792 Mon Sep 17 00:00:00 2001 From: Steve Yen Date: Fri, 9 Mar 2018 23:19:47 -0800 Subject: [PATCH 08/20] scorch zap optimize to avoid bitmaps for 1-hit posting lists This commit avoids creating roaring.Bitmap's (which would have just a single entry) when a postings list/iterator represents a single "1-hit" encoding.
--- index/scorch/segment/zap/posting.go | 133 ++++++++++++++++------------ index/scorch/segment/zap/segment.go | 8 +- 2 files changed, 80 insertions(+), 61 deletions(-) diff --git a/index/scorch/segment/zap/posting.go b/index/scorch/segment/zap/posting.go index 7ae36120..f2df32bf 100644 --- a/index/scorch/segment/zap/posting.go +++ b/index/scorch/segment/zap/posting.go @@ -92,6 +92,8 @@ func under32Bits(x uint64) bool { return x <= mask31Bits } +const docNum1HitFinished = math.MaxUint64 + // PostingsList is an in-memory represenation of a postings list type PostingsList struct { sb *SegmentBase @@ -102,8 +104,9 @@ type PostingsList struct { postings *roaring.Bitmap except *roaring.Bitmap - // when postingsOffset == freqOffset == 0, then the postings list - // represents a "1-hit" encoding, and has the following norm + // when normBits1Hit != 0, then this postings list came from a + // 1-hit encoding, and only the docNum1Hit & normBits1Hit apply + docNum1Hit uint64 normBits1Hit uint64 } @@ -117,6 +120,17 @@ func (p *PostingsList) Size() int { return sizeInBytes } +func (p *PostingsList) OrInto(receiver *roaring.Bitmap) { + if p.normBits1Hit != 0 { + receiver.Add(uint32(p.docNum1Hit)) + return + } + + if p.postings != nil { + receiver.Or(p.postings) + } +} + // Iterator returns an iterator for this postings list func (p *PostingsList) Iterator() segment.PostingsIterator { return p.iterator(nil) @@ -152,39 +166,47 @@ func (p *PostingsList) iterator(rv *PostingsIterator) *PostingsIterator { } rv.postings = p + if p.normBits1Hit != 0 { + // "1-hit" encoding + rv.docNum1Hit = p.docNum1Hit + rv.normBits1Hit = p.normBits1Hit + + if p.except != nil && p.except.Contains(uint32(rv.docNum1Hit)) { + rv.docNum1Hit = docNum1HitFinished + } + + return rv + } + + // "general" encoding, check if empty if p.postings == nil { return rv } - if p.freqOffset > 0 && p.locOffset > 0 { - // "general" encoding, so prepare the freq chunk details - var n uint64 - var read int - var 
numFreqChunks uint64 - numFreqChunks, read = binary.Uvarint(p.sb.mem[p.freqOffset+n : p.freqOffset+n+binary.MaxVarintLen64]) + // prepare the freq chunk details + var n uint64 + var read int + var numFreqChunks uint64 + numFreqChunks, read = binary.Uvarint(p.sb.mem[p.freqOffset+n : p.freqOffset+n+binary.MaxVarintLen64]) + n += uint64(read) + rv.freqChunkLens = make([]uint64, int(numFreqChunks)) + for i := 0; i < int(numFreqChunks); i++ { + rv.freqChunkLens[i], read = binary.Uvarint(p.sb.mem[p.freqOffset+n : p.freqOffset+n+binary.MaxVarintLen64]) n += uint64(read) - rv.freqChunkLens = make([]uint64, int(numFreqChunks)) - for i := 0; i < int(numFreqChunks); i++ { - rv.freqChunkLens[i], read = binary.Uvarint(p.sb.mem[p.freqOffset+n : p.freqOffset+n+binary.MaxVarintLen64]) - n += uint64(read) - } - rv.freqChunkStart = p.freqOffset + n - - // prepare the loc chunk details - n = 0 - var numLocChunks uint64 - numLocChunks, read = binary.Uvarint(p.sb.mem[p.locOffset+n : p.locOffset+n+binary.MaxVarintLen64]) - n += uint64(read) - rv.locChunkLens = make([]uint64, int(numLocChunks)) - for i := 0; i < int(numLocChunks); i++ { - rv.locChunkLens[i], read = binary.Uvarint(p.sb.mem[p.locOffset+n : p.locOffset+n+binary.MaxVarintLen64]) - n += uint64(read) - } - rv.locChunkStart = p.locOffset + n - } else { - // "1-hit" encoding - rv.normBits1Hit = p.normBits1Hit } + rv.freqChunkStart = p.freqOffset + n + + // prepare the loc chunk details + n = 0 + var numLocChunks uint64 + numLocChunks, read = binary.Uvarint(p.sb.mem[p.locOffset+n : p.locOffset+n+binary.MaxVarintLen64]) + n += uint64(read) + rv.locChunkLens = make([]uint64, int(numLocChunks)) + for i := 0; i < int(numLocChunks); i++ { + rv.locChunkLens[i], read = binary.Uvarint(p.sb.mem[p.locOffset+n : p.locOffset+n+binary.MaxVarintLen64]) + n += uint64(read) + } + rv.locChunkStart = p.locOffset + n rv.locBitmap = p.locBitmap @@ -201,18 +223,20 @@ func (p *PostingsList) iterator(rv *PostingsIterator) *PostingsIterator { // Count 
returns the number of items on this postings list func (p *PostingsList) Count() uint64 { - if p.postings != nil { - n := p.postings.GetCardinality() - if p.except != nil { - e := p.except.GetCardinality() - if e > n { - e = n - } - return n - e - } - return n + var n uint64 + if p.normBits1Hit != 0 { + n = 1 + } else if p.postings != nil { + n = p.postings.GetCardinality() } - return 0 + var e uint64 + if p.except != nil { + e = p.except.GetCardinality() + } + if n <= e { + return 0 + } + return n - e } func (rv *PostingsList) read(postingsOffset uint64, d *Dictionary) error { @@ -263,19 +287,10 @@ func (rv *PostingsList) read(postingsOffset uint64, d *Dictionary) error { return nil } -var emptyRoaring = roaring.NewBitmap() - func (rv *PostingsList) init1Hit(fstVal uint64) error { docNum, normBits := FSTValDecode1Hit(fstVal) - rv.locBitmap = emptyRoaring - - rv.postings = roaring.NewBitmap() - rv.postings.Add(uint32(docNum)) - - // TODO: we can likely do better than allocating a roaring bitmap - // with just 1 entry, but for now reuse existing machinery - + rv.docNum1Hit = docNum rv.normBits1Hit = normBits return nil @@ -308,6 +323,7 @@ type PostingsIterator struct { next Posting // reused across Next() calls nextLocs []Location // reused across Next() calls + docNum1Hit uint64 normBits1Hit uint64 buf []byte @@ -460,7 +476,7 @@ func (i *PostingsIterator) Next() (segment.Posting, error) { } rv.norm = math.Float32frombits(uint32(normBits)) - if i.locBitmap.Contains(uint32(docNum)) { + if i.locBitmap != nil && i.locBitmap.Contains(uint32(docNum)) { // read off 'freq' locations, into reused slices if cap(i.nextLocs) >= int(rv.freq) { i.nextLocs = i.nextLocs[0:rv.freq] @@ -513,7 +529,7 @@ func (i *PostingsIterator) nextBytes() ( endFreqNorm := len(i.currChunkFreqNorm) - i.freqNormReader.Len() bytesFreqNorm = i.currChunkFreqNorm[startFreqNorm:endFreqNorm] - if i.locBitmap.Contains(uint32(docNum)) { + if i.locBitmap != nil && i.locBitmap.Contains(uint32(docNum)) { 
startLoc := len(i.currChunkLoc) - i.locReader.Len() for j := uint64(0); j < freq; j++ { @@ -533,6 +549,15 @@ func (i *PostingsIterator) nextBytes() ( // nextDocNum returns the next docNum on the postings list, and also // sets up the currChunk / loc related fields of the iterator. func (i *PostingsIterator) nextDocNum() (uint64, bool, error) { + if i.normBits1Hit != 0 { + if i.docNum1Hit == docNum1HitFinished { + return 0, false, nil + } + docNum := i.docNum1Hit + i.docNum1Hit = docNum1HitFinished // consume our 1-hit docNum + return docNum, true, nil + } + if i.actual == nil || !i.actual.HasNext() { return 0, false, nil } @@ -540,10 +565,6 @@ func (i *PostingsIterator) nextDocNum() (uint64, bool, error) { n := i.actual.Next() allN := i.all.Next() - if i.normBits1Hit != 0 { - return uint64(n), true, nil - } - nChunk := n / i.postings.sb.chunkFactor allNChunk := allN / i.postings.sb.chunkFactor diff --git a/index/scorch/segment/zap/segment.go b/index/scorch/segment/zap/segment.go index 972b7578..e1d2a14f 100644 --- a/index/scorch/segment/zap/segment.go +++ b/index/scorch/segment/zap/segment.go @@ -341,15 +341,13 @@ func (s *SegmentBase) DocNumbers(ids []string) (*roaring.Bitmap, error) { return nil, err } - var postings *PostingsList + var postingsList *PostingsList for _, id := range ids { - postings, err = idDict.postingsList([]byte(id), nil, postings) + postingsList, err = idDict.postingsList([]byte(id), nil, postingsList) if err != nil { return nil, err } - if postings.postings != nil { - rv.Or(postings.postings) - } + postingsList.OrInto(rv) } } From 90aa91105ab3ab5848902a59e161e6a642aa3577 Mon Sep 17 00:00:00 2001 From: Sreekanth Sivasankaran Date: Fri, 9 Mar 2018 11:19:39 +0530 Subject: [PATCH 09/20] handling only int, float64 values --- index/scorch/scorch.go | 31 ++++++------------------------- 1 file changed, 6 insertions(+), 25 deletions(-) diff --git a/index/scorch/scorch.go b/index/scorch/scorch.go index 7a33fb7f..c171092d 100644 --- 
a/index/scorch/scorch.go +++ b/index/scorch/scorch.go @@ -516,33 +516,14 @@ func init() { registry.RegisterIndexType(Name, NewScorch) } -func parseToInteger(v interface{}) (int, error) { - switch v.(type) { - case float32: - return int(v.(float32)), nil +func parseToInteger(i interface{}) (int, error) { + switch v := i.(type) { case float64: - return int(v.(float64)), nil + return int(v), nil case int: - return v.(int), nil - case int8: - return int(v.(int8)), nil - case int16: - return int(v.(int16)), nil - case int32: - return int(v.(int32)), nil - case int64: - return int(v.(int64)), nil - case uint: - return int(v.(uint)), nil - case uint8: - return int(v.(uint8)), nil - case uint16: - return int(v.(uint16)), nil - case uint32: - return int(v.(uint32)), nil - case uint64: - return int(v.(uint64)), nil + return v, nil + default: - return 0, fmt.Errorf("expects a numeric value") + return 0, fmt.Errorf("expects int or float64 value") } } From 531800c479915f58ab823f00fe1f5fa112695702 Mon Sep 17 00:00:00 2001 From: Steve Yen Date: Sun, 11 Mar 2018 16:30:52 -0700 Subject: [PATCH 10/20] scorch zap use roaring Add() instead of AddInt() This change invokes Add() directly as AddInt() is a convenience wrapper around Add(). 
--- index/scorch/segment/zap/new.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/index/scorch/segment/zap/new.go b/index/scorch/segment/zap/new.go index 3a8b2012..f29711c0 100644 --- a/index/scorch/segment/zap/new.go +++ b/index/scorch/segment/zap/new.go @@ -334,7 +334,7 @@ func (s *interim) processDocument(docNum uint64, for term, tf := range tfs { pid := dict[term] - 1 bs := s.Postings[pid] - bs.AddInt(int(docNum)) + bs.Add(uint32(docNum)) s.FreqNorms[pid] = append(s.FreqNorms[pid], interimFreqNorm{ @@ -344,7 +344,7 @@ func (s *interim) processDocument(docNum uint64, if len(tf.Locations) > 0 { locBS := s.PostingsLocs[pid] - locBS.AddInt(int(docNum)) + locBS.Add(uint32(docNum)) locs := s.Locs[pid] From c4ceffe58430d4f4103d5cca7a0777dceebd8af6 Mon Sep 17 00:00:00 2001 From: Steve Yen Date: Sun, 11 Mar 2018 12:09:21 -0700 Subject: [PATCH 11/20] scorch zap sync Pool for interim data --- index/scorch/segment/zap/new.go | 93 +++++++++++++++++++++++++++------ 1 file changed, 78 insertions(+), 15 deletions(-) diff --git a/index/scorch/segment/zap/new.go b/index/scorch/segment/zap/new.go index f29711c0..c7a6b0ce 100644 --- a/index/scorch/segment/zap/new.go +++ b/index/scorch/segment/zap/new.go @@ -19,6 +19,7 @@ import ( "encoding/binary" "math" "sort" + "sync" "github.com/RoaringBitmap/roaring" "github.com/Smerity/govarint" @@ -35,12 +36,11 @@ func AnalysisResultsToSegmentBase(results []*index.AnalysisResult, chunkFactor uint32) (*SegmentBase, error) { var br bytes.Buffer - s := interim{ - results: results, - chunkFactor: chunkFactor, - w: NewCountHashWriter(&br), - FieldsMap: map[string]uint16{}, - } + s := interimPool.Get().(*interim) + + s.results = results + s.chunkFactor = chunkFactor + s.w = NewCountHashWriter(&br) storedIndexOffset, fieldsIndexOffset, fdvIndexOffset, dictOffsets, err := s.convert() @@ -52,9 +52,13 @@ func AnalysisResultsToSegmentBase(results []*index.AnalysisResult, s.FieldsMap, s.FieldsInv, uint64(len(results)), 
storedIndexOffset, fieldsIndexOffset, fdvIndexOffset, dictOffsets) + interimPool.Put(s.cleanse()) + return sb, err } +var interimPool = sync.Pool{New: func() interface{} { return &interim{} }} + // interim holds temporary working data used while converting from // analysis results to a zap-encoded segment type interim struct { @@ -101,6 +105,41 @@ type interim struct { tmp1 []byte } +func (s *interim) cleanse() *interim { + s.results = nil + s.chunkFactor = 0 + s.w = nil + s.FieldsMap = nil + s.FieldsInv = s.FieldsInv[:0] + for i := range s.Dicts { + s.Dicts[i] = nil + } + s.Dicts = s.Dicts[:0] + for i := range s.DictKeys { + s.DictKeys[i] = s.DictKeys[i][:0] + } + s.DictKeys = s.DictKeys[:0] + for i := range s.IncludeDocValues { + s.IncludeDocValues[i] = false + } + s.IncludeDocValues = s.IncludeDocValues[:0] + for _, idn := range s.Postings { + idn.Clear() + } + s.Postings = s.Postings[:0] + for _, idn := range s.PostingsLocs { + idn.Clear() + } + s.PostingsLocs = s.PostingsLocs[:0] + s.FreqNorms = nil + s.Locs = nil + s.buf0.Reset() + s.tmp0 = s.tmp0[:0] + s.tmp1 = s.tmp1[:0] + + return s +} + func (s *interim) grabBuf(size int) []byte { buf := s.tmp0 if cap(buf) < size { @@ -130,6 +169,8 @@ type interimLoc struct { } func (s *interim) convert() (uint64, uint64, uint64, []uint64, error) { + s.FieldsMap = map[string]uint16{} + s.getOrDefineField("_id") // _id field is fieldID 0 for _, result := range s.results { @@ -143,12 +184,15 @@ func (s *interim) convert() (uint64, uint64, uint64, []uint64, error) { sort.Strings(s.FieldsInv[1:]) // keep _id as first field - s.FieldsMap = make(map[string]uint16, len(s.FieldsInv)) for fieldID, fieldName := range s.FieldsInv { s.FieldsMap[fieldName] = uint16(fieldID + 1) } - s.IncludeDocValues = make([]bool, len(s.FieldsInv)) + if cap(s.IncludeDocValues) >= len(s.FieldsInv) { + s.IncludeDocValues = s.IncludeDocValues[:len(s.FieldsInv)] + } else { + s.IncludeDocValues = make([]bool, len(s.FieldsInv)) + } s.prepareDicts() @@ 
-189,9 +233,18 @@ func (s *interim) getOrDefineField(fieldName string) int { fieldIDPlus1 = uint16(len(s.FieldsInv) + 1) s.FieldsMap[fieldName] = fieldIDPlus1 s.FieldsInv = append(s.FieldsInv, fieldName) + s.Dicts = append(s.Dicts, make(map[string]uint64)) - s.DictKeys = append(s.DictKeys, make([]string, 0)) + + n := len(s.DictKeys) + if n < cap(s.DictKeys) { + s.DictKeys = s.DictKeys[:n+1] + s.DictKeys[n] = s.DictKeys[n][:0] + } else { + s.DictKeys = append(s.DictKeys, []string(nil)) + } } + return int(fieldIDPlus1 - 1) } @@ -253,16 +306,25 @@ func (s *interim) prepareDicts() { numPostingsLists := pidNext - s.Postings = make([]*roaring.Bitmap, numPostingsLists) - for i := 0; i < numPostingsLists; i++ { - s.Postings[i] = roaring.New() + if cap(s.Postings) >= numPostingsLists { + s.Postings = s.Postings[:numPostingsLists] + } else { + s.Postings = make([]*roaring.Bitmap, numPostingsLists) + for i := 0; i < numPostingsLists; i++ { + s.Postings[i] = roaring.New() + } } - s.PostingsLocs = make([]*roaring.Bitmap, numPostingsLists) - for i := 0; i < numPostingsLists; i++ { - s.PostingsLocs[i] = roaring.New() + if cap(s.PostingsLocs) >= numPostingsLists { + s.PostingsLocs = s.PostingsLocs[:numPostingsLists] + } else { + s.PostingsLocs = make([]*roaring.Bitmap, numPostingsLists) + for i := 0; i < numPostingsLists; i++ { + s.PostingsLocs[i] = roaring.New() + } } + // TODO: reuse this. s.FreqNorms = make([][]interimFreqNorm, numPostingsLists) freqNormsBacking := make([]interimFreqNorm, totTFs) @@ -271,6 +333,7 @@ func (s *interim) prepareDicts() { freqNormsBacking = freqNormsBacking[numTerms:] } + // TODO: reuse this. 
s.Locs = make([][]interimLoc, numPostingsLists) locsBacking := make([]interimLoc, totLocs) From cad88096cacab844536d322312c0a59f150926c3 Mon Sep 17 00:00:00 2001 From: Steve Yen Date: Sun, 11 Mar 2018 16:30:34 -0700 Subject: [PATCH 12/20] scorch zap reuse roaring Bitmap during merge --- index/scorch/segment/zap/merge.go | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/index/scorch/segment/zap/merge.go b/index/scorch/segment/zap/merge.go index a934dfc3..07de0943 100644 --- a/index/scorch/segment/zap/merge.go +++ b/index/scorch/segment/zap/merge.go @@ -183,6 +183,9 @@ func persistMergedRest(segments []*SegmentBase, dropsIn []*roaring.Bitmap, return nil, 0, err } + newRoaring := roaring.NewBitmap() + newRoaringLocs := roaring.NewBitmap() + // for each field for fieldID, fieldName := range fieldsInv { @@ -222,8 +225,8 @@ func persistMergedRest(segments []*SegmentBase, dropsIn []*roaring.Bitmap, var prevTerm []byte - newRoaring := roaring.NewBitmap() - newRoaringLocs := roaring.NewBitmap() + newRoaring.Clear() + newRoaringLocs.Clear() var lastDocNum, lastFreq, lastNorm uint64 @@ -262,8 +265,8 @@ func persistMergedRest(segments []*SegmentBase, dropsIn []*roaring.Bitmap, } } - newRoaring = roaring.NewBitmap() - newRoaringLocs = roaring.NewBitmap() + newRoaring.Clear() + newRoaringLocs.Clear() tfEncoder.Reset() locEncoder.Reset() From b1f39695217295421681d509744e55c0027b5f5d Mon Sep 17 00:00:00 2001 From: Steve Yen Date: Sun, 11 Mar 2018 20:13:31 -0700 Subject: [PATCH 13/20] scorch zap reuse roaring Bitmap in postings lists --- index/scorch/segment/zap/dict.go | 12 ++++++++++++ index/scorch/segment/zap/posting.go | 8 ++++++-- 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/index/scorch/segment/zap/dict.go b/index/scorch/segment/zap/dict.go index e5d71268..3b8132f2 100644 --- a/index/scorch/segment/zap/dict.go +++ b/index/scorch/segment/zap/dict.go @@ -68,7 +68,19 @@ func (d *Dictionary) postingsListInit(rv *PostingsList, except 
*roaring.Bitmap) if rv == nil { rv = &PostingsList{} } else { + postings := rv.postings + if postings != nil { + postings.Clear() + } + locBitmap := rv.locBitmap + if locBitmap != nil { + locBitmap.Clear() + } + *rv = PostingsList{} // clear the struct + + rv.postings = postings + rv.locBitmap = locBitmap } rv.sb = d.sb rv.except = except diff --git a/index/scorch/segment/zap/posting.go b/index/scorch/segment/zap/posting.go index f2df32bf..bc533ad1 100644 --- a/index/scorch/segment/zap/posting.go +++ b/index/scorch/segment/zap/posting.go @@ -266,7 +266,9 @@ func (rv *PostingsList) read(postingsOffset uint64, d *Dictionary) error { locRoaringBytes := d.sb.mem[locBitmapOffset+uint64(read) : locBitmapOffset+uint64(read)+locBitmapLen] - rv.locBitmap = roaring.NewBitmap() + if rv.locBitmap == nil { + rv.locBitmap = roaring.NewBitmap() + } _, err := rv.locBitmap.FromBuffer(locRoaringBytes) if err != nil { return fmt.Errorf("error loading roaring bitmap of locations with hits: %v", err) @@ -278,7 +280,9 @@ func (rv *PostingsList) read(postingsOffset uint64, d *Dictionary) error { roaringBytes := d.sb.mem[postingsOffset+n : postingsOffset+n+postingsLen] - rv.postings = roaring.NewBitmap() + if rv.postings == nil { + rv.postings = roaring.NewBitmap() + } _, err = rv.postings.FromBuffer(roaringBytes) if err != nil { return fmt.Errorf("error loading roaring bitmap: %v", err) From 07901910e24dabd67f9e26c88a03a8675e0fd87d Mon Sep 17 00:00:00 2001 From: Steve Yen Date: Sun, 11 Mar 2018 21:07:14 -0700 Subject: [PATCH 14/20] scorch zap reuse roaring Bitmap in prepareDicts() slice growth In this change, if the postings/postingsLocs slices need to be grown, then copy over and reuse any of the preallocated roaring Bitmap's from the old slice. 
--- index/scorch/segment/zap/new.go | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/index/scorch/segment/zap/new.go b/index/scorch/segment/zap/new.go index c7a6b0ce..5b625b99 100644 --- a/index/scorch/segment/zap/new.go +++ b/index/scorch/segment/zap/new.go @@ -309,19 +309,27 @@ func (s *interim) prepareDicts() { if cap(s.Postings) >= numPostingsLists { s.Postings = s.Postings[:numPostingsLists] } else { - s.Postings = make([]*roaring.Bitmap, numPostingsLists) + postings := make([]*roaring.Bitmap, numPostingsLists) + copy(postings, s.Postings[:cap(s.Postings)]) for i := 0; i < numPostingsLists; i++ { - s.Postings[i] = roaring.New() + if postings[i] == nil { + postings[i] = roaring.New() + } } + s.Postings = postings } if cap(s.PostingsLocs) >= numPostingsLists { s.PostingsLocs = s.PostingsLocs[:numPostingsLists] } else { - s.PostingsLocs = make([]*roaring.Bitmap, numPostingsLists) + postingsLocs := make([]*roaring.Bitmap, numPostingsLists) + copy(postingsLocs, s.PostingsLocs[:cap(s.PostingsLocs)]) for i := 0; i < numPostingsLists; i++ { - s.PostingsLocs[i] = roaring.New() + if postingsLocs[i] == nil { + postingsLocs[i] = roaring.New() + } } + s.PostingsLocs = postingsLocs } // TODO: reuse this. 
From dbfc5e913027c7dc5a85f9a9f91476babf8322da Mon Sep 17 00:00:00 2001 From: Steve Yen Date: Mon, 12 Mar 2018 10:04:11 -0700 Subject: [PATCH 15/20] scorch zap reuse interim freq/norm/loc slices --- index/scorch/segment/zap/new.go | 68 ++++++++++++++++++++++++--------- 1 file changed, 49 insertions(+), 19 deletions(-) diff --git a/index/scorch/segment/zap/new.go b/index/scorch/segment/zap/new.go index 5b625b99..51971ba5 100644 --- a/index/scorch/segment/zap/new.go +++ b/index/scorch/segment/zap/new.go @@ -95,10 +95,15 @@ type interim struct { PostingsLocs []*roaring.Bitmap // postings id -> freq/norm's, one for each docNum in postings - FreqNorms [][]interimFreqNorm + FreqNorms [][]interimFreqNorm + freqNormsBacking []interimFreqNorm // postings id -> locs, one for each freq - Locs [][]interimLoc + Locs [][]interimLoc + locsBacking []interimLoc + + numTermsPerPostingsList []int // key is postings list id + numLocsPerPostingsList []int // key is postings list id buf0 bytes.Buffer tmp0 []byte @@ -131,8 +136,18 @@ func (s *interim) cleanse() *interim { idn.Clear() } s.PostingsLocs = s.PostingsLocs[:0] - s.FreqNorms = nil - s.Locs = nil + s.FreqNorms = s.FreqNorms[:0] + for i := range s.freqNormsBacking { + s.freqNormsBacking[i] = interimFreqNorm{} + } + s.freqNormsBacking = s.freqNormsBacking[:0] + s.Locs = s.Locs[:0] + for i := range s.locsBacking { + s.locsBacking[i] = interimLoc{} + } + s.locsBacking = s.locsBacking[:0] + s.numTermsPerPostingsList = s.numTermsPerPostingsList[:0] + s.numLocsPerPostingsList = s.numLocsPerPostingsList[:0] s.buf0.Reset() s.tmp0 = s.tmp0[:0] s.tmp1 = s.tmp1[:0] @@ -252,9 +267,6 @@ func (s *interim) getOrDefineField(fieldName string) int { func (s *interim) prepareDicts() { var pidNext int - numTermsPerPostingsList := make([]int, 0, 64) // key is postings list id - numLocsPerPostingsList := make([]int, 0, 64) // key is postings list id - var totTFs int var totLocs int @@ -271,14 +283,14 @@ func (s *interim) prepareDicts() { dict[term] = 
pidPlus1 dictKeys = append(dictKeys, term) - numTermsPerPostingsList = append(numTermsPerPostingsList, 0) - numLocsPerPostingsList = append(numLocsPerPostingsList, 0) + s.numTermsPerPostingsList = append(s.numTermsPerPostingsList, 0) + s.numLocsPerPostingsList = append(s.numLocsPerPostingsList, 0) } pid := pidPlus1 - 1 - numTermsPerPostingsList[pid] += 1 - numLocsPerPostingsList[pid] += len(tf.Locations) + s.numTermsPerPostingsList[pid] += 1 + s.numLocsPerPostingsList[pid] += len(tf.Locations) totLocs += len(tf.Locations) } @@ -332,20 +344,38 @@ func (s *interim) prepareDicts() { s.PostingsLocs = postingsLocs } - // TODO: reuse this. - s.FreqNorms = make([][]interimFreqNorm, numPostingsLists) + if cap(s.FreqNorms) >= numPostingsLists { + s.FreqNorms = s.FreqNorms[:numPostingsLists] + } else { + s.FreqNorms = make([][]interimFreqNorm, numPostingsLists) + } - freqNormsBacking := make([]interimFreqNorm, totTFs) - for pid, numTerms := range numTermsPerPostingsList { + if cap(s.freqNormsBacking) >= totTFs { + s.freqNormsBacking = s.freqNormsBacking[:totTFs] + } else { + s.freqNormsBacking = make([]interimFreqNorm, totTFs) + } + + freqNormsBacking := s.freqNormsBacking + for pid, numTerms := range s.numTermsPerPostingsList { s.FreqNorms[pid] = freqNormsBacking[0:0] freqNormsBacking = freqNormsBacking[numTerms:] } - // TODO: reuse this. 
- s.Locs = make([][]interimLoc, numPostingsLists) + if cap(s.Locs) >= numPostingsLists { + s.Locs = s.Locs[:numPostingsLists] + } else { + s.Locs = make([][]interimLoc, numPostingsLists) + } - locsBacking := make([]interimLoc, totLocs) - for pid, numLocs := range numLocsPerPostingsList { + if cap(s.locsBacking) >= totLocs { + s.locsBacking = s.locsBacking[:totLocs] + } else { + s.locsBacking = make([]interimLoc, totLocs) + } + + locsBacking := s.locsBacking + for pid, numLocs := range s.numLocsPerPostingsList { s.Locs[pid] = locsBacking[0:0] locsBacking = locsBacking[numLocs:] } From debbcd7d47715c2dc78d5007fca7a5e8e63184f6 Mon Sep 17 00:00:00 2001 From: Sreekanth Sivasankaran Date: Tue, 13 Mar 2018 17:29:05 +0530 Subject: [PATCH 16/20] adding maxsegment size limit checks --- index/scorch/merge.go | 5 ++ index/scorch/mergeplan/merge_plan.go | 17 +++++++ index/scorch/mergeplan/merge_plan_test.go | 58 +++++++++++++++++++++++ 3 files changed, 80 insertions(+) diff --git a/index/scorch/merge.go b/index/scorch/merge.go index ec2c8d4b..41086ad3 100644 --- a/index/scorch/merge.go +++ b/index/scorch/merge.go @@ -111,6 +111,11 @@ func (s *Scorch) parseMergePlannerOptions() (*mergeplan.MergePlanOptions, if err != nil { return &mergePlannerOptions, err } + + err = mergeplan.ValidateMergePlannerOptions(&mergePlannerOptions) + if err != nil { + return nil, err + } } return &mergePlannerOptions, nil } diff --git a/index/scorch/mergeplan/merge_plan.go b/index/scorch/mergeplan/merge_plan.go index 62f643f4..f0d6f162 100644 --- a/index/scorch/mergeplan/merge_plan.go +++ b/index/scorch/mergeplan/merge_plan.go @@ -18,6 +18,7 @@ package mergeplan import ( + "errors" "fmt" "math" "sort" @@ -115,6 +116,14 @@ func (o *MergePlanOptions) RaiseToFloorSegmentSize(s int64) int64 { return o.FloorSegmentSize } +// MaxSegmentSizeLimit represents the maximum size of a segment, +// this limit comes as the roaring lib supports uint32. 
+const MaxSegmentSizeLimit = 1<<32 - 1 + +// ErrMaxSegmentSizeTooLarge is returned when the size of the segment +// exceeds the MaxSegmentSizeLimit +var ErrMaxSegmentSizeTooLarge = errors.New("MaxSegmentSize exceeds the size limit") + // Suggested default options. var DefaultMergePlanOptions = MergePlanOptions{ MaxSegmentsPerTier: 10, @@ -367,3 +376,11 @@ func ToBarChart(prefix string, barMax int, segments []Segment, plan *MergePlan) return strings.Join(rv, "\n") } + +// ValidateMergePlannerOptions validates the merge planner options +func ValidateMergePlannerOptions(options *MergePlanOptions) error { + if options.MaxSegmentSize > MaxSegmentSizeLimit { + return ErrMaxSegmentSizeTooLarge + } + return nil +} diff --git a/index/scorch/mergeplan/merge_plan_test.go b/index/scorch/mergeplan/merge_plan_test.go index 419ab825..3adc1f4b 100644 --- a/index/scorch/mergeplan/merge_plan_test.go +++ b/index/scorch/mergeplan/merge_plan_test.go @@ -17,10 +17,12 @@ package mergeplan import ( "encoding/json" "fmt" + "math/rand" "os" "reflect" "sort" "testing" + "time" ) // Implements the Segment interface for testing, @@ -401,6 +403,62 @@ func TestManySameSizedSegmentsWithDeletesBetweenMerges(t *testing.T) { } } +func TestValidateMergePlannerOptions(t *testing.T) { + o := &MergePlanOptions{ + MaxSegmentSize: 1 << 32, + MaxSegmentsPerTier: 3, + TierGrowth: 3.0, + SegmentsPerMergeTask: 3, + } + err := ValidateMergePlannerOptions(o) + if err != ErrMaxSegmentSizeTooLarge { + t.Error("Validation expected to fail as the MaxSegmentSize exceeds limit") + } +} + +func TestPlanMaxSegmentSizeLimit(t *testing.T) { + o := &MergePlanOptions{ + MaxSegmentSize: 20, + MaxSegmentsPerTier: 5, + TierGrowth: 3.0, + SegmentsPerMergeTask: 5, + FloorSegmentSize: 5, + } + segments := makeLinearSegments(20) + + s := rand.NewSource(time.Now().UnixNano()) + r := rand.New(s) + + max := 20 + min := 5 + randomInRange := func() int64 { + return int64(r.Intn(max-min) + min) + } + for i := 1; i < 20; i++ { + 
o.MaxSegmentSize = randomInRange() + plans, err := Plan(segments, o) + if err != nil { + t.Errorf("Plan failed, err: %v", err) + } + if len(plans.Tasks) == 0 { + t.Errorf("expected some plans with tasks") + } + + for _, task := range plans.Tasks { + var totalLiveSize int64 + for _, segs := range task.Segments { + totalLiveSize += segs.LiveSize() + + } + if totalLiveSize >= o.MaxSegmentSize { + t.Errorf("merged segments size: %d exceeding the MaxSegmentSize"+ + "limit: %d", totalLiveSize, o.MaxSegmentSize) + } + } + } + +} + // ---------------------------------------- type testCyclesSpec struct { From 715144d6323850d01ed970f8794692c1b99d5b9d Mon Sep 17 00:00:00 2001 From: abhinavdangeti Date: Tue, 13 Mar 2018 13:34:48 -0700 Subject: [PATCH 17/20] MB-27385: De-duplicate the list of requested fields De-duplicate the list of fields provided by the client as part of the search request, so as to not inadvertently load the same stored field more than once. --- index_impl.go | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/index_impl.go b/index_impl.go index 68777f07..4d03b78a 100644 --- a/index_impl.go +++ b/index_impl.go @@ -534,7 +534,8 @@ func (i *indexImpl) SearchInContext(ctx context.Context, req *SearchRequest) (sr doc, err := indexReader.Document(hit.ID) if err == nil && doc != nil { if len(req.Fields) > 0 { - for _, f := range req.Fields { + fieldsToLoad := deDuplicate(req.Fields) + for _, f := range fieldsToLoad { for _, docF := range doc.Fields { if f == "*" || docF.Name() == f { var value interface{} @@ -830,3 +831,16 @@ func (f *indexImplFieldDict) Close() error { } return f.indexReader.Close() } + +// helper function to remove duplicate entries from slice of strings +func deDuplicate(fields []string) []string { + entries := make(map[string]struct{}) + ret := []string{} + for _, entry := range fields { + if _, exists := entries[entry]; !exists { + entries[entry] = struct{}{} + ret = append(ret, entry) + } + } + return ret +} 
From 7578ff7cb8df1279853e29684f5c43d7a5c9af70 Mon Sep 17 00:00:00 2001 From: Steve Yen Date: Tue, 13 Mar 2018 11:10:31 -0700 Subject: [PATCH 18/20] scorch zap optimize interim's reuse of vellum builders Since interim structs are now sync.Pool'ed, we can now also hold onto and reuse the associated vellum builder. --- index/scorch/segment/zap/new.go | 52 ++++++++++++++++++++------------- 1 file changed, 31 insertions(+), 21 deletions(-) diff --git a/index/scorch/segment/zap/new.go b/index/scorch/segment/zap/new.go index 51971ba5..4c9ec9c1 100644 --- a/index/scorch/segment/zap/new.go +++ b/index/scorch/segment/zap/new.go @@ -52,7 +52,9 @@ func AnalysisResultsToSegmentBase(results []*index.AnalysisResult, s.FieldsMap, s.FieldsInv, uint64(len(results)), storedIndexOffset, fieldsIndexOffset, fdvIndexOffset, dictOffsets) - interimPool.Put(s.cleanse()) + if err == nil && s.reset() == nil { + interimPool.Put(s) + } return sb, err } @@ -105,12 +107,16 @@ type interim struct { numTermsPerPostingsList []int // key is postings list id numLocsPerPostingsList []int // key is postings list id - buf0 bytes.Buffer + builder *vellum.Builder + builderBuf bytes.Buffer + + metaBuf bytes.Buffer + tmp0 []byte tmp1 []byte } -func (s *interim) cleanse() *interim { +func (s *interim) reset() (err error) { s.results = nil s.chunkFactor = 0 s.w = nil @@ -148,11 +154,15 @@ func (s *interim) cleanse() *interim { s.locsBacking = s.locsBacking[:0] s.numTermsPerPostingsList = s.numTermsPerPostingsList[:0] s.numLocsPerPostingsList = s.numLocsPerPostingsList[:0] - s.buf0.Reset() + s.builderBuf.Reset() + if s.builder != nil { + err = s.builder.Reset(&s.builderBuf) + } + s.metaBuf.Reset() s.tmp0 = s.tmp0[:0] s.tmp1 = s.tmp1[:0] - return s + return err } func (s *interim) grabBuf(size int) []byte { @@ -475,8 +485,7 @@ func (s *interim) processDocument(docNum uint64, func (s *interim) writeStoredFields() ( storedIndexOffset uint64, err error) { - metaBuf := &s.buf0 - metaEncoder := 
govarint.NewU64Base128Encoder(metaBuf) + metaEncoder := govarint.NewU64Base128Encoder(&s.metaBuf) data, compressed := s.tmp0[:0], s.tmp1[:0] defer func() { s.tmp0, s.tmp1 = data, compressed }() @@ -512,7 +521,7 @@ func (s *interim) writeStoredFields() ( var curr int - metaBuf.Reset() + s.metaBuf.Reset() data = data[:0] compressed = compressed[:0] @@ -529,7 +538,7 @@ func (s *interim) writeStoredFields() ( } metaEncoder.Close() - metaBytes := metaBuf.Bytes() + metaBytes := s.metaBuf.Bytes() compressed = snappy.Encode(compressed, data) @@ -565,8 +574,8 @@ func (s *interim) writeStoredFields() ( return storedIndexOffset, nil } -func (s *interim) writeDicts() (uint64, []uint64, error) { - dictOffsets := make([]uint64, len(s.FieldsInv)) +func (s *interim) writeDicts() (fdvIndexOffset uint64, dictOffsets []uint64, err error) { + dictOffsets = make([]uint64, len(s.FieldsInv)) fdvOffsets := make([]uint64, len(s.FieldsInv)) @@ -578,10 +587,11 @@ func (s *interim) writeDicts() (uint64, []uint64, error) { var docTermMap [][]byte - s.buf0.Reset() - builder, err := vellum.New(&s.buf0, nil) - if err != nil { - return 0, nil, err + if s.builder == nil { + s.builder, err = vellum.New(&s.builderBuf, nil) + if err != nil { + return 0, nil, err + } } for fieldID, terms := range s.DictKeys { @@ -658,7 +668,7 @@ func (s *interim) writeDicts() (uint64, []uint64, error) { } if postingsOffset > uint64(0) { - err = builder.Insert([]byte(term), postingsOffset) + err = s.builder.Insert([]byte(term), postingsOffset) if err != nil { return 0, nil, err } @@ -668,7 +678,7 @@ func (s *interim) writeDicts() (uint64, []uint64, error) { locEncoder.Reset() } - err = builder.Close() + err = s.builder.Close() if err != nil { return 0, nil, err } @@ -676,7 +686,7 @@ func (s *interim) writeDicts() (uint64, []uint64, error) { // record where this dictionary starts dictOffsets[fieldID] = uint64(s.w.Count()) - vellumData := s.buf0.Bytes() + vellumData := s.builderBuf.Bytes() // write out the length of the 
vellum data n := binary.PutUvarint(buf, uint64(len(vellumData))) @@ -692,9 +702,9 @@ func (s *interim) writeDicts() (uint64, []uint64, error) { } // reset vellum for reuse - s.buf0.Reset() + s.builderBuf.Reset() - err = builder.Reset(&s.buf0) + err = s.builder.Reset(&s.builderBuf) if err != nil { return 0, nil, err } @@ -727,7 +737,7 @@ func (s *interim) writeDicts() (uint64, []uint64, error) { } } - fdvIndexOffset := uint64(s.w.Count()) + fdvIndexOffset = uint64(s.w.Count()) for _, fdvOffset := range fdvOffsets { n := binary.PutUvarint(buf, fdvOffset) From 1775602958abb6358f2f490d9914865280d026fa Mon Sep 17 00:00:00 2001 From: Sreekanth Sivasankaran Date: Thu, 15 Mar 2018 14:40:00 +0530 Subject: [PATCH 19/20] posting iterator array positions clean up, max segment size limit adjustment for hit-1 optimisation --- index/scorch/mergeplan/merge_plan.go | 6 +++--- index/scorch/segment/zap/posting.go | 2 ++ 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/index/scorch/mergeplan/merge_plan.go b/index/scorch/mergeplan/merge_plan.go index f0d6f162..b09e5381 100644 --- a/index/scorch/mergeplan/merge_plan.go +++ b/index/scorch/mergeplan/merge_plan.go @@ -117,14 +117,14 @@ func (o *MergePlanOptions) RaiseToFloorSegmentSize(s int64) int64 { } // MaxSegmentSizeLimit represents the maximum size of a segment, -// this limit comes as the roaring lib supports uint32. -const MaxSegmentSizeLimit = 1<<32 - 1 +// this limit comes with hit-1 optimisation/max encoding limit uint31. +const MaxSegmentSizeLimit = 1<<31 - 1 // ErrMaxSegmentSizeTooLarge is returned when the size of the segment // exceeds the MaxSegmentSizeLimit var ErrMaxSegmentSizeTooLarge = errors.New("MaxSegmentSize exceeds the size limit") -// Suggested default options. +// DefaultMergePlanOptions suggests the default options. 
var DefaultMergePlanOptions = MergePlanOptions{ MaxSegmentsPerTier: 10, MaxSegmentSize: 5000000, diff --git a/index/scorch/segment/zap/posting.go b/index/scorch/segment/zap/posting.go index bc533ad1..fbe703c1 100644 --- a/index/scorch/segment/zap/posting.go +++ b/index/scorch/segment/zap/posting.go @@ -444,6 +444,8 @@ func (i *PostingsIterator) readLocation(l *Location) error { l.end = end if numArrayPos > 0 { l.ap = make([]uint64, int(numArrayPos)) + } else { + l.ap = l.ap[:0] } } From d1155c223a431bdd0e564ff5f15547aa9fe8e640 Mon Sep 17 00:00:00 2001 From: Sreekanth Sivasankaran Date: Tue, 13 Mar 2018 12:13:48 +0530 Subject: [PATCH 20/20] zap version bump, changed the offset slice format ,UTs --- index/scorch/segment/zap/build.go | 2 +- index/scorch/segment/zap/contentcoder.go | 11 +- index/scorch/segment/zap/contentcoder_test.go | 2 +- index/scorch/segment/zap/docvalues.go | 8 +- index/scorch/segment/zap/intcoder.go | 68 ++++---- index/scorch/segment/zap/intcoder_test.go | 148 ++++++++---------- index/scorch/segment/zap/posting.go | 8 +- 7 files changed, 110 insertions(+), 137 deletions(-) diff --git a/index/scorch/segment/zap/build.go b/index/scorch/segment/zap/build.go index 30ae8d77..cd56e3b5 100644 --- a/index/scorch/segment/zap/build.go +++ b/index/scorch/segment/zap/build.go @@ -22,7 +22,7 @@ import ( "github.com/Smerity/govarint" ) -const version uint32 = 4 +const version uint32 = 5 const fieldNotUninverted = math.MaxUint64 diff --git a/index/scorch/segment/zap/contentcoder.go b/index/scorch/segment/zap/contentcoder.go index c731f52c..5ba15d69 100644 --- a/index/scorch/segment/zap/contentcoder.go +++ b/index/scorch/segment/zap/contentcoder.go @@ -157,13 +157,10 @@ func (c *chunkedContentCoder) Write(w io.Writer) (int, error) { return tw, err } - if len(c.chunkLens) > 1 { - chunkLengthsToOffsets(c.chunkLens) - } - - // write out the chunk starting offsets - for _, chunkLen := range c.chunkLens { - n := binary.PutUvarint(buf, uint64(chunkLen)) + chunkOffsets 
:= modifyLengthsToEndOffsets(c.chunkLens) + // write out the chunk offsets + for _, chunkOffset := range chunkOffsets { + n := binary.PutUvarint(buf, chunkOffset) nw, err = w.Write(buf[:n]) tw += nw if err != nil { diff --git a/index/scorch/segment/zap/contentcoder_test.go b/index/scorch/segment/zap/contentcoder_test.go index 0e45b783..da80f947 100644 --- a/index/scorch/segment/zap/contentcoder_test.go +++ b/index/scorch/segment/zap/contentcoder_test.go @@ -46,7 +46,7 @@ func TestChunkContentCoder(t *testing.T) { []byte("scorch"), }, - expected: string([]byte{0x02, 0x0c, 0x0c, 0x01, 0x00, 0x00, 0x06, 0x06, 0x14, + expected: string([]byte{0x02, 0x0c, 0x18, 0x01, 0x00, 0x00, 0x06, 0x06, 0x14, 0x75, 0x70, 0x73, 0x69, 0x64, 0x65, 0x01, 0x01, 0x00, 0x06, 0x06, 0x14, 0x73, 0x63, 0x6f, 0x72, 0x63, 0x68}), }, diff --git a/index/scorch/segment/zap/docvalues.go b/index/scorch/segment/zap/docvalues.go index 882ff43d..61b83877 100644 --- a/index/scorch/segment/zap/docvalues.go +++ b/index/scorch/segment/zap/docvalues.go @@ -69,7 +69,7 @@ func (s *SegmentBase) loadFieldDocValueIterator(field string, } // read the number of chunks, chunk lengths - var offset, clen uint64 + var offset, loc uint64 numChunks, read := binary.Uvarint(s.mem[fieldDvLoc : fieldDvLoc+binary.MaxVarintLen64]) if read <= 0 { return nil, fmt.Errorf("failed to read the field "+ @@ -83,11 +83,11 @@ func (s *SegmentBase) loadFieldDocValueIterator(field string, chunkOffsets: make([]uint64, int(numChunks)), } for i := 0; i < int(numChunks); i++ { - clen, read = binary.Uvarint(s.mem[fieldDvLoc+offset : fieldDvLoc+offset+binary.MaxVarintLen64]) + loc, read = binary.Uvarint(s.mem[fieldDvLoc+offset : fieldDvLoc+offset+binary.MaxVarintLen64]) if read <= 0 { - return nil, fmt.Errorf("corrupted chunk length during segment load") + return nil, fmt.Errorf("corrupted chunk offset during segment load") } - fdvIter.chunkOffsets[i] = clen + fdvIter.chunkOffsets[i] = loc offset += uint64(read) } diff --git 
a/index/scorch/segment/zap/intcoder.go b/index/scorch/segment/zap/intcoder.go index 79fe5156..81ef8bb2 100644 --- a/index/scorch/segment/zap/intcoder.go +++ b/index/scorch/segment/zap/intcoder.go @@ -111,15 +111,13 @@ func (c *chunkedIntCoder) Write(w io.Writer) (int, error) { } buf := c.buf - // convert the chunk lengths into starting chunk offsets - if len(c.chunkLens) > 1 { - chunkLengthsToOffsets(c.chunkLens) - } + // convert the chunk lengths into chunk offsets + chunkOffsets := modifyLengthsToEndOffsets(c.chunkLens) - // write out the number of chunks & each chunk starting offsets - n := binary.PutUvarint(buf, uint64(len(c.chunkLens))) - for _, chunkLen := range c.chunkLens { - n += binary.PutUvarint(buf[n:], uint64(chunkLen)) + // write out the number of chunks & each chunk offsets + n := binary.PutUvarint(buf, uint64(len(chunkOffsets))) + for _, chunkOffset := range chunkOffsets { + n += binary.PutUvarint(buf[n:], chunkOffset) } tw, err := w.Write(buf[:n]) @@ -140,41 +138,35 @@ func (c *chunkedIntCoder) FinalSize() int { return len(c.final) } -// chunkLengthsToOffsets converts the chunk length array -// to a chunk starting offset array. The readChunkBoundary +// modifyLengthsToEndOffsets converts the chunk length array +// to a chunk offset array. The readChunkBoundary // will figure out the start and end of every chunk from -// these offsets. The starting offset of the first/single -// array element will always be zero and this position is -// used for storing the size of the current last item in -// the array at any given point. -// For eg: -// Lens -> 5 5 5 5 => 5 5 10 15 -// Lens -> 0 5 0 5 => 5 0 5 5 -// Lens -> 0 0 0 5 => 5 0 0 0 -// Lens -> 5 0 0 0 => 0 5 5 5 -// Lens -> 0 5 0 0 => 0 0 5 5 -// Lens -> 0 0 5 0 => 0 0 0 5 -func chunkLengthsToOffsets(lengths []uint64) { - lengths[1], lengths[0] = lengths[0], lengths[1] - for i := 2; i < len(lengths); i++ { - cur := lengths[i] - lengths[i] = lengths[i-1] + lengths[0] - lengths[0] = cur +// these offsets. 
Starting offset of i'th index is stored +// in i-1'th position except for 0'th index and ending offset +// is stored at i'th index position. +// For 0'th element, starting position is always zero. +// eg: +// Lens -> 5 5 5 5 => 5 10 15 20 +// Lens -> 0 5 0 5 => 0 5 5 10 +// Lens -> 0 0 0 5 => 0 0 0 5 +// Lens -> 5 0 0 0 => 5 5 5 5 +// Lens -> 0 5 0 0 => 0 5 5 5 +// Lens -> 0 0 5 0 => 0 0 5 5 +func modifyLengthsToEndOffsets(lengths []uint64) []uint64 { + var runningOffset uint64 + var index, i int + for i = 1; i <= len(lengths); i++ { + runningOffset += lengths[i-1] + lengths[index] = runningOffset + index++ } + return lengths } func readChunkBoundary(chunk int, offsets []uint64) (uint64, uint64) { - var start, end uint64 + var start uint64 if chunk > 0 { - start = offsets[chunk] + start = offsets[chunk-1] } - // single element case - if chunk == 0 && len(offsets) == 1 { - end = offsets[chunk] - } else if chunk < len(offsets)-1 { - end = offsets[chunk+1] - } else { // for last element - end = start + offsets[0] - } - return start, end + return start, offsets[chunk] } diff --git a/index/scorch/segment/zap/intcoder_test.go b/index/scorch/segment/zap/intcoder_test.go index 8c77eab6..952e0669 100644 --- a/index/scorch/segment/zap/intcoder_test.go +++ b/index/scorch/segment/zap/intcoder_test.go @@ -46,8 +46,8 @@ func TestChunkIntCoder(t *testing.T) { []uint64{3}, []uint64{7}, }, - // 2 chunks, chunk-0 length 1, chunk-1 length 1, value 3, value 7 - expected: []byte{0x2, 0x1, 0x1, 0x3, 0x7}, + // 2 chunks, chunk-0 offset 1, chunk-1 offset 2, value 3, value 7 + expected: []byte{0x2, 0x1, 0x2, 0x3, 0x7}, }, } @@ -80,40 +80,48 @@ func TestChunkLengthToOffsets(t *testing.T) { }{ { lengths: []uint64{5, 5, 5, 5, 5}, - expectedOffsets: []uint64{5, 5, 10, 15, 20}, + expectedOffsets: []uint64{5, 10, 15, 20, 25}, }, { lengths: []uint64{0, 5, 0, 5, 0}, - expectedOffsets: []uint64{0, 0, 5, 5, 10}, + expectedOffsets: []uint64{0, 5, 5, 10, 10}, }, { lengths: []uint64{0, 0, 0, 0, 5}, - 
expectedOffsets: []uint64{5, 0, 0, 0, 0}, - }, - { - lengths: []uint64{5, 0, 0, 0, 0}, - expectedOffsets: []uint64{0, 5, 5, 5, 5}, - }, - { - lengths: []uint64{0, 5, 0, 0, 0}, - expectedOffsets: []uint64{0, 0, 5, 5, 5}, - }, - { - lengths: []uint64{0, 0, 0, 5, 0}, expectedOffsets: []uint64{0, 0, 0, 0, 5}, }, + { + lengths: []uint64{5, 0, 0, 0, 0}, + expectedOffsets: []uint64{5, 5, 5, 5, 5}, + }, + { + lengths: []uint64{0, 5, 0, 0, 0}, + expectedOffsets: []uint64{0, 5, 5, 5, 5}, + }, + { + lengths: []uint64{0, 0, 0, 5, 0}, + expectedOffsets: []uint64{0, 0, 0, 5, 5}, + }, { lengths: []uint64{0, 0, 0, 5, 5}, - expectedOffsets: []uint64{5, 0, 0, 0, 5}, + expectedOffsets: []uint64{0, 0, 0, 5, 10}, }, { lengths: []uint64{5, 5, 5, 0, 0}, - expectedOffsets: []uint64{0, 5, 10, 15, 15}, + expectedOffsets: []uint64{5, 10, 15, 15, 15}, + }, + { + lengths: []uint64{5}, + expectedOffsets: []uint64{5}, + }, + { + lengths: []uint64{5, 5}, + expectedOffsets: []uint64{5, 10}, }, } for i, test := range tests { - chunkLengthsToOffsets(test.lengths) + modifyLengthsToEndOffsets(test.lengths) if !reflect.DeepEqual(test.expectedOffsets, test.lengths) { t.Errorf("Test: %d failed, got %+v, expected %+v", i, test.lengths, test.expectedOffsets) } @@ -129,86 +137,80 @@ func TestChunkReadBoundaryFromOffsets(t *testing.T) { expectedEnd uint64 }{ { - offsets: []uint64{5, 5, 10, 15, 20}, + offsets: []uint64{5, 10, 15, 20, 25}, chunkNumber: 4, expectedStart: 20, expectedEnd: 25, }, { - offsets: []uint64{5, 5, 10, 15, 20}, + offsets: []uint64{5, 10, 15, 20, 25}, chunkNumber: 0, expectedStart: 0, expectedEnd: 5, }, { - offsets: []uint64{5, 5, 10, 15, 20}, + offsets: []uint64{5, 10, 15, 20, 25}, chunkNumber: 2, expectedStart: 10, expectedEnd: 15, }, { - offsets: []uint64{0, 0, 5, 5, 10}, + offsets: []uint64{0, 5, 5, 10, 10}, chunkNumber: 4, expectedStart: 10, expectedEnd: 10, }, { - offsets: []uint64{0, 0, 5, 5, 10}, + offsets: []uint64{0, 5, 5, 10, 10}, chunkNumber: 1, expectedStart: 0, expectedEnd: 
5, }, { - offsets: []uint64{5, 0, 0, 0, 0}, + offsets: []uint64{5, 5, 5, 5, 5}, chunkNumber: 0, expectedStart: 0, - expectedEnd: 0, - }, - { - offsets: []uint64{5, 0, 0, 0, 0}, - chunkNumber: 4, - expectedStart: 0, expectedEnd: 5, }, { - offsets: []uint64{5, 0, 0, 0, 0}, - chunkNumber: 1, - expectedStart: 0, - expectedEnd: 0, + offsets: []uint64{5, 5, 5, 5, 5}, + chunkNumber: 4, + expectedStart: 5, + expectedEnd: 5, }, { - offsets: []uint64{0, 5, 5, 5, 5}, + offsets: []uint64{5, 5, 5, 5, 5}, chunkNumber: 1, expectedStart: 5, expectedEnd: 5, }, { offsets: []uint64{0, 5, 5, 5, 5}, - chunkNumber: 0, - expectedStart: 0, - expectedEnd: 5, - }, - { - offsets: []uint64{0, 0, 5, 5, 5}, - chunkNumber: 2, - expectedStart: 5, - expectedEnd: 5, - }, - { - offsets: []uint64{0, 0, 5, 5, 5}, chunkNumber: 1, expectedStart: 0, expectedEnd: 5, }, { - offsets: []uint64{0, 0, 0, 0, 5}, - chunkNumber: 4, - expectedStart: 5, - expectedEnd: 5, + offsets: []uint64{0, 5, 5, 5, 5}, + chunkNumber: 0, + expectedStart: 0, + expectedEnd: 0, + }, + { + offsets: []uint64{0, 0, 0, 5, 5}, + chunkNumber: 2, + expectedStart: 0, + expectedEnd: 0, + }, + { + offsets: []uint64{0, 0, 0, 5, 5}, + chunkNumber: 1, + expectedStart: 0, + expectedEnd: 0, }, { offsets: []uint64{0, 0, 0, 0, 5}, - chunkNumber: 3, + chunkNumber: 4, expectedStart: 0, expectedEnd: 5, }, @@ -219,59 +221,41 @@ func TestChunkReadBoundaryFromOffsets(t *testing.T) { expectedEnd: 0, }, { - offsets: []uint64{5, 0, 0, 0, 5}, - chunkNumber: 0, - expectedStart: 0, - expectedEnd: 0, - }, - { - offsets: []uint64{5, 0, 0, 0, 5}, - chunkNumber: 1, - expectedStart: 0, - expectedEnd: 0, - }, - { - offsets: []uint64{5, 0, 0, 0, 5}, - chunkNumber: 3, - expectedStart: 0, - expectedEnd: 5, - }, - { - offsets: []uint64{5, 0, 0, 0, 5}, - chunkNumber: 4, - expectedStart: 5, - expectedEnd: 10, - }, - { - offsets: []uint64{0, 5, 10, 15, 15}, + offsets: []uint64{5, 10, 15, 15, 15}, chunkNumber: 0, expectedStart: 0, expectedEnd: 5, }, { - offsets: []uint64{0, 
5, 10, 15, 15}, + offsets: []uint64{5, 10, 15, 15, 15}, chunkNumber: 1, expectedStart: 5, expectedEnd: 10, }, { - offsets: []uint64{0, 5, 10, 15, 15}, + offsets: []uint64{5, 10, 15, 15, 15}, chunkNumber: 2, expectedStart: 10, expectedEnd: 15, }, { - offsets: []uint64{0, 5, 10, 15, 15}, + offsets: []uint64{5, 10, 15, 15, 15}, chunkNumber: 3, expectedStart: 15, expectedEnd: 15, }, { - offsets: []uint64{0, 5, 10, 15, 15}, + offsets: []uint64{5, 10, 15, 15, 15}, chunkNumber: 4, expectedStart: 15, expectedEnd: 15, }, + { + offsets: []uint64{5}, + chunkNumber: 0, + expectedStart: 0, + expectedEnd: 5, + }, } for i, test := range tests { diff --git a/index/scorch/segment/zap/posting.go b/index/scorch/segment/zap/posting.go index c3fc2330..bdbb47e3 100644 --- a/index/scorch/segment/zap/posting.go +++ b/index/scorch/segment/zap/posting.go @@ -189,9 +189,9 @@ func (p *PostingsList) iterator(rv *PostingsIterator) *PostingsIterator { var numFreqChunks uint64 numFreqChunks, read = binary.Uvarint(p.sb.mem[p.freqOffset+n : p.freqOffset+n+binary.MaxVarintLen64]) n += uint64(read) - rv.freqChunkLens = make([]uint64, int(numFreqChunks)) + rv.freqChunkOffsets = make([]uint64, int(numFreqChunks)) for i := 0; i < int(numFreqChunks); i++ { - rv.freqChunkLens[i], read = binary.Uvarint(p.sb.mem[p.freqOffset+n : p.freqOffset+n+binary.MaxVarintLen64]) + rv.freqChunkOffsets[i], read = binary.Uvarint(p.sb.mem[p.freqOffset+n : p.freqOffset+n+binary.MaxVarintLen64]) n += uint64(read) } rv.freqChunkStart = p.freqOffset + n @@ -201,9 +201,9 @@ func (p *PostingsList) iterator(rv *PostingsIterator) *PostingsIterator { var numLocChunks uint64 numLocChunks, read = binary.Uvarint(p.sb.mem[p.locOffset+n : p.locOffset+n+binary.MaxVarintLen64]) n += uint64(read) - rv.locChunkLens = make([]uint64, int(numLocChunks)) + rv.locChunkOffsets = make([]uint64, int(numLocChunks)) for i := 0; i < int(numLocChunks); i++ { - rv.locChunkLens[i], read = binary.Uvarint(p.sb.mem[p.locOffset+n : 
p.locOffset+n+binary.MaxVarintLen64]) + rv.locChunkOffsets[i], read = binary.Uvarint(p.sb.mem[p.locOffset+n : p.locOffset+n+binary.MaxVarintLen64]) n += uint64(read) } rv.locChunkStart = p.locOffset + n