Merge pull request #780 from sreekanth-cb/mergeplanner_options
configurable mergePlanner options
This commit is contained in:
commit
002df80357
|
@ -16,6 +16,8 @@ package scorch
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
|
||||
"fmt"
|
||||
"os"
|
||||
"sync/atomic"
|
||||
|
@ -29,6 +31,13 @@ import (
|
|||
|
||||
func (s *Scorch) mergerLoop() {
|
||||
var lastEpochMergePlanned uint64
|
||||
mergePlannerOptions, err := s.parseMergePlannerOptions()
|
||||
if err != nil {
|
||||
s.fireAsyncError(fmt.Errorf("mergePlannerOption json parsing err: %v", err))
|
||||
s.asyncTasks.Done()
|
||||
return
|
||||
}
|
||||
|
||||
OUTER:
|
||||
for {
|
||||
select {
|
||||
|
@ -46,7 +55,7 @@ OUTER:
|
|||
startTime := time.Now()
|
||||
|
||||
// lets get started
|
||||
err := s.planMergeAtSnapshot(ourSnapshot)
|
||||
err := s.planMergeAtSnapshot(ourSnapshot, mergePlannerOptions)
|
||||
if err != nil {
|
||||
s.fireAsyncError(fmt.Errorf("merging err: %v", err))
|
||||
_ = ourSnapshot.DecRef()
|
||||
|
@ -83,7 +92,25 @@ OUTER:
|
|||
s.asyncTasks.Done()
|
||||
}
|
||||
|
||||
func (s *Scorch) planMergeAtSnapshot(ourSnapshot *IndexSnapshot) error {
|
||||
func (s *Scorch) parseMergePlannerOptions() (*mergeplan.MergePlanOptions,
|
||||
error) {
|
||||
mergePlannerOptions := mergeplan.DefaultMergePlanOptions
|
||||
if v, ok := s.config["scorchMergePlanOptions"]; ok {
|
||||
b, err := json.Marshal(v)
|
||||
if err != nil {
|
||||
return &mergePlannerOptions, err
|
||||
}
|
||||
|
||||
err = json.Unmarshal(b, &mergePlannerOptions)
|
||||
if err != nil {
|
||||
return &mergePlannerOptions, err
|
||||
}
|
||||
}
|
||||
return &mergePlannerOptions, nil
|
||||
}
|
||||
|
||||
func (s *Scorch) planMergeAtSnapshot(ourSnapshot *IndexSnapshot,
|
||||
options *mergeplan.MergePlanOptions) error {
|
||||
// build list of zap segments in this snapshot
|
||||
var onlyZapSnapshots []mergeplan.Segment
|
||||
for _, segmentSnapshot := range ourSnapshot.segment {
|
||||
|
@ -93,7 +120,7 @@ func (s *Scorch) planMergeAtSnapshot(ourSnapshot *IndexSnapshot) error {
|
|||
}
|
||||
|
||||
// give this list to the planner
|
||||
resultMergePlan, err := mergeplan.Plan(onlyZapSnapshots, nil)
|
||||
resultMergePlan, err := mergeplan.Plan(onlyZapSnapshots, options)
|
||||
if err != nil {
|
||||
return fmt.Errorf("merge planning err: %v", err)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue