0
0
Fork 0

configurable mergePlanner options

mergePlanner options are parsed from the
scorch configs parameters
This commit is contained in:
Sreekanth Sivasankaran 2018-02-23 15:35:58 +05:30
parent 030469a351
commit a1db057656
1 changed files with 33 additions and 3 deletions

View File

@ -28,6 +28,7 @@ import (
func (s *Scorch) mergerLoop() {
var lastEpochMergePlanned uint64
mergePlannerOptions := s.parseMergePlannerOptions()
OUTER:
for {
select {
@ -45,7 +46,7 @@ OUTER:
startTime := time.Now()
// lets get started
err := s.planMergeAtSnapshot(ourSnapshot)
err := s.planMergeAtSnapshot(ourSnapshot, mergePlannerOptions)
if err != nil {
s.fireAsyncError(fmt.Errorf("merging err: %v", err))
_ = ourSnapshot.DecRef()
@ -82,7 +83,36 @@ OUTER:
s.asyncTasks.Done()
}
func (s *Scorch) planMergeAtSnapshot(ourSnapshot *IndexSnapshot) error {
func (s *Scorch) parseMergePlannerOptions() *mergeplan.MergePlanOptions {
mergePlannerOptions := &mergeplan.DefaultMergePlanOptions
scorchOptions := map[string]interface{}{}
if v, ok := s.config["scorchOptions"]; ok {
if scorchOptions, ok = v.(map[string]interface{}); ok {
if v, ok := scorchOptions["maxSegmentsPerTier"].(float64); ok {
mergePlannerOptions.MaxSegmentsPerTier = int(v)
}
if v, ok := scorchOptions["maxSegmentSize"].(float64); ok {
mergePlannerOptions.MaxSegmentSize = int64(v)
}
if v, ok := scorchOptions["tierGrowth"].(float64); ok {
mergePlannerOptions.TierGrowth = v
}
if v, ok := scorchOptions["segmentsPerMergeTask"].(float64); ok {
mergePlannerOptions.SegmentsPerMergeTask = int(v)
}
if v, ok := scorchOptions["floorSegmentSize"].(float64); ok {
mergePlannerOptions.FloorSegmentSize = int64(v)
}
if v, ok := scorchOptions["reclaimDeletesWeight"].(float64); ok {
mergePlannerOptions.ReclaimDeletesWeight = v
}
}
}
return mergePlannerOptions
}
func (s *Scorch) planMergeAtSnapshot(ourSnapshot *IndexSnapshot,
options *mergeplan.MergePlanOptions) error {
// build list of zap segments in this snapshot
var onlyZapSnapshots []mergeplan.Segment
for _, segmentSnapshot := range ourSnapshot.segment {
@ -92,7 +122,7 @@ func (s *Scorch) planMergeAtSnapshot(ourSnapshot *IndexSnapshot) error {
}
// give this list to the planner
resultMergePlan, err := mergeplan.Plan(onlyZapSnapshots, nil)
resultMergePlan, err := mergeplan.Plan(onlyZapSnapshots, options)
if err != nil {
return fmt.Errorf("merge planning err: %v", err)
}