0
0
Fork 0

adding maxsegment size limit checks

This commit is contained in:
Sreekanth Sivasankaran 2018-03-13 17:29:05 +05:30
parent a526fe70f3
commit debbcd7d47
3 changed files with 80 additions and 0 deletions

View File

@ -111,6 +111,11 @@ func (s *Scorch) parseMergePlannerOptions() (*mergeplan.MergePlanOptions,
if err != nil {
return &mergePlannerOptions, err
}
err = mergeplan.ValidateMergePlannerOptions(&mergePlannerOptions)
if err != nil {
return nil, err
}
}
return &mergePlannerOptions, nil
}

View File

@ -18,6 +18,7 @@
package mergeplan
import (
"errors"
"fmt"
"math"
"sort"
@ -115,6 +116,14 @@ func (o *MergePlanOptions) RaiseToFloorSegmentSize(s int64) int64 {
return o.FloorSegmentSize
}
// MaxSegmentSizeLimit represents the maximum size of a segment,
// this limit comes as the roaring lib supports uint32.
const MaxSegmentSizeLimit = 1<<32 - 1
// ErrMaxSegmentSizeTooLarge is returned when the size of the segment
// exceeds the MaxSegmentSizeLimit
var ErrMaxSegmentSizeTooLarge = errors.New("MaxSegmentSize exceeds the size limit")
// Suggested default options.
var DefaultMergePlanOptions = MergePlanOptions{
MaxSegmentsPerTier: 10,
@ -367,3 +376,11 @@ func ToBarChart(prefix string, barMax int, segments []Segment, plan *MergePlan)
return strings.Join(rv, "\n")
}
// ValidateMergePlannerOptions validates the merge planner options
func ValidateMergePlannerOptions(options *MergePlanOptions) error {
if options.MaxSegmentSize > MaxSegmentSizeLimit {
return ErrMaxSegmentSizeTooLarge
}
return nil
}

View File

@ -17,10 +17,12 @@ package mergeplan
import (
"encoding/json"
"fmt"
"math/rand"
"os"
"reflect"
"sort"
"testing"
"time"
)
// Implements the Segment interface for testing,
@ -401,6 +403,62 @@ func TestManySameSizedSegmentsWithDeletesBetweenMerges(t *testing.T) {
}
}
func TestValidateMergePlannerOptions(t *testing.T) {
o := &MergePlanOptions{
MaxSegmentSize: 1 << 32,
MaxSegmentsPerTier: 3,
TierGrowth: 3.0,
SegmentsPerMergeTask: 3,
}
err := ValidateMergePlannerOptions(o)
if err != ErrMaxSegmentSizeTooLarge {
t.Error("Validation expected to fail as the MaxSegmentSize exceeds limit")
}
}
func TestPlanMaxSegmentSizeLimit(t *testing.T) {
o := &MergePlanOptions{
MaxSegmentSize: 20,
MaxSegmentsPerTier: 5,
TierGrowth: 3.0,
SegmentsPerMergeTask: 5,
FloorSegmentSize: 5,
}
segments := makeLinearSegments(20)
s := rand.NewSource(time.Now().UnixNano())
r := rand.New(s)
max := 20
min := 5
randomInRange := func() int64 {
return int64(r.Intn(max-min) + min)
}
for i := 1; i < 20; i++ {
o.MaxSegmentSize = randomInRange()
plans, err := Plan(segments, o)
if err != nil {
t.Errorf("Plan failed, err: %v", err)
}
if len(plans.Tasks) == 0 {
t.Errorf("expected some plans with tasks")
}
for _, task := range plans.Tasks {
var totalLiveSize int64
for _, segs := range task.Segments {
totalLiveSize += segs.LiveSize()
}
if totalLiveSize >= o.MaxSegmentSize {
t.Errorf("merged segments size: %d exceeding the MaxSegmentSize"+
"limit: %d", totalLiveSize, o.MaxSegmentSize)
}
}
}
}
// ----------------------------------------
type testCyclesSpec struct {