From ffdeb8055efd2e69167caf4aadd19e2cdd6e4e27 Mon Sep 17 00:00:00 2001 From: Steve Yen Date: Mon, 5 Feb 2018 15:42:47 -0800 Subject: [PATCH 1/4] scorch sorts fields by name to assign fieldID's This is a stepping stone to allow easier future comparisons of field maps and potential merge optimizations. In bleve-blast tests on a 2015 macbook (50K wikipedia docs, 8 indexers, batch size 100, ssd), this does not seem to have a distinct effect on indexing throughput. --- index/scorch/segment/mem/build.go | 15 +++++++++++++++ index/scorch/segment/zap/merge.go | 4 ++++ index/scorch/segment/zap/segment_test.go | 2 ++ 3 files changed, 21 insertions(+) diff --git a/index/scorch/segment/mem/build.go b/index/scorch/segment/mem/build.go index d3344ce3..57d60dc8 100644 --- a/index/scorch/segment/mem/build.go +++ b/index/scorch/segment/mem/build.go @@ -95,6 +95,21 @@ func (s *Segment) initializeDict(results []*index.AnalysisResult) { var numTokenFrequencies int var totLocs int + // initial scan for all fieldID's to sort them + for _, result := range results { + for _, field := range result.Document.CompositeFields { + s.getOrDefineField(field.Name()) + } + for _, field := range result.Document.Fields { + s.getOrDefineField(field.Name()) + } + } + sort.Strings(s.FieldsInv[1:]) // keep _id as first field + s.FieldsMap = make(map[string]uint16, len(s.FieldsInv)) + for fieldID, fieldName := range s.FieldsInv { + s.FieldsMap[fieldName] = uint16(fieldID + 1) + } + processField := func(fieldID uint16, tfs analysis.TokenFrequencies) { for term, tf := range tfs { pidPlus1, exists := s.Dicts[fieldID][term] diff --git a/index/scorch/segment/zap/merge.go b/index/scorch/segment/zap/merge.go index db03c998..53b1ffe5 100644 --- a/index/scorch/segment/zap/merge.go +++ b/index/scorch/segment/zap/merge.go @@ -21,6 +21,7 @@ import ( "fmt" "math" "os" + "sort" "github.com/RoaringBitmap/roaring" "github.com/Smerity/govarint" @@ -545,5 +546,8 @@ func mergeFields(segments []*SegmentBase) []string { rv = append(rv, k) } } + + sort.Strings(rv[1:]) // leave _id as first + return rv } diff --git a/index/scorch/segment/zap/segment_test.go b/index/scorch/segment/zap/segment_test.go index 704f9e72..9ce354ce 100644 --- a/index/scorch/segment/zap/segment_test.go +++ b/index/scorch/segment/zap/segment_test.go @@ -18,6 +18,7 @@ import ( "math" "os" "reflect" + "sort" "testing" "github.com/blevesearch/bleve/index" @@ -574,6 +575,7 @@ func TestSegmentVisitableDocValueFieldsList(t *testing.T) { t.Fatalf("segment VisitableDocValueFields err: %v", err) } + sort.Strings(expectedFields[1:]) // keep _id as first field if !reflect.DeepEqual(fields, expectedFields) { t.Errorf("expected field terms: %#v, got: %#v", expectedFields, fields) } From 822457542efe74b6b6b228ef52db2f56b8f3aea2 Mon Sep 17 00:00:00 2001 From: Steve Yen Date: Mon, 5 Feb 2018 16:03:17 -0800 Subject: [PATCH 2/4] scorch zap VERSION bump: check whether fields are the same at merge COMPATIBILITY NOTE: scorch zap version bumped in this commit. The version bump is because mergeFields() now computes whether fields are the same across segments and it relies on the previous commit where fieldID's are assigned in field name sorted order (albeit with _id field always having fieldID of 0). Potential future commits might rely on this info that "fields are the same across segments" for more optimizations, etc. 
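As an illustration of the invariant this version bump depends on (a sketch only, not part of the patch; the helper names buildFieldsInv and sameFields are made up): once every field name except _id is sorted, each segment derives the same FieldsInv ordering from the same field set, so an element-wise comparison is enough to detect that fields are identical across segments.

    package main

    import (
    	"fmt"
    	"sort"
    )

    // buildFieldsInv is a hypothetical sketch of the ordering rule the patch
    // establishes: _id stays at fieldID 0, all other fields are sorted by name.
    func buildFieldsInv(fieldNames map[string]struct{}) []string {
    	rv := []string{"_id"}
    	for name := range fieldNames {
    		if name != "_id" {
    			rv = append(rv, name)
    		}
    	}
    	sort.Strings(rv[1:]) // keep _id as the first field
    	return rv
    }

    // sameFields mimics the "fields are the same across segments" check:
    // with the sorted ordering above, an element-wise comparison suffices.
    func sameFields(a, b []string) bool {
    	if len(a) != len(b) {
    		return false
    	}
    	for i := range a {
    		if a[i] != b[i] {
    			return false
    		}
    	}
    	return true
    }

    func main() {
    	seg0 := buildFieldsInv(map[string]struct{}{"_id": {}, "title": {}, "body": {}})
    	seg1 := buildFieldsInv(map[string]struct{}{"body": {}, "title": {}, "_id": {}})
    	fmt.Println(seg0, seg1, sameFields(seg0, seg1)) // [_id body title] [_id body title] true
    }

Running this prints the same ordering for both segments and reports true, mirroring the fieldsSame result that mergeFields() computes below.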
--- index/scorch/segment/zap/build.go | 2 +- index/scorch/segment/zap/merge.go | 27 ++++++++++++++++++++------- 2 files changed, 21 insertions(+), 8 deletions(-) diff --git a/index/scorch/segment/zap/build.go b/index/scorch/segment/zap/build.go index 60d168e6..b3bbbab5 100644 --- a/index/scorch/segment/zap/build.go +++ b/index/scorch/segment/zap/build.go @@ -28,7 +28,7 @@ import ( "github.com/golang/snappy" ) -const version uint32 = 2 +const version uint32 = 3 const fieldNotUninverted = math.MaxUint64 diff --git a/index/scorch/segment/zap/merge.go b/index/scorch/segment/zap/merge.go index 53b1ffe5..bef2a087 100644 --- a/index/scorch/segment/zap/merge.go +++ b/index/scorch/segment/zap/merge.go @@ -102,13 +102,13 @@ func MergeToWriter(segments []*SegmentBase, drops []*roaring.Bitmap, var dictLocs []uint64 - fieldsInv := mergeFields(segments) + fieldsSame, fieldsInv := mergeFields(segments) fieldsMap := mapFields(fieldsInv) numDocs = computeNewDocCount(segments, drops) if numDocs > 0 { storedIndexOffset, newDocNums, err = mergeStoredAndRemap(segments, drops, - fieldsMap, fieldsInv, numDocs, cr) + fieldsMap, fieldsInv, fieldsSame, numDocs, cr) if err != nil { return nil, 0, 0, 0, 0, err } @@ -415,7 +415,7 @@ func persistMergedRest(segments []*SegmentBase, drops []*roaring.Bitmap, const docDropped = math.MaxUint64 func mergeStoredAndRemap(segments []*SegmentBase, drops []*roaring.Bitmap, - fieldsMap map[string]uint16, fieldsInv []string, newSegDocCount uint64, + fieldsMap map[string]uint16, fieldsInv []string, fieldsSame bool, newSegDocCount uint64, w *CountHashWriter) (uint64, [][]uint64, error) { var rv [][]uint64 // The remapped or newDocNums for each segment. @@ -528,13 +528,26 @@ func mergeStoredAndRemap(segments []*SegmentBase, drops []*roaring.Bitmap, return storedIndexOffset, rv, nil } -// mergeFields builds a unified list of fields used across all the input segments -func mergeFields(segments []*SegmentBase) []string { +// mergeFields builds a unified list of fields used across all the +// input segments, and computes whether the fields are the same across +// segments (which depends on fields to be sorted in the same way +// across segments) +func mergeFields(segments []*SegmentBase) (bool, []string) { + fieldsSame := true + + var segment0Fields []string + if len(segments) > 0 { + segment0Fields = segments[0].Fields() + } + fieldsMap := map[string]struct{}{} for _, segment := range segments { fields := segment.Fields() - for _, field := range fields { + for fieldi, field := range fields { fieldsMap[field] = struct{}{} + if len(segment0Fields) != len(fields) || segment0Fields[fieldi] != field { + fieldsSame = false + } } } @@ -549,5 +562,5 @@ func mergeFields(segments []*SegmentBase) []string { sort.Strings(rv[1:]) // leave _id as first - return rv + return fieldsSame, rv } From 0b50a20caccf5c4fae2ab295978af21af8d4c792 Mon Sep 17 00:00:00 2001 From: Steve Yen Date: Mon, 5 Feb 2018 17:53:47 -0800 Subject: [PATCH 3/4] scorch zap move docDropped const to earlier in file --- index/scorch/segment/zap/merge.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/index/scorch/segment/zap/merge.go b/index/scorch/segment/zap/merge.go index bef2a087..dbb33110 100644 --- a/index/scorch/segment/zap/merge.go +++ b/index/scorch/segment/zap/merge.go @@ -29,6 +29,8 @@ import ( "github.com/golang/snappy" ) +const docDropped = math.MaxUint64 // sentinel docNum to represent a deleted doc + // Merge takes a slice of zap segments and bit masks describing which // documents may be dropped, 
and creates a new segment containing the // remaining data. This new segment is built at the specified path, @@ -412,8 +414,6 @@ func persistMergedRest(segments []*SegmentBase, drops []*roaring.Bitmap, return rv, fieldDvLocsOffset, nil } -const docDropped = math.MaxUint64 - func mergeStoredAndRemap(segments []*SegmentBase, drops []*roaring.Bitmap, fieldsMap map[string]uint16, fieldsInv []string, fieldsSame bool, newSegDocCount uint64, w *CountHashWriter) (uint64, [][]uint64, error) { From ed4826b189404618f33b57f6add214e86b0353e4 Mon Sep 17 00:00:00 2001 From: Steve Yen Date: Tue, 6 Feb 2018 09:12:48 -0800 Subject: [PATCH 4/4] scorch zap merge optimization to byte-copy storedDocs The optimization to byte-copy all the storedDocs for a given segment during merging kicks in when the fields are the same across all segments and when there are no deletions for that given segment. This can happen, for example, during data loading or insert-only scenarios. As part of this commit, the Segment.copyStoredDocs() method was added, which uses a single Write() call to copy all the stored docs bytes of a segment to a writer in one shot. And, getDocStoredMetaAndCompressed() was refactored into a related helper function, getDocStoredOffsets(), which provides the storedDocs metadata (offsets & lengths) for a doc. --- index/scorch/segment/zap/merge.go | 53 ++++++++++++++++++++++++++ index/scorch/segment/zap/merge_test.go | 34 +++++++++++++++++ index/scorch/segment/zap/read.go | 32 +++++++++++----- 3 files changed, 109 insertions(+), 10 deletions(-) diff --git a/index/scorch/segment/zap/merge.go b/index/scorch/segment/zap/merge.go index dbb33110..b1eed28b 100644 --- a/index/scorch/segment/zap/merge.go +++ b/index/scorch/segment/zap/merge.go @@ -437,6 +437,24 @@ func mergeStoredAndRemap(segments []*SegmentBase, drops []*roaring.Bitmap, for segI, segment := range segments { segNewDocNums := make([]uint64, segment.numDocs) + // optimize when the field mapping is the same across all + // segments and there are no deletions, via byte-copying + // of stored docs bytes directly to the writer + if fieldsSame && (drops[segI] == nil || drops[segI].GetCardinality() == 0) { + err := segment.copyStoredDocs(newDocNum, docNumOffsets, w) + if err != nil { + return 0, nil, err + } + + for i := uint64(0); i < segment.numDocs; i++ { + segNewDocNums[i] = newDocNum + newDocNum++ + } + rv = append(rv, segNewDocNums) + + continue + } + // for each doc num for docNum := uint64(0); docNum < segment.numDocs; docNum++ { // TODO: roaring's API limits docNums to 32-bits? @@ -528,6 +546,41 @@ func mergeStoredAndRemap(segments []*SegmentBase, drops []*roaring.Bitmap, return storedIndexOffset, rv, nil } +// copyStoredDocs writes out a segment's stored doc info, optimized by +// using a single Write() call for the entire set of bytes. The +// newDocNumOffsets is filled with the new offsets for each doc. 
+func (s *SegmentBase) copyStoredDocs(newDocNum uint64, newDocNumOffsets []uint64, + w *CountHashWriter) error { + if s.numDocs <= 0 { + return nil + } + + indexOffset0, storedOffset0, _, _, _ := + s.getDocStoredOffsets(0) // the segment's first doc + + indexOffsetN, storedOffsetN, readN, metaLenN, dataLenN := + s.getDocStoredOffsets(s.numDocs - 1) // the segment's last doc + + storedOffset0New := uint64(w.Count()) + + storedBytes := s.mem[storedOffset0 : storedOffsetN+readN+metaLenN+dataLenN] + _, err := w.Write(storedBytes) + if err != nil { + return err + } + + // remap the storedOffset's for the docs into new offsets relative + // to storedOffset0New, filling the given docNumOffsetsOut array + for indexOffset := indexOffset0; indexOffset <= indexOffsetN; indexOffset += 8 { + storedOffset := binary.BigEndian.Uint64(s.mem[indexOffset : indexOffset+8]) + storedOffsetNew := storedOffset - storedOffset0 + storedOffset0New + newDocNumOffsets[newDocNum] = storedOffsetNew + newDocNum += 1 + } + + return nil +} + // mergeFields builds a unified list of fields used across all the // input segments, and computes whether the fields are the same across // segments (which depends on fields to be sorted in the same way diff --git a/index/scorch/segment/zap/merge_test.go b/index/scorch/segment/zap/merge_test.go index 323fffed..bb09f831 100644 --- a/index/scorch/segment/zap/merge_test.go +++ b/index/scorch/segment/zap/merge_test.go @@ -398,6 +398,40 @@ func compareSegments(a, b *Segment) string { fieldName, next.Term, aloc, bloc)) } } + + if fieldName == "_id" { + docId := next.Term + docNumA := apitrn.Number() + docNumB := bpitrn.Number() + afields := map[string]interface{}{} + err = a.VisitDocument(apitrn.Number(), + func(field string, typ byte, value []byte, pos []uint64) bool { + afields[field+"-typ"] = typ + afields[field+"-value"] = value + afields[field+"-pos"] = pos + return true + }) + if err != nil { + rv = append(rv, fmt.Sprintf("a.VisitDocument err: %v", err)) + } + bfields := map[string]interface{}{} + err = b.VisitDocument(bpitrn.Number(), + func(field string, typ byte, value []byte, pos []uint64) bool { + bfields[field+"-typ"] = typ + bfields[field+"-value"] = value + bfields[field+"-pos"] = pos + return true + }) + if err != nil { + rv = append(rv, fmt.Sprintf("b.VisitDocument err: %v", err)) + } + if !reflect.DeepEqual(afields, bfields) { + rv = append(rv, fmt.Sprintf("afields != bfields,"+ + " id: %s, docNumA: %d, docNumB: %d,"+ + " afields: %#v, bfields: %#v", + docId, docNumA, docNumB, afields, bfields)) + } + } } } } diff --git a/index/scorch/segment/zap/read.go b/index/scorch/segment/zap/read.go index 0c5b9e17..e47d4c6a 100644 --- a/index/scorch/segment/zap/read.go +++ b/index/scorch/segment/zap/read.go @@ -17,15 +17,27 @@ package zap import "encoding/binary" func (s *SegmentBase) getDocStoredMetaAndCompressed(docNum uint64) ([]byte, []byte) { - docStoredStartAddr := s.storedIndexOffset + (8 * docNum) - docStoredStart := binary.BigEndian.Uint64(s.mem[docStoredStartAddr : docStoredStartAddr+8]) - var n uint64 - metaLen, read := binary.Uvarint(s.mem[docStoredStart : docStoredStart+binary.MaxVarintLen64]) - n += uint64(read) - var dataLen uint64 - dataLen, read = binary.Uvarint(s.mem[docStoredStart+n : docStoredStart+n+binary.MaxVarintLen64]) - n += uint64(read) - meta := s.mem[docStoredStart+n : docStoredStart+n+metaLen] - data := s.mem[docStoredStart+n+metaLen : docStoredStart+n+metaLen+dataLen] + _, storedOffset, n, metaLen, dataLen := s.getDocStoredOffsets(docNum) + + meta := 
s.mem[storedOffset+n : storedOffset+n+metaLen] + data := s.mem[storedOffset+n+metaLen : storedOffset+n+metaLen+dataLen] + return meta, data } + +func (s *SegmentBase) getDocStoredOffsets(docNum uint64) ( + uint64, uint64, uint64, uint64, uint64) { + indexOffset := s.storedIndexOffset + (8 * docNum) + + storedOffset := binary.BigEndian.Uint64(s.mem[indexOffset : indexOffset+8]) + + var n uint64 + + metaLen, read := binary.Uvarint(s.mem[storedOffset : storedOffset+binary.MaxVarintLen64]) + n += uint64(read) + + dataLen, read := binary.Uvarint(s.mem[storedOffset+n : storedOffset+n+binary.MaxVarintLen64]) + n += uint64(read) + + return indexOffset, storedOffset, n, metaLen, dataLen +}
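A closing note on the stored-docs layout that getDocStoredOffsets() walks: the stored index holds one 8-byte big-endian offset per doc, and at that offset sit a uvarint meta length, a uvarint data length, the meta bytes, and then the (compressed) data bytes. The sketch below decodes one such entry from a synthetic buffer; it is illustrative only (decodeStoredEntry and the toy buffer are not part of the patch) and simply assumes the layout just described.

    package main

    import (
    	"encoding/binary"
    	"fmt"
    )

    // decodeStoredEntry is an illustrative sketch of the layout that
    // getDocStoredOffsets() walks: at storedOffset there is a uvarint meta
    // length, a uvarint data length, then metaLen meta bytes followed by
    // dataLen data bytes (snappy-compressed in a real segment; plain bytes
    // keep this sketch short).
    func decodeStoredEntry(mem []byte, storedIndexOffset, docNum uint64) (meta, data []byte) {
    	indexOffset := storedIndexOffset + 8*docNum
    	storedOffset := binary.BigEndian.Uint64(mem[indexOffset : indexOffset+8])

    	metaLen, n1 := binary.Uvarint(mem[storedOffset:])
    	dataLen, n2 := binary.Uvarint(mem[storedOffset+uint64(n1):])
    	start := storedOffset + uint64(n1) + uint64(n2)

    	meta = mem[start : start+metaLen]
    	data = mem[start+metaLen : start+metaLen+dataLen]
    	return meta, data
    }

    func main() {
    	// build a tiny fake stored-docs region holding a single doc
    	meta := []byte{0x01, 0x02}
    	data := []byte("hello")

    	var body []byte
    	body = append(body, byte(len(meta))) // uvarint(2) fits in one byte
    	body = append(body, byte(len(data))) // uvarint(5) fits in one byte
    	body = append(body, meta...)
    	body = append(body, data...)

    	// stored index: one 8-byte big-endian offset per doc; doc 0 starts at offset 0
    	var index [8]byte
    	binary.BigEndian.PutUint64(index[:], 0)

    	mem := append(body, index[:]...)
    	storedIndexOffset := uint64(len(body))

    	m, d := decodeStoredEntry(mem, storedIndexOffset, 0)
    	fmt.Printf("meta=%v data=%q\n", m, d) // meta=[1 2] data="hello"
    }

copyStoredDocs() exploits the same layout: because the per-doc records sit contiguously, the span from the first doc's storedOffset through the end of the last doc's record can be emitted with a single Write(), and only the 8-byte index entries need to be remapped to the new offsets.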