diff --git a/index/scorch/merge.go b/index/scorch/merge.go
index ee3ec46c..2b0e734c 100644
--- a/index/scorch/merge.go
+++ b/index/scorch/merge.go
@@ -183,16 +183,17 @@ func (s *Scorch) planMergeAtSnapshot(ourSnapshot *IndexSnapshot,
 			fileMergeZapStartTime := time.Now()
 
 			atomic.AddUint64(&s.stats.TotFileMergeZapBeg, 1)
-			newDocNums, err := zap.Merge(segmentsToMerge, docsToDrop, path, 1024)
+			newDocNums, nBytes, err := zap.Merge(segmentsToMerge, docsToDrop, path, 1024)
 			atomic.AddUint64(&s.stats.TotFileMergeZapEnd, 1)
+			atomic.AddUint64(&s.stats.TotFileMergeWrittenBytes, nBytes)
 
 			fileMergeZapTime := uint64(time.Since(fileMergeZapStartTime))
 			atomic.AddUint64(&s.stats.TotFileMergeZapTime, fileMergeZapTime)
 			if atomic.LoadUint64(&s.stats.MaxFileMergeZapTime) < fileMergeZapTime {
 				atomic.StoreUint64(&s.stats.MaxFileMergeZapTime, fileMergeZapTime)
 			}
 
 			if err != nil {
 				s.unmarkIneligibleForRemoval(filename)
 				atomic.AddUint64(&s.stats.TotFileMergePlanTasksErr, 1)
 				return fmt.Errorf("merging failed: %v", err)
diff --git a/index/scorch/scorch.go b/index/scorch/scorch.go
index 2a9eb634..6d0bcd1e 100644
--- a/index/scorch/scorch.go
+++ b/index/scorch/scorch.go
@@ -430,6 +430,7 @@ func (s *Scorch) StatsMap() map[string]interface{} {
 	m["num_items_persisted"] = m["TotPersistedItems"]
 	m["num_bytes_used_disk"] = m["CurOnDiskBytes"]
 	m["num_files_on_disk"] = m["CurOnDiskFiles"]
+	m["total_compaction_written_bytes"] = m["TotFileMergeWrittenBytes"]
 
 	return m
 }
diff --git a/index/scorch/segment/zap/merge.go b/index/scorch/segment/zap/merge.go
index 5066dfb9..88073d62 100644
--- a/index/scorch/segment/zap/merge.go
+++ b/index/scorch/segment/zap/merge.go
@@ -36,12 +36,14 @@ const docDropped = math.MaxUint64 // sentinel docNum to represent a deleted doc
 // remaining data. This new segment is built at the specified path,
 // with the provided chunkFactor.
+// On success it returns newDocNums and the number of bytes counted by the
+// writer (cr.Count()); every error path returns a nil slice and a count of 0.
 func Merge(segments []*Segment, drops []*roaring.Bitmap, path string,
-	chunkFactor uint32) ([][]uint64, error) {
+	chunkFactor uint32) ([][]uint64, uint64, error) {
 	flag := os.O_RDWR | os.O_CREATE
 
 	f, err := os.OpenFile(path, flag, 0600)
 	if err != nil {
-		return nil, err
+		return nil, 0, err
 	}
 
 	cleanup := func() {
@@ -64,35 +66,35 @@ func Merge(segments []*Segment, drops []*roaring.Bitmap, path string,
 		MergeToWriter(segmentBases, drops, chunkFactor, cr)
 	if err != nil {
 		cleanup()
-		return nil, err
+		return nil, 0, err
 	}
 
 	err = persistFooter(numDocs, storedIndexOffset, fieldsIndexOffset,
 		docValueOffset, chunkFactor, cr.Sum32(), cr)
 	if err != nil {
 		cleanup()
-		return nil, err
+		return nil, 0, err
 	}
 
 	err = br.Flush()
 	if err != nil {
 		cleanup()
-		return nil, err
+		return nil, 0, err
 	}
 
 	err = f.Sync()
 	if err != nil {
 		cleanup()
-		return nil, err
+		return nil, 0, err
 	}
 
 	err = f.Close()
 	if err != nil {
 		cleanup()
-		return nil, err
+		return nil, 0, err
 	}
 
-	return newDocNums, nil
+	return newDocNums, uint64(cr.Count()), nil
 }
 
 func MergeToWriter(segments []*SegmentBase, drops []*roaring.Bitmap,
diff --git a/index/scorch/segment/zap/merge_test.go b/index/scorch/segment/zap/merge_test.go
index bb09f831..501947f9 100644
--- a/index/scorch/segment/zap/merge_test.go
+++ b/index/scorch/segment/zap/merge_test.go
@@ -72,7 +72,7 @@ func TestMerge(t *testing.T) {
 	segsToMerge[0] = segment.(*Segment)
 	segsToMerge[1] = segment2.(*Segment)
 
-	_, err = Merge(segsToMerge, []*roaring.Bitmap{nil, nil}, "/tmp/scorch3.zap", 1024)
+	_, _, err = Merge(segsToMerge, []*roaring.Bitmap{nil, nil}, "/tmp/scorch3.zap", 1024)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -176,7 +176,7 @@ func testMergeWithEmptySegments(t *testing.T, before bool, numEmptySegments int)
 
 	drops := make([]*roaring.Bitmap, len(segsToMerge))
 
-	_, err = Merge(segsToMerge, drops, "/tmp/scorch3.zap", 1024)
+	_, _, err = Merge(segsToMerge, drops, "/tmp/scorch3.zap", 1024)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -218,7 +218,7 @@ func testMergeWithSelf(t *testing.T, segCur *Segment, expectedCount uint64) {
 	segsToMerge := make([]*Segment, 1)
 	segsToMerge[0] = segCur
 
-	_, err := Merge(segsToMerge, []*roaring.Bitmap{nil, nil}, "/tmp/"+fname, 1024)
+	_, _, err := Merge(segsToMerge, []*roaring.Bitmap{nil, nil}, "/tmp/"+fname, 1024)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -590,7 +590,7 @@ func testMergeWithUpdates(t *testing.T, segmentDocIds [][]string, docsToDrop []*
 func testMergeAndDropSegments(t *testing.T, segsToMerge []*Segment, docsToDrop []*roaring.Bitmap, expectedNumDocs uint64) {
 	_ = os.RemoveAll("/tmp/scorch-merged.zap")
 
-	_, err := Merge(segsToMerge, docsToDrop, "/tmp/scorch-merged.zap", 1024)
+	_, _, err := Merge(segsToMerge, docsToDrop, "/tmp/scorch-merged.zap", 1024)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -782,3 +782,82 @@ func buildMemSegmentMultiHelper(docIds []string) *mem.Segment {
 
 	return segment
 }
+
+// TestMergeBytesWritten merges two persisted segments and checks that Merge
+// reports a non-zero byte count and that the merged segment can be opened.
+func TestMergeBytesWritten(t *testing.T) {
+	_ = os.RemoveAll("/tmp/scorch.zap")
+	_ = os.RemoveAll("/tmp/scorch2.zap")
+	_ = os.RemoveAll("/tmp/scorch3.zap")
+
+	memSegment := buildMemSegmentMulti()
+	err := PersistSegment(memSegment, "/tmp/scorch.zap", 1024)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	memSegment2 := buildMemSegmentMulti2()
+	err = PersistSegment(memSegment2, "/tmp/scorch2.zap", 1024)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	segment, err := Open("/tmp/scorch.zap")
+	if err != nil {
+		t.Fatalf("error opening segment: %v", err)
+	}
+	defer func() {
+		cerr := segment.Close()
+		if cerr != nil {
+			t.Fatalf("error closing segment: %v", cerr)
+		}
+	}()
+
+	segment2, err := Open("/tmp/scorch2.zap")
+	if err != nil {
+		t.Fatalf("error opening segment: %v", err)
+	}
+	defer func() {
+		cerr := segment2.Close()
+		if cerr != nil {
+			t.Fatalf("error closing segment: %v", cerr)
+		}
+	}()
+
+	segsToMerge := make([]*Segment, 2)
+	segsToMerge[0] = segment.(*Segment)
+	segsToMerge[1] = segment2.(*Segment)
+
+	_, nBytes, err := Merge(segsToMerge, []*roaring.Bitmap{nil, nil}, "/tmp/scorch3.zap", 1024)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if nBytes == 0 {
+		t.Fatalf("expected a non zero total_compaction_written_bytes")
+	}
+
+	segm, err := Open("/tmp/scorch3.zap")
+	if err != nil {
+		t.Fatalf("error opening merged segment: %v", err)
+	}
+	seg3 := segm.(*Segment)
+	defer func() {
+		cerr := seg3.Close()
+		if cerr != nil {
+			t.Fatalf("error closing segment: %v", cerr)
+		}
+	}()
+
+	if seg3.Path() != "/tmp/scorch3.zap" {
+		t.Fatalf("wrong path")
+	}
+	if seg3.Count() != 4 {
+		t.Fatalf("wrong count")
+	}
+	if len(seg3.Fields()) != 5 {
+		t.Fatalf("wrong # fields: %#v\n", seg3.Fields())
+	}
+
+	testMergeWithSelf(t, seg3, 4)
+}
diff --git a/index/scorch/stats.go b/index/scorch/stats.go
index 3c978af7..e9bcd91d 100644
--- a/index/scorch/stats.go
+++ b/index/scorch/stats.go
@@ -87,6 +87,7 @@ type Stats struct {
 
 	TotFileMergeSegmentsEmpty uint64
 	TotFileMergeSegments      uint64
+	TotFileMergeWrittenBytes  uint64
 
 	TotFileMergeZapBeg  uint64
 	TotFileMergeZapEnd  uint64