diff --git a/index/scorch/segment/mem/build.go b/index/scorch/segment/mem/build.go
index d3344ce3..57d60dc8 100644
--- a/index/scorch/segment/mem/build.go
+++ b/index/scorch/segment/mem/build.go
@@ -95,6 +95,21 @@ func (s *Segment) initializeDict(results []*index.AnalysisResult) {
 	var numTokenFrequencies int
 	var totLocs int
 
+	// initial scan of all fields so fieldID's can be assigned in sorted order
+	for _, result := range results {
+		for _, field := range result.Document.CompositeFields {
+			s.getOrDefineField(field.Name())
+		}
+		for _, field := range result.Document.Fields {
+			s.getOrDefineField(field.Name())
+		}
+	}
+	sort.Strings(s.FieldsInv[1:]) // keep _id as first field
+	s.FieldsMap = make(map[string]uint16, len(s.FieldsInv))
+	for fieldID, fieldName := range s.FieldsInv {
+		s.FieldsMap[fieldName] = uint16(fieldID + 1)
+	}
+
 	processField := func(fieldID uint16, tfs analysis.TokenFrequencies) {
 		for term, tf := range tfs {
 			pidPlus1, exists := s.Dicts[fieldID][term]
diff --git a/index/scorch/segment/zap/build.go b/index/scorch/segment/zap/build.go
index 60d168e6..b3bbbab5 100644
--- a/index/scorch/segment/zap/build.go
+++ b/index/scorch/segment/zap/build.go
@@ -28,7 +28,7 @@ import (
 	"github.com/golang/snappy"
 )
 
-const version uint32 = 2
+const version uint32 = 3
 
 const fieldNotUninverted = math.MaxUint64
 
diff --git a/index/scorch/segment/zap/merge.go b/index/scorch/segment/zap/merge.go
index db03c998..b1eed28b 100644
--- a/index/scorch/segment/zap/merge.go
+++ b/index/scorch/segment/zap/merge.go
@@ -21,6 +21,7 @@ import (
 	"fmt"
 	"math"
 	"os"
+	"sort"
 
 	"github.com/RoaringBitmap/roaring"
 	"github.com/Smerity/govarint"
@@ -28,6 +29,8 @@ import (
 	"github.com/golang/snappy"
 )
 
+const docDropped = math.MaxUint64 // sentinel docNum to represent a deleted doc
+
 // Merge takes a slice of zap segments and bit masks describing which
 // documents may be dropped, and creates a new segment containing the
 // remaining data.  This new segment is built at the specified path,
@@ -101,13 +104,13 @@ func MergeToWriter(segments []*SegmentBase, drops []*roaring.Bitmap,
 
 	var dictLocs []uint64
 
-	fieldsInv := mergeFields(segments)
+	fieldsSame, fieldsInv := mergeFields(segments)
 	fieldsMap := mapFields(fieldsInv)
 
 	numDocs = computeNewDocCount(segments, drops)
 	if numDocs > 0 {
 		storedIndexOffset, newDocNums, err = mergeStoredAndRemap(segments, drops,
-			fieldsMap, fieldsInv, numDocs, cr)
+			fieldsMap, fieldsInv, fieldsSame, numDocs, cr)
 		if err != nil {
 			return nil, 0, 0, 0, 0, err
 		}
@@ -411,10 +414,8 @@ func persistMergedRest(segments []*SegmentBase, drops []*roaring.Bitmap,
 	return rv, fieldDvLocsOffset, nil
 }
 
-const docDropped = math.MaxUint64
-
 func mergeStoredAndRemap(segments []*SegmentBase, drops []*roaring.Bitmap,
-	fieldsMap map[string]uint16, fieldsInv []string, newSegDocCount uint64,
+	fieldsMap map[string]uint16, fieldsInv []string, fieldsSame bool, newSegDocCount uint64,
 	w *CountHashWriter) (uint64, [][]uint64, error) {
 	var rv [][]uint64 // The remapped or newDocNums for each segment.
 
@@ -436,6 +437,24 @@ func mergeStoredAndRemap(segments []*SegmentBase, drops []*roaring.Bitmap,
 	for segI, segment := range segments {
 		segNewDocNums := make([]uint64, segment.numDocs)
 
+		// optimization: when the field mapping is the same across all
+		// segments and this segment has no deletions, byte-copy the
+		// stored-doc bytes directly to the writer
+		if fieldsSame && (drops[segI] == nil || drops[segI].GetCardinality() == 0) {
+			err := segment.copyStoredDocs(newDocNum, docNumOffsets, w)
+			if err != nil {
+				return 0, nil, err
+			}
+
+			for i := uint64(0); i < segment.numDocs; i++ {
+				segNewDocNums[i] = newDocNum
+				newDocNum++
+			}
+			rv = append(rv, segNewDocNums)
+
+			continue
+		}
+
 		// for each doc num
 		for docNum := uint64(0); docNum < segment.numDocs; docNum++ {
 			// TODO: roaring's API limits docNums to 32-bits?
@@ -527,13 +546,61 @@ func mergeStoredAndRemap(segments []*SegmentBase, drops []*roaring.Bitmap,
 	return storedIndexOffset, rv, nil
 }
 
-// mergeFields builds a unified list of fields used across all the input segments
-func mergeFields(segments []*SegmentBase) []string {
+// copyStoredDocs writes out a segment's stored doc info, optimized by
+// using a single Write() call for the entire set of bytes.  The
+// newDocNumOffsets slice is filled with the new offset for each doc.
+func (s *SegmentBase) copyStoredDocs(newDocNum uint64, newDocNumOffsets []uint64,
+	w *CountHashWriter) error {
+	if s.numDocs <= 0 {
+		return nil
+	}
+
+	indexOffset0, storedOffset0, _, _, _ :=
+		s.getDocStoredOffsets(0) // the segment's first doc
+
+	indexOffsetN, storedOffsetN, readN, metaLenN, dataLenN :=
+		s.getDocStoredOffsets(s.numDocs - 1) // the segment's last doc
+
+	storedOffset0New := uint64(w.Count())
+
+	storedBytes := s.mem[storedOffset0 : storedOffsetN+readN+metaLenN+dataLenN]
+	_, err := w.Write(storedBytes)
+	if err != nil {
+		return err
+	}
+
+	// remap the storedOffset's for the docs into new offsets relative
+	// to storedOffset0New, filling the given newDocNumOffsets slice
+	for indexOffset := indexOffset0; indexOffset <= indexOffsetN; indexOffset += 8 {
+		storedOffset := binary.BigEndian.Uint64(s.mem[indexOffset : indexOffset+8])
+		storedOffsetNew := storedOffset - storedOffset0 + storedOffset0New
+		newDocNumOffsets[newDocNum] = storedOffsetNew
+		newDocNum++
+	}
+
+	return nil
+}
+
+// mergeFields builds a unified list of fields used across all the
+// input segments, and computes whether the fields are the same across
+// segments (which requires the fields to be sorted the same way
+// within each segment)
+func mergeFields(segments []*SegmentBase) (bool, []string) {
+	fieldsSame := true
+
+	var segment0Fields []string
+	if len(segments) > 0 {
+		segment0Fields = segments[0].Fields()
+	}
+
 	fieldsMap := map[string]struct{}{}
 	for _, segment := range segments {
 		fields := segment.Fields()
-		for _, field := range fields {
+		for fieldi, field := range fields {
 			fieldsMap[field] = struct{}{}
+			if len(segment0Fields) != len(fields) || segment0Fields[fieldi] != field {
+				fieldsSame = false
+			}
 		}
 	}
 
@@ -545,5 +612,8 @@ func mergeFields(segments []*SegmentBase) []string {
 			rv = append(rv, k)
 		}
 	}
-	return rv
+
+	sort.Strings(rv[1:]) // leave _id as first
+
+	return fieldsSame, rv
 }
diff --git a/index/scorch/segment/zap/merge_test.go b/index/scorch/segment/zap/merge_test.go
index 323fffed..bb09f831 100644
--- a/index/scorch/segment/zap/merge_test.go
+++ b/index/scorch/segment/zap/merge_test.go
@@ -398,6 +398,40 @@ func compareSegments(a, b *Segment) string {
 						fieldName, next.Term, aloc, bloc))
 				}
 			}
+
+			if fieldName == "_id" {
+				docId := next.Term
+				docNumA := apitrn.Number()
+				docNumB := bpitrn.Number()
+				afields := map[string]interface{}{}
+				err = a.VisitDocument(apitrn.Number(),
+					func(field string, typ byte, value []byte, pos []uint64) bool {
+						afields[field+"-typ"] = typ
+						afields[field+"-value"] = value
+						afields[field+"-pos"] = pos
+						return true
+					})
+				if err != nil {
+					rv = append(rv, fmt.Sprintf("a.VisitDocument err: %v", err))
+				}
+				bfields := map[string]interface{}{}
+				err = b.VisitDocument(bpitrn.Number(),
+					func(field string, typ byte, value []byte, pos []uint64) bool {
+						bfields[field+"-typ"] = typ
+						bfields[field+"-value"] = value
+						bfields[field+"-pos"] = pos
+						return true
+					})
+				if err != nil {
+					rv = append(rv, fmt.Sprintf("b.VisitDocument err: %v", err))
+				}
+				if !reflect.DeepEqual(afields, bfields) {
+					rv = append(rv, fmt.Sprintf("afields != bfields,"+
+						" id: %s, docNumA: %d, docNumB: %d,"+
+						" afields: %#v, bfields: %#v",
+						docId, docNumA, docNumB, afields, bfields))
+				}
+			}
 		}
 	}
 }
diff --git a/index/scorch/segment/zap/read.go b/index/scorch/segment/zap/read.go
index 0c5b9e17..e47d4c6a 100644
--- a/index/scorch/segment/zap/read.go
+++ b/index/scorch/segment/zap/read.go
@@ -17,15 +17,30 @@ package zap
 import "encoding/binary"
 
 func (s *SegmentBase) getDocStoredMetaAndCompressed(docNum uint64) ([]byte, []byte) {
-	docStoredStartAddr := s.storedIndexOffset + (8 * docNum)
-	docStoredStart := binary.BigEndian.Uint64(s.mem[docStoredStartAddr : docStoredStartAddr+8])
-	var n uint64
-	metaLen, read := binary.Uvarint(s.mem[docStoredStart : docStoredStart+binary.MaxVarintLen64])
-	n += uint64(read)
-	var dataLen uint64
-	dataLen, read = binary.Uvarint(s.mem[docStoredStart+n : docStoredStart+n+binary.MaxVarintLen64])
-	n += uint64(read)
-	meta := s.mem[docStoredStart+n : docStoredStart+n+metaLen]
-	data := s.mem[docStoredStart+n+metaLen : docStoredStart+n+metaLen+dataLen]
+	_, storedOffset, n, metaLen, dataLen := s.getDocStoredOffsets(docNum)
+
+	meta := s.mem[storedOffset+n : storedOffset+n+metaLen]
+	data := s.mem[storedOffset+n+metaLen : storedOffset+n+metaLen+dataLen]
+
 	return meta, data
 }
+
+// getDocStoredOffsets returns, for the given docNum: the offset of its
+// entry in the stored index, the offset of its record in the stored
+// section, the byte length of the two uvarint length headers, the meta
+// length, and the data length.
+func (s *SegmentBase) getDocStoredOffsets(docNum uint64) (
+	uint64, uint64, uint64, uint64, uint64) {
+	indexOffset := s.storedIndexOffset + (8 * docNum)
+
+	storedOffset := binary.BigEndian.Uint64(s.mem[indexOffset : indexOffset+8])
+
+	var n uint64
+
+	metaLen, read := binary.Uvarint(s.mem[storedOffset : storedOffset+binary.MaxVarintLen64])
+	n += uint64(read)
+
+	dataLen, read := binary.Uvarint(s.mem[storedOffset+n : storedOffset+n+binary.MaxVarintLen64])
+	n += uint64(read)
+
+	return indexOffset, storedOffset, n, metaLen, dataLen
+}
diff --git a/index/scorch/segment/zap/segment_test.go b/index/scorch/segment/zap/segment_test.go
index 704f9e72..9ce354ce 100644
--- a/index/scorch/segment/zap/segment_test.go
+++ b/index/scorch/segment/zap/segment_test.go
@@ -18,6 +18,7 @@ import (
 	"math"
 	"os"
 	"reflect"
+	"sort"
 	"testing"
 
 	"github.com/blevesearch/bleve/index"
@@ -574,6 +575,7 @@ func TestSegmentVisitableDocValueFieldsList(t *testing.T) {
 		t.Fatalf("segment VisitableDocValueFields err: %v", err)
 	}
 
+	sort.Strings(expectedFields[1:]) // keep _id as first field
 	if !reflect.DeepEqual(fields, expectedFields) {
 		t.Errorf("expected field terms: %#v, got: %#v",
 			expectedFields, fields)
 	}