Merge pull request #798 from blevesearch/compaction_bytes_stats
adding compaction_written_bytes/sec stats to scorch
This commit is contained in:
commit
c813165d4b
|
@ -183,16 +183,17 @@ func (s *Scorch) planMergeAtSnapshot(ourSnapshot *IndexSnapshot,
|
||||||
fileMergeZapStartTime := time.Now()
|
fileMergeZapStartTime := time.Now()
|
||||||
|
|
||||||
atomic.AddUint64(&s.stats.TotFileMergeZapBeg, 1)
|
atomic.AddUint64(&s.stats.TotFileMergeZapBeg, 1)
|
||||||
newDocNums, err := zap.Merge(segmentsToMerge, docsToDrop, path, 1024)
|
newDocNums, nBytes, err := zap.Merge(segmentsToMerge, docsToDrop, path, 1024)
|
||||||
atomic.AddUint64(&s.stats.TotFileMergeZapEnd, 1)
|
atomic.AddUint64(&s.stats.TotFileMergeZapEnd, 1)
|
||||||
|
atomic.AddUint64(&s.stats.TotFileMergeWrittenBytes, nBytes)
|
||||||
|
|
||||||
fileMergeZapTime := uint64(time.Since(fileMergeZapStartTime))
|
fileMergeZapTime := uint64(time.Since(fileMergeZapStartTime))
|
||||||
atomic.AddUint64(&s.stats.TotFileMergeZapTime, fileMergeZapTime)
|
atomic.AddUint64(&s.stats.TotFileMergeZapTime, fileMergeZapTime)
|
||||||
if atomic.LoadUint64(&s.stats.MaxFileMergeZapTime) < fileMergeZapTime {
|
if atomic.LoadUint64(&s.stats.MaxFileMergeZapTime) < fileMergeZapTime {
|
||||||
atomic.StoreUint64(&s.stats.MaxFileMergeZapTime, fileMergeZapTime)
|
atomic.StoreUint64(&s.stats.MaxFileMergeZapTime, fileMergeZapTime)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.unmarkIneligibleForRemoval(filename)
|
s.unmarkIneligibleForRemoval(filename)
|
||||||
atomic.AddUint64(&s.stats.TotFileMergePlanTasksErr, 1)
|
atomic.AddUint64(&s.stats.TotFileMergePlanTasksErr, 1)
|
||||||
return fmt.Errorf("merging failed: %v", err)
|
return fmt.Errorf("merging failed: %v", err)
|
||||||
|
|
|
@ -430,6 +430,7 @@ func (s *Scorch) StatsMap() map[string]interface{} {
|
||||||
m["num_items_persisted"] = m["TotPersistedItems"]
|
m["num_items_persisted"] = m["TotPersistedItems"]
|
||||||
m["num_bytes_used_disk"] = m["CurOnDiskBytes"]
|
m["num_bytes_used_disk"] = m["CurOnDiskBytes"]
|
||||||
m["num_files_on_disk"] = m["CurOnDiskFiles"]
|
m["num_files_on_disk"] = m["CurOnDiskFiles"]
|
||||||
|
m["total_compaction_written_bytes"] = m["TotFileMergeWrittenBytes"]
|
||||||
|
|
||||||
return m
|
return m
|
||||||
}
|
}
|
||||||
|
|
|
@ -36,12 +36,12 @@ const docDropped = math.MaxUint64 // sentinel docNum to represent a deleted doc
|
||||||
// remaining data. This new segment is built at the specified path,
|
// remaining data. This new segment is built at the specified path,
|
||||||
// with the provided chunkFactor.
|
// with the provided chunkFactor.
|
||||||
func Merge(segments []*Segment, drops []*roaring.Bitmap, path string,
|
func Merge(segments []*Segment, drops []*roaring.Bitmap, path string,
|
||||||
chunkFactor uint32) ([][]uint64, error) {
|
chunkFactor uint32) ([][]uint64, uint64, error) {
|
||||||
flag := os.O_RDWR | os.O_CREATE
|
flag := os.O_RDWR | os.O_CREATE
|
||||||
|
|
||||||
f, err := os.OpenFile(path, flag, 0600)
|
f, err := os.OpenFile(path, flag, 0600)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
cleanup := func() {
|
cleanup := func() {
|
||||||
|
@ -64,35 +64,35 @@ func Merge(segments []*Segment, drops []*roaring.Bitmap, path string,
|
||||||
MergeToWriter(segmentBases, drops, chunkFactor, cr)
|
MergeToWriter(segmentBases, drops, chunkFactor, cr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cleanup()
|
cleanup()
|
||||||
return nil, err
|
return nil, 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
err = persistFooter(numDocs, storedIndexOffset, fieldsIndexOffset,
|
err = persistFooter(numDocs, storedIndexOffset, fieldsIndexOffset,
|
||||||
docValueOffset, chunkFactor, cr.Sum32(), cr)
|
docValueOffset, chunkFactor, cr.Sum32(), cr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cleanup()
|
cleanup()
|
||||||
return nil, err
|
return nil, 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
err = br.Flush()
|
err = br.Flush()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cleanup()
|
cleanup()
|
||||||
return nil, err
|
return nil, 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
err = f.Sync()
|
err = f.Sync()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cleanup()
|
cleanup()
|
||||||
return nil, err
|
return nil, 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
err = f.Close()
|
err = f.Close()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cleanup()
|
cleanup()
|
||||||
return nil, err
|
return nil, 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return newDocNums, nil
|
return newDocNums, uint64(cr.Count()), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func MergeToWriter(segments []*SegmentBase, drops []*roaring.Bitmap,
|
func MergeToWriter(segments []*SegmentBase, drops []*roaring.Bitmap,
|
||||||
|
|
|
@ -72,7 +72,7 @@ func TestMerge(t *testing.T) {
|
||||||
segsToMerge[0] = segment.(*Segment)
|
segsToMerge[0] = segment.(*Segment)
|
||||||
segsToMerge[1] = segment2.(*Segment)
|
segsToMerge[1] = segment2.(*Segment)
|
||||||
|
|
||||||
_, err = Merge(segsToMerge, []*roaring.Bitmap{nil, nil}, "/tmp/scorch3.zap", 1024)
|
_, _, err = Merge(segsToMerge, []*roaring.Bitmap{nil, nil}, "/tmp/scorch3.zap", 1024)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
@ -176,7 +176,7 @@ func testMergeWithEmptySegments(t *testing.T, before bool, numEmptySegments int)
|
||||||
|
|
||||||
drops := make([]*roaring.Bitmap, len(segsToMerge))
|
drops := make([]*roaring.Bitmap, len(segsToMerge))
|
||||||
|
|
||||||
_, err = Merge(segsToMerge, drops, "/tmp/scorch3.zap", 1024)
|
_, _, err = Merge(segsToMerge, drops, "/tmp/scorch3.zap", 1024)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
@ -218,7 +218,7 @@ func testMergeWithSelf(t *testing.T, segCur *Segment, expectedCount uint64) {
|
||||||
segsToMerge := make([]*Segment, 1)
|
segsToMerge := make([]*Segment, 1)
|
||||||
segsToMerge[0] = segCur
|
segsToMerge[0] = segCur
|
||||||
|
|
||||||
_, err := Merge(segsToMerge, []*roaring.Bitmap{nil, nil}, "/tmp/"+fname, 1024)
|
_, _, err := Merge(segsToMerge, []*roaring.Bitmap{nil, nil}, "/tmp/"+fname, 1024)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
@ -590,7 +590,7 @@ func testMergeWithUpdates(t *testing.T, segmentDocIds [][]string, docsToDrop []*
|
||||||
func testMergeAndDropSegments(t *testing.T, segsToMerge []*Segment, docsToDrop []*roaring.Bitmap, expectedNumDocs uint64) {
|
func testMergeAndDropSegments(t *testing.T, segsToMerge []*Segment, docsToDrop []*roaring.Bitmap, expectedNumDocs uint64) {
|
||||||
_ = os.RemoveAll("/tmp/scorch-merged.zap")
|
_ = os.RemoveAll("/tmp/scorch-merged.zap")
|
||||||
|
|
||||||
_, err := Merge(segsToMerge, docsToDrop, "/tmp/scorch-merged.zap", 1024)
|
_, _, err := Merge(segsToMerge, docsToDrop, "/tmp/scorch-merged.zap", 1024)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
@ -782,3 +782,80 @@ func buildMemSegmentMultiHelper(docIds []string) *mem.Segment {
|
||||||
|
|
||||||
return segment
|
return segment
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestMergeBytesWritten(t *testing.T) {
|
||||||
|
_ = os.RemoveAll("/tmp/scorch.zap")
|
||||||
|
_ = os.RemoveAll("/tmp/scorch2.zap")
|
||||||
|
_ = os.RemoveAll("/tmp/scorch3.zap")
|
||||||
|
|
||||||
|
memSegment := buildMemSegmentMulti()
|
||||||
|
err := PersistSegment(memSegment, "/tmp/scorch.zap", 1024)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
memSegment2 := buildMemSegmentMulti2()
|
||||||
|
err = PersistSegment(memSegment2, "/tmp/scorch2.zap", 1024)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
segment, err := Open("/tmp/scorch.zap")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("error opening segment: %v", err)
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
cerr := segment.Close()
|
||||||
|
if cerr != nil {
|
||||||
|
t.Fatalf("error closing segment: %v", err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
segment2, err := Open("/tmp/scorch2.zap")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("error opening segment: %v", err)
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
cerr := segment2.Close()
|
||||||
|
if cerr != nil {
|
||||||
|
t.Fatalf("error closing segment: %v", err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
segsToMerge := make([]*Segment, 2)
|
||||||
|
segsToMerge[0] = segment.(*Segment)
|
||||||
|
segsToMerge[1] = segment2.(*Segment)
|
||||||
|
|
||||||
|
_, nBytes, err := Merge(segsToMerge, []*roaring.Bitmap{nil, nil}, "/tmp/scorch3.zap", 1024)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if nBytes == 0 {
|
||||||
|
t.Fatalf("expected a non zero total_compaction_written_bytes")
|
||||||
|
}
|
||||||
|
|
||||||
|
segm, err := Open("/tmp/scorch3.zap")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("error opening merged segment: %v", err)
|
||||||
|
}
|
||||||
|
seg3 := segm.(*Segment)
|
||||||
|
defer func() {
|
||||||
|
cerr := seg3.Close()
|
||||||
|
if cerr != nil {
|
||||||
|
t.Fatalf("error closing segment: %v", err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
if seg3.Path() != "/tmp/scorch3.zap" {
|
||||||
|
t.Fatalf("wrong path")
|
||||||
|
}
|
||||||
|
if seg3.Count() != 4 {
|
||||||
|
t.Fatalf("wrong count")
|
||||||
|
}
|
||||||
|
if len(seg3.Fields()) != 5 {
|
||||||
|
t.Fatalf("wrong # fields: %#v\n", seg3.Fields())
|
||||||
|
}
|
||||||
|
|
||||||
|
testMergeWithSelf(t, seg3, 4)
|
||||||
|
}
|
||||||
|
|
|
@ -87,6 +87,7 @@ type Stats struct {
|
||||||
|
|
||||||
TotFileMergeSegmentsEmpty uint64
|
TotFileMergeSegmentsEmpty uint64
|
||||||
TotFileMergeSegments uint64
|
TotFileMergeSegments uint64
|
||||||
|
TotFileMergeWrittenBytes uint64
|
||||||
|
|
||||||
TotFileMergeZapBeg uint64
|
TotFileMergeZapBeg uint64
|
||||||
TotFileMergeZapEnd uint64
|
TotFileMergeZapEnd uint64
|
||||||
|
|
Loading…
Reference in New Issue