0
0
Fork 0

naming change, interface removal

This commit is contained in:
Sreekanth Sivasankaran 2018-03-07 14:37:33 +05:30
parent 395b0a312d
commit 2a9739ee1b
5 changed files with 18 additions and 42 deletions

View File

@@ -180,8 +180,9 @@ func (s *Scorch) planMergeAtSnapshot(ourSnapshot *IndexSnapshot,
s.markIneligibleForRemoval(filename) s.markIneligibleForRemoval(filename)
path := s.path + string(os.PathSeparator) + filename path := s.path + string(os.PathSeparator) + filename
atomic.AddUint64(&s.stats.TotFileMergeZapBeg, 1) atomic.AddUint64(&s.stats.TotFileMergeZapBeg, 1)
newDocNums, err := zap.Merge(segmentsToMerge, docsToDrop, path, 1024, &s.stats) newDocNums, nBytes, err := zap.Merge(segmentsToMerge, docsToDrop, path, 1024)
atomic.AddUint64(&s.stats.TotFileMergeZapEnd, 1) atomic.AddUint64(&s.stats.TotFileMergeZapEnd, 1)
atomic.AddUint64(&s.stats.TotFileMergeWrittenBytes, nBytes)
if err != nil { if err != nil {
s.unmarkIneligibleForRemoval(filename) s.unmarkIneligibleForRemoval(filename)
atomic.AddUint64(&s.stats.TotFileMergePlanTasksErr, 1) atomic.AddUint64(&s.stats.TotFileMergePlanTasksErr, 1)

View File

@@ -430,7 +430,7 @@ func (s *Scorch) StatsMap() map[string]interface{} {
m["num_items_persisted"] = m["TotPersistedItems"] m["num_items_persisted"] = m["TotPersistedItems"]
m["num_bytes_used_disk"] = m["CurOnDiskBytes"] m["num_bytes_used_disk"] = m["CurOnDiskBytes"]
m["num_files_on_disk"] = m["CurOnDiskFiles"] m["num_files_on_disk"] = m["CurOnDiskFiles"]
m["total_compaction_written_bytes"] = m["TotCompactionWrittenBytes"] m["total_compaction_written_bytes"] = m["TotFileMergeWrittenBytes"]
return m return m
} }

View File

@@ -31,22 +31,17 @@ import (
const docDropped = math.MaxUint64 // sentinel docNum to represent a deleted doc const docDropped = math.MaxUint64 // sentinel docNum to represent a deleted doc
// StatsReporter interface represents stats reporting methods.
type StatsReporter interface {
ReportBytesWritten(numBytesWritten uint64)
}
// Merge takes a slice of zap segments and bit masks describing which // Merge takes a slice of zap segments and bit masks describing which
// documents may be dropped, and creates a new segment containing the // documents may be dropped, and creates a new segment containing the
// remaining data. This new segment is built at the specified path, // remaining data. This new segment is built at the specified path,
// with the provided chunkFactor. // with the provided chunkFactor.
func Merge(segments []*Segment, drops []*roaring.Bitmap, path string, func Merge(segments []*Segment, drops []*roaring.Bitmap, path string,
chunkFactor uint32, stats StatsReporter) ([][]uint64, error) { chunkFactor uint32) ([][]uint64, uint64, error) {
flag := os.O_RDWR | os.O_CREATE flag := os.O_RDWR | os.O_CREATE
f, err := os.OpenFile(path, flag, 0600) f, err := os.OpenFile(path, flag, 0600)
if err != nil { if err != nil {
return nil, err return nil, 0, err
} }
cleanup := func() { cleanup := func() {
@@ -69,39 +64,35 @@ func Merge(segments []*Segment, drops []*roaring.Bitmap, path string,
MergeToWriter(segmentBases, drops, chunkFactor, cr) MergeToWriter(segmentBases, drops, chunkFactor, cr)
if err != nil { if err != nil {
cleanup() cleanup()
return nil, err return nil, 0, err
} }
err = persistFooter(numDocs, storedIndexOffset, fieldsIndexOffset, err = persistFooter(numDocs, storedIndexOffset, fieldsIndexOffset,
docValueOffset, chunkFactor, cr.Sum32(), cr) docValueOffset, chunkFactor, cr.Sum32(), cr)
if err != nil { if err != nil {
cleanup() cleanup()
return nil, err return nil, 0, err
} }
err = br.Flush() err = br.Flush()
if err != nil { if err != nil {
cleanup() cleanup()
return nil, err return nil, 0, err
} }
err = f.Sync() err = f.Sync()
if err != nil { if err != nil {
cleanup() cleanup()
return nil, err return nil, 0, err
} }
err = f.Close() err = f.Close()
if err != nil { if err != nil {
cleanup() cleanup()
return nil, err return nil, 0, err
} }
if stats != nil { return newDocNums, uint64(cr.Count()), nil
stats.ReportBytesWritten(uint64(cr.Count()))
}
return newDocNums, nil
} }
func MergeToWriter(segments []*SegmentBase, drops []*roaring.Bitmap, func MergeToWriter(segments []*SegmentBase, drops []*roaring.Bitmap,

View File

@@ -20,7 +20,6 @@ import (
"reflect" "reflect"
"sort" "sort"
"strings" "strings"
"sync/atomic"
"testing" "testing"
"github.com/RoaringBitmap/roaring" "github.com/RoaringBitmap/roaring"
@@ -73,7 +72,7 @@ func TestMerge(t *testing.T) {
segsToMerge[0] = segment.(*Segment) segsToMerge[0] = segment.(*Segment)
segsToMerge[1] = segment2.(*Segment) segsToMerge[1] = segment2.(*Segment)
_, err = Merge(segsToMerge, []*roaring.Bitmap{nil, nil}, "/tmp/scorch3.zap", 1024, nil) _, _, err = Merge(segsToMerge, []*roaring.Bitmap{nil, nil}, "/tmp/scorch3.zap", 1024)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@@ -177,7 +176,7 @@ func testMergeWithEmptySegments(t *testing.T, before bool, numEmptySegments int)
drops := make([]*roaring.Bitmap, len(segsToMerge)) drops := make([]*roaring.Bitmap, len(segsToMerge))
_, err = Merge(segsToMerge, drops, "/tmp/scorch3.zap", 1024, nil) _, _, err = Merge(segsToMerge, drops, "/tmp/scorch3.zap", 1024)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@@ -219,7 +218,7 @@ func testMergeWithSelf(t *testing.T, segCur *Segment, expectedCount uint64) {
segsToMerge := make([]*Segment, 1) segsToMerge := make([]*Segment, 1)
segsToMerge[0] = segCur segsToMerge[0] = segCur
_, err := Merge(segsToMerge, []*roaring.Bitmap{nil, nil}, "/tmp/"+fname, 1024, nil) _, _, err := Merge(segsToMerge, []*roaring.Bitmap{nil, nil}, "/tmp/"+fname, 1024)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@@ -591,7 +590,7 @@ func testMergeWithUpdates(t *testing.T, segmentDocIds [][]string, docsToDrop []*
func testMergeAndDropSegments(t *testing.T, segsToMerge []*Segment, docsToDrop []*roaring.Bitmap, expectedNumDocs uint64) { func testMergeAndDropSegments(t *testing.T, segsToMerge []*Segment, docsToDrop []*roaring.Bitmap, expectedNumDocs uint64) {
_ = os.RemoveAll("/tmp/scorch-merged.zap") _ = os.RemoveAll("/tmp/scorch-merged.zap")
_, err := Merge(segsToMerge, docsToDrop, "/tmp/scorch-merged.zap", 1024, nil) _, _, err := Merge(segsToMerge, docsToDrop, "/tmp/scorch-merged.zap", 1024)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@@ -784,14 +783,6 @@ func buildMemSegmentMultiHelper(docIds []string) *mem.Segment {
return segment return segment
} }
type statTest struct {
totalWrittenBytes uint64
}
func (s *statTest) ReportBytesWritten(numBytesWritten uint64) {
atomic.AddUint64(&s.totalWrittenBytes, numBytesWritten)
}
func TestMergeBytesWritten(t *testing.T) { func TestMergeBytesWritten(t *testing.T) {
_ = os.RemoveAll("/tmp/scorch.zap") _ = os.RemoveAll("/tmp/scorch.zap")
_ = os.RemoveAll("/tmp/scorch2.zap") _ = os.RemoveAll("/tmp/scorch2.zap")
@@ -835,14 +826,12 @@ func TestMergeBytesWritten(t *testing.T) {
segsToMerge[0] = segment.(*Segment) segsToMerge[0] = segment.(*Segment)
segsToMerge[1] = segment2.(*Segment) segsToMerge[1] = segment2.(*Segment)
reporter := &statTest{} _, nBytes, err := Merge(segsToMerge, []*roaring.Bitmap{nil, nil}, "/tmp/scorch3.zap", 1024)
_, err = Merge(segsToMerge, []*roaring.Bitmap{nil, nil}, "/tmp/scorch3.zap", 1024, reporter)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
if reporter.totalWrittenBytes == 0 { if nBytes == 0 {
t.Fatalf("expected a non zero total_compaction_written_bytes") t.Fatalf("expected a non zero total_compaction_written_bytes")
} }

View File

@@ -87,6 +87,7 @@ type Stats struct {
TotFileMergeSegmentsEmpty uint64 TotFileMergeSegmentsEmpty uint64
TotFileMergeSegments uint64 TotFileMergeSegments uint64
TotFileMergeWrittenBytes uint64
TotFileMergeZapBeg uint64 TotFileMergeZapBeg uint64
TotFileMergeZapEnd uint64 TotFileMergeZapEnd uint64
@@ -100,8 +101,6 @@ type Stats struct {
TotMemMergeZapBeg uint64 TotMemMergeZapBeg uint64
TotMemMergeZapEnd uint64 TotMemMergeZapEnd uint64
TotMemMergeSegments uint64 TotMemMergeSegments uint64
TotCompactionWrittenBytes uint64
} }
// atomically populates the returned map // atomically populates the returned map
@@ -124,7 +123,3 @@ func (s *Stats) ToMap() map[string]interface{} {
func (s *Stats) MarshalJSON() ([]byte, error) { func (s *Stats) MarshalJSON() ([]byte, error) {
return json.Marshal(s.ToMap()) return json.Marshal(s.ToMap())
} }
func (s *Stats) ReportBytesWritten(numBytesWritten uint64) {
atomic.AddUint64(&s.TotCompactionWrittenBytes, numBytesWritten)
}