0
0

Merge pull request #649 from steveyen/scorch

mergeplan: scoring implemented
This commit is contained in:
Marty Schoch 2017-12-13 07:30:28 -05:00 committed by GitHub
commit a681314740
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 519 additions and 92 deletions

View File

@ -37,21 +37,8 @@ type Segment interface {
// assigned to at most a single MergeTask in the output MergePlan. A
// segment not assigned to any MergeTask means the segment should
// remain unmerged.
func Plan(segments []Segment, o *MergePlanOptions) (
result *MergePlan, err error) {
if len(segments) <= 1 {
return nil, nil
}
// TODO: PLACEHOLDER implementation for now, that always merges
// all the candidates.
return &MergePlan{
Tasks: []*MergeTask{
&MergeTask{
Segments: segments,
},
},
}, nil
func Plan(segments []Segment, o *MergePlanOptions) (*MergePlan, error) {
return plan(segments, o)
}
// A MergePlan is the result of the Plan() API.
@ -100,10 +87,6 @@ type MergePlanOptions struct {
// impact merge selection.
ReclaimDeletesWeight float64
// Only consider a segment for merging if its delete percentage is
// over this threshold.
MinDeletesPct float64
// Optional, defaults to mergeplan.CalcBudget().
CalcBudget func(totalSize int64, firstTierSize int64,
o *MergePlanOptions) (budgetNumSegments int)
@ -130,13 +113,11 @@ var DefaultMergePlanOptions = MergePlanOptions{
SegmentsPerMergeTask: 10,
FloorSegmentSize: 2000,
ReclaimDeletesWeight: 2.0,
MinDeletesPct: 10.0,
}
// -------------------------------------------
func plan(segmentsIn []Segment, o *MergePlanOptions) (
result *MergePlan, err error) {
func plan(segmentsIn []Segment, o *MergePlanOptions) (*MergePlan, error) {
if len(segmentsIn) <= 1 {
return nil, nil
}
@ -149,24 +130,20 @@ func plan(segmentsIn []Segment, o *MergePlanOptions) (
sort.Sort(byLiveSizeDescending(segments))
var segmentsLiveSize int64
var minLiveSize int64 = math.MaxInt64
var eligible []Segment
var eligibleLiveSize int64
var eligibles []Segment
var eligiblesLiveSize int64
for _, segment := range segments {
segmentsLiveSize += segment.LiveSize()
if minLiveSize > segment.LiveSize() {
minLiveSize = segment.LiveSize()
}
// Only small-enough segments are eligible.
if segment.LiveSize() < o.MaxSegmentSize/2 {
eligible = append(eligible, segment)
eligibleLiveSize += segment.LiveSize()
eligibles = append(eligibles, segment)
eligiblesLiveSize += segment.LiveSize()
}
}
@ -177,7 +154,7 @@ func plan(segmentsIn []Segment, o *MergePlanOptions) (
calcBudget = CalcBudget
}
budgetNumSegments := CalcBudget(eligibleLiveSize, minLiveSize, o)
budgetNumSegments := CalcBudget(eligiblesLiveSize, minLiveSize, o)
scoreSegments := o.ScoreSegments
if scoreSegments == nil {
@ -188,36 +165,32 @@ func plan(segmentsIn []Segment, o *MergePlanOptions) (
// While we're over budget, keep looping, which might produce
// another MergeTask.
for len(eligible) > budgetNumSegments {
for len(eligibles) > budgetNumSegments {
// Track a current best roster as we examine and score
// potential rosters of merges.
var bestRoster []Segment
var bestRosterScore float64 // Lower score is better.
for startIdx := 0; startIdx < len(eligible)-o.SegmentsPerMergeTask; startIdx++ {
for startIdx := 0; startIdx < len(eligibles)-o.SegmentsPerMergeTask; startIdx++ {
var roster []Segment
var rosterLiveSize int64
for idx := startIdx; idx < len(eligible) && len(roster) < o.SegmentsPerMergeTask; idx++ {
rosterCandidate := eligible[idx]
for idx := startIdx; idx < len(eligibles) && len(roster) < o.SegmentsPerMergeTask; idx++ {
eligible := eligibles[idx]
if rosterLiveSize+rosterCandidate.LiveSize() > o.MaxSegmentSize {
// NOTE: We continue the loop, to try to “pack”
// the roster with smaller segments to get closer
// to the max size; but, we aren't doing full,
// comprehensive "bin-packing" permutations.
continue
if rosterLiveSize+eligible.LiveSize() < o.MaxSegmentSize {
roster = append(roster, eligible)
rosterLiveSize += eligible.LiveSize()
}
roster = append(roster, rosterCandidate)
rosterLiveSize += rosterCandidate.LiveSize()
}
rosterScore := scoreSegments(roster, o)
if len(roster) > 0 {
rosterScore := scoreSegments(roster, o)
if len(bestRoster) <= 0 || rosterScore < bestRosterScore {
bestRoster = roster
bestRosterScore = rosterScore
if len(bestRoster) <= 0 || rosterScore < bestRosterScore {
bestRoster = roster
bestRosterScore = rosterScore
}
}
}
@ -225,11 +198,9 @@ func plan(segmentsIn []Segment, o *MergePlanOptions) (
return rv, nil
}
rv.Tasks = append(rv.Tasks, &MergeTask{
Segments: bestRoster,
})
rv.Tasks = append(rv.Tasks, &MergeTask{Segments: bestRoster})
eligible = removeSegments(eligible, bestRoster)
eligibles = removeSegments(eligibles, bestRoster)
}
return rv, nil
@ -240,24 +211,38 @@ func plan(segmentsIn []Segment, o *MergePlanOptions) (
func CalcBudget(totalSize int64, firstTierSize int64, o *MergePlanOptions) (
budgetNumSegments int) {
tierSize := firstTierSize
if tierSize < 1 {
tierSize = 1
}
maxSegmentsPerTier := o.MaxSegmentsPerTier
if maxSegmentsPerTier < 1 {
maxSegmentsPerTier = 1
}
segmentsPerMergeTask := int64(o.SegmentsPerMergeTask)
if segmentsPerMergeTask < 2 {
segmentsPerMergeTask = 2
}
for totalSize > 0 {
segmentsInTier := float64(totalSize) / float64(tierSize)
if segmentsInTier < float64(o.MaxSegmentsPerTier) {
if segmentsInTier < float64(maxSegmentsPerTier) {
budgetNumSegments += int(math.Ceil(segmentsInTier))
break
}
budgetNumSegments += o.MaxSegmentsPerTier
totalSize -= int64(o.MaxSegmentsPerTier) * tierSize
tierSize *= int64(o.SegmentsPerMergeTask)
budgetNumSegments += maxSegmentsPerTier
totalSize -= int64(maxSegmentsPerTier) * tierSize
tierSize *= segmentsPerMergeTask
}
return budgetNumSegments
}
// removeSegments() keeps the ordering of the result segments stable.
func removeSegments(segments []Segment, toRemove []Segment) (rv []Segment) {
// Of note, removeSegments() keeps the ordering of the results stable.
func removeSegments(segments []Segment, toRemove []Segment) []Segment {
rv := make([]Segment, 0, len(segments)-len(toRemove))
OUTER:
for _, segment := range segments {
for _, r := range toRemove {
@ -270,6 +255,37 @@ OUTER:
return rv
}
// Smaller result score is better.
func ScoreSegments(segments []Segment, o *MergePlanOptions) float64 {
return 0 // TODO. Bogus score.
var totBeforeSize int64
var totAfterSize int64
var totAfterSizeFloored int64
for _, segment := range segments {
totBeforeSize += segment.FullSize()
totAfterSize += segment.LiveSize()
totAfterSizeFloored += o.RaiseToFloorSegmentSize(segment.LiveSize())
}
if totBeforeSize <= 0 || totAfterSize <= 0 || totAfterSizeFloored <= 0 {
return 0
}
// Roughly guess the "balance" of the segments -- whether the
// segments are about the same size.
balance :=
float64(o.RaiseToFloorSegmentSize(segments[0].LiveSize())) /
float64(totAfterSizeFloored)
// Gently favor smaller merges over bigger ones. We don't want to
// make the exponent too large else we end up with poor merges of
// small segments in order to avoid the large merges.
score := balance * math.Pow(float64(totAfterSize), 0.05)
// Strongly favor merges that reclaim deletes.
nonDelRatio := float64(totAfterSize) / float64(totBeforeSize)
score *= math.Pow(nonDelRatio, o.ReclaimDeletesWeight)
return score
}

View File

@ -12,64 +12,105 @@
// See the License for the specific language governing permissions and
// limitations under the License.
// The segment merge planning approach was inspired by Lucene's
// TieredMergePolicy.java and descriptions like
// http://blog.mikemccandless.com/2011/02/visualizing-lucenes-segment-merges.html
package mergeplan
import (
"encoding/json"
"fmt"
"os"
"reflect"
"sort"
"strings"
"testing"
)
// Implements the Segment interface for testing.
type segment struct {
id uint64
fullSize int64
liveSize int64
MyId uint64
MyFullSize int64
MyLiveSize int64
}
func (s *segment) Id() uint64 { return s.id }
func (s *segment) FullSize() int64 { return s.fullSize }
func (s *segment) LiveSize() int64 { return s.liveSize }
func (s *segment) Id() uint64 { return s.MyId }
func (s *segment) FullSize() int64 { return s.MyFullSize }
func (s *segment) LiveSize() int64 { return s.MyLiveSize }
func makeLinearSegments(n int) (rv []Segment) {
for i := 0; i < n; i++ {
rv = append(rv, &segment{
id: uint64(i),
fullSize: int64(i),
liveSize: int64(i),
MyId: uint64(i),
MyFullSize: int64(i),
MyLiveSize: int64(i),
})
}
return rv
}
// ----------------------------------------
func TestSimplePlan(t *testing.T) {
segs := makeLinearSegments(10)
tests := []struct {
desc string
segments []Segment
expectPlan *MergePlan
expectErr error
Desc string
Segments []Segment
Options *MergePlanOptions
ExpectPlan *MergePlan
ExpectErr error
}{
{"nil candidates",
nil, nil, nil},
{"empty candidates",
[]Segment{}, nil, nil},
{"1 candidate",
[]Segment{segs[0]},
{"nil segments",
nil, nil, nil, nil},
{"empty segments",
[]Segment{}, nil, nil, nil},
{"1 segment",
[]Segment{segs[1]},
nil,
nil,
nil,
},
{"2 candidates",
{"2 segments",
[]Segment{
segs[0],
segs[1],
segs[2],
},
nil,
&MergePlan{},
nil,
},
{"3 segments",
[]Segment{
segs[1],
segs[2],
segs[9],
},
nil,
&MergePlan{},
nil,
},
{"many segments",
[]Segment{
segs[1],
segs[2],
segs[3],
segs[4],
segs[5],
segs[6],
},
&MergePlanOptions{
MaxSegmentsPerTier: 1,
MaxSegmentSize: 1000,
SegmentsPerMergeTask: 2,
FloorSegmentSize: 1,
},
&MergePlan{
[]*MergeTask{
Tasks: []*MergeTask{
&MergeTask{
Segments: []Segment{
segs[0],
segs[1],
segs[6],
segs[5],
},
},
},
@ -79,14 +120,383 @@ func TestSimplePlan(t *testing.T) {
}
for testi, test := range tests {
plan, err := Plan(test.segments, &DefaultMergePlanOptions)
if err != test.expectErr {
t.Errorf("testi: %d, test: %v, got err: %v",
testi, test, err)
plan, err := Plan(test.Segments, test.Options)
if err != test.ExpectErr {
testj, _ := json.Marshal(&test)
t.Errorf("testi: %d, test: %s, got err: %v",
testi, testj, err)
}
if !reflect.DeepEqual(plan, test.expectPlan) {
t.Errorf("testi: %d, test: %v, got plan: %v",
testi, test, plan)
if !reflect.DeepEqual(plan, test.ExpectPlan) {
testj, _ := json.Marshal(&test)
planj, _ := json.Marshal(&plan)
t.Errorf("testi: %d, test: %s, got plan: %s",
testi, testj, planj)
}
}
}
// ----------------------------------------
func TestSort(t *testing.T) {
segs := makeLinearSegments(10)
sort.Sort(byLiveSizeDescending(segs))
for i := 1; i < len(segs); i++ {
if segs[i].LiveSize() >= segs[i-1].LiveSize() {
t.Errorf("not descending")
}
}
}
// ----------------------------------------
// TestCalcBudget runs CalcBudget() over a table of total sizes,
// first-tier sizes and options, and checks the returned budget of
// segments against the expected value.
func TestCalcBudget(t *testing.T) {
	// Options shared by the later cases; the two variants differ only
	// in MaxSegmentsPerTier.
	oneSegmentPerTier := MergePlanOptions{
		MaxSegmentsPerTier:   1,
		MaxSegmentSize:       1000,
		SegmentsPerMergeTask: 2,
		FloorSegmentSize:     1,
	}
	twoSegmentsPerTier := oneSegmentPerTier
	twoSegmentsPerTier.MaxSegmentsPerTier = 2

	tests := []struct {
		totalSize     int64
		firstTierSize int64
		o             MergePlanOptions
		expect        int
	}{
		// Zero-valued options and a zero first-tier size.
		{totalSize: 0, firstTierSize: 0, o: MergePlanOptions{}, expect: 0},
		{totalSize: 1, firstTierSize: 0, o: MergePlanOptions{}, expect: 1},
		{totalSize: 9, firstTierSize: 0, o: MergePlanOptions{}, expect: 4},

		// Explicit options.
		{totalSize: 1, firstTierSize: 1, o: oneSegmentPerTier, expect: 1},
		{totalSize: 21, firstTierSize: 1, o: oneSegmentPerTier, expect: 5},
		{totalSize: 21, firstTierSize: 1, o: twoSegmentsPerTier, expect: 7},
	}

	for testi, test := range tests {
		if res := CalcBudget(test.totalSize, test.firstTierSize, &test.o); res != test.expect {
			t.Errorf("testi: %d, test: %#v, res: %v",
				testi, test, res)
		}
	}
}
// ----------------------------------------
func TestInsert1SameSizedSegmentBetweenMerges(t *testing.T) {
o := &MergePlanOptions{
MaxSegmentSize: 1000,
MaxSegmentsPerTier: 3,
SegmentsPerMergeTask: 3,
}
spec := testCyclesSpec{
descrip: "i1sssbm",
verbose: os.Getenv("VERBOSE") == "i1sssbm" || os.Getenv("VERBOSE") == "y",
n: 200,
o: o,
beforePlan: func(spec *testCyclesSpec) {
spec.segments = append(spec.segments, &segment{
MyId: spec.nextSegmentId,
MyFullSize: 1,
MyLiveSize: 1,
})
spec.nextSegmentId++
},
}
spec.runCycles(t)
}
func TestInsertManySameSizedSegmentsBetweenMerges(t *testing.T) {
o := &MergePlanOptions{
MaxSegmentSize: 1000,
MaxSegmentsPerTier: 3,
SegmentsPerMergeTask: 3,
}
spec := testCyclesSpec{
descrip: "imsssbm",
verbose: os.Getenv("VERBOSE") == "imsssbm" || os.Getenv("VERBOSE") == "y",
n: 20,
o: o,
beforePlan: func(spec *testCyclesSpec) {
for i := 0; i < 10; i++ {
spec.segments = append(spec.segments, &segment{
MyId: spec.nextSegmentId,
MyFullSize: 1,
MyLiveSize: 1,
})
spec.nextSegmentId++
}
},
}
spec.runCycles(t)
}
func TestInsertManySameSizedSegmentsWithDeletionsBetweenMerges(t *testing.T) {
o := &MergePlanOptions{
MaxSegmentSize: 1000,
MaxSegmentsPerTier: 3,
SegmentsPerMergeTask: 3,
}
spec := testCyclesSpec{
descrip: "imssswdbm",
verbose: os.Getenv("VERBOSE") == "imssswdbm" || os.Getenv("VERBOSE") == "y",
n: 20,
o: o,
beforePlan: func(spec *testCyclesSpec) {
for i := 0; i < 10; i++ {
// Deletions are a shrinking of the live size.
for i, seg := range spec.segments {
if (spec.cycle+i)%5 == 0 {
s := seg.(*segment)
if s.MyLiveSize > 0 {
s.MyLiveSize -= 1
}
}
}
spec.segments = append(spec.segments, &segment{
MyId: spec.nextSegmentId,
MyFullSize: 1,
MyLiveSize: 1,
})
spec.nextSegmentId++
}
},
}
spec.runCycles(t)
}
func TestInsertManyDifferentSizedSegmentsBetweenMerges(t *testing.T) {
o := &MergePlanOptions{
MaxSegmentSize: 1000,
MaxSegmentsPerTier: 3,
SegmentsPerMergeTask: 3,
}
spec := testCyclesSpec{
descrip: "imdssbm",
verbose: os.Getenv("VERBOSE") == "imdssbm" || os.Getenv("VERBOSE") == "y",
n: 20,
o: o,
beforePlan: func(spec *testCyclesSpec) {
for i := 0; i < 10; i++ {
spec.segments = append(spec.segments, &segment{
MyId: spec.nextSegmentId,
MyFullSize: int64(1 + (i % 5)),
MyLiveSize: int64(1 + (i % 5)),
})
spec.nextSegmentId++
}
},
}
spec.runCycles(t)
}
func TestManySameSizedSegmentsWithDeletesBetweenMerges(t *testing.T) {
o := &MergePlanOptions{
MaxSegmentSize: 1000,
MaxSegmentsPerTier: 3,
SegmentsPerMergeTask: 3,
}
var numPlansWithTasks int
spec := testCyclesSpec{
descrip: "mssswdbm",
verbose: os.Getenv("VERBOSE") == "mssswdbm" || os.Getenv("VERBOSE") == "y",
n: 20,
o: o,
beforePlan: func(spec *testCyclesSpec) {
// Deletions are a shrinking of the live size.
for i, seg := range spec.segments {
if (spec.cycle+i)%5 == 0 {
s := seg.(*segment)
if s.MyLiveSize > 0 {
s.MyLiveSize -= 1
}
}
}
for i := 0; i < 10; i++ {
spec.segments = append(spec.segments, &segment{
MyId: spec.nextSegmentId,
MyFullSize: 1,
MyLiveSize: 1,
})
spec.nextSegmentId++
}
},
afterPlan: func(spec *testCyclesSpec, plan *MergePlan) {
if plan != nil && len(plan.Tasks) > 0 {
numPlansWithTasks++
}
},
}
spec.runCycles(t)
if numPlansWithTasks <= 0 {
t.Errorf("expected some plans with tasks")
}
}
// ----------------------------------------
// testCyclesSpec describes and carries the state of a multi-cycle
// planning simulation driven by runCycles().
type testCyclesSpec struct {
	// Configuration, set by the test before calling runCycles().
	descrip string            // Label printed by emit() and matched against VERBOSE.
	verbose bool              // When true, runCycles() calls emit() around each step.
	n       int               // Number of cycles to run.
	o       *MergePlanOptions // Options handed to Plan() on every cycle.

	// Optional hooks, skipped when nil.
	beforePlan func(spec *testCyclesSpec)                  // Invoked before Plan() in each cycle.
	afterPlan  func(spec *testCyclesSpec, plan *MergePlan) // Invoked with the result of Plan().

	// Mutable state that evolves as cycles run.
	cycle         int       // Index of the current cycle.
	segments      []Segment // Current set of segments.
	nextSegmentId uint64    // Id to assign to the next created segment.
}
// runCycles drives the simulation until spec.cycle reaches spec.n.
// Each cycle calls the beforePlan hook, asks Plan() for a merge plan,
// calls the afterPlan hook, and then applies the plan: the segments of
// every task are removed and, when their combined live size is
// positive, replaced by one new segment of that size. The test fails
// if Plan() returns an error or if no cycle produced a plan with at
// least one task.
func (spec *testCyclesSpec) runCycles(t *testing.T) {
	// trace dumps the current segments (and plan, if given) when the
	// spec is verbose.
	trace := func(step int, plan *MergePlan) {
		if spec.verbose {
			emit(spec.descrip, spec.cycle, step, spec.segments, plan)
		}
	}

	plansWithTasks := 0

	for ; spec.cycle < spec.n; spec.cycle++ {
		trace(0, nil)

		if spec.beforePlan != nil {
			spec.beforePlan(spec)
		}

		trace(1, nil)

		plan, err := Plan(spec.segments, spec.o)
		if err != nil {
			t.Fatalf("expected no err, got: %v", err)
		}

		if spec.afterPlan != nil {
			spec.afterPlan(spec, plan)
		}

		trace(2, plan)

		if plan == nil {
			continue
		}

		if len(plan.Tasks) > 0 {
			plansWithTasks++
		}

		// Simulate executing each merge task.
		for _, task := range plan.Tasks {
			spec.segments = removeSegments(spec.segments, task.Segments)

			var mergedLiveSize int64
			for _, merged := range task.Segments {
				mergedLiveSize += merged.LiveSize()
			}

			// A merge whose inputs have no live data produces no
			// replacement segment.
			if mergedLiveSize <= 0 {
				continue
			}

			spec.segments = append(spec.segments, &segment{
				MyId:       spec.nextSegmentId,
				MyFullSize: mergedLiveSize,
				MyLiveSize: mergedLiveSize,
			})
			spec.nextSegmentId++
		}
	}

	if plansWithTasks <= 0 {
		t.Errorf("expected some plans with tasks")
	}
}
// emit prints a text visualization of the segments to stdout, one line
// per segment, preceded by a header line of the form
// "<descrip> <cycle>.<step> ---------- <suffix>", where suffix is
// "hasPlan" when plan has at least one task.
//
// Each segment line shows the id, live size, full size and a bar. The
// live part of the bar is drawn with "." and the remainder up to the
// full size with "x". When the segment belongs to a task of plan, the
// line is flagged with "*" and the live part is drawn using the digits
// of the index of the first task that contains it. Bars are scaled down
// so that the largest full size maps to 100 characters.
//
// Nothing is printed unless the VERBOSE environment variable is
// non-empty.
func emit(descrip string, cycle int, step int, segments []Segment, plan *MergePlan) {
	if os.Getenv("VERBOSE") == "" {
		return
	}

	suffix := ""
	if plan != nil && len(plan.Tasks) > 0 {
		suffix = "hasPlan"
	}

	fmt.Printf("%s %d.%d ---------- %s\n", descrip, cycle, step, suffix)

	var maxFullSize int64
	for _, segment := range segments {
		if maxFullSize < segment.FullSize() {
			maxFullSize = segment.FullSize()
		}
	}

	barMax := 100

	for _, segment := range segments {
		barFull := int(segment.FullSize())
		barLive := int(segment.LiveSize())

		// Scale only when the widest bar would exceed barMax.
		if maxFullSize > int64(barMax) {
			barFull = int(float64(barMax) * float64(barFull) / float64(maxFullSize))
			barLive = int(float64(barMax) * float64(barLive) / float64(maxFullSize))
		}

		barKind := " "
		barChar := "."

		if plan != nil {
		taskLoop:
			for taski, task := range plan.Tasks {
				for _, taskSegment := range task.Segments {
					if taskSegment == segment {
						barKind = "*"
						barChar = fmt.Sprintf("%d", taski)
						break taskLoop
					}
				}
			}
		}

		// strings.Repeat panics on a negative count, so clamp both
		// parts of the bar. This guards against a segment that reports
		// a negative size or a live size larger than its full size.
		if barLive < 0 {
			barLive = 0
		}
		barDead := barFull - barLive
		if barDead < 0 {
			barDead = 0
		}

		// barChar may be several digits wide (task index >= 10), so
		// the repeated string is trimmed back to barLive characters.
		bar := strings.Repeat(barChar, barLive)[:barLive] +
			strings.Repeat("x", barDead)

		fmt.Printf("%s %5d: %5d /%5d - %s %s\n", descrip,
			segment.Id(),
			segment.LiveSize(),
			segment.FullSize(),
			barKind, bar)
	}
}

View File

@ -16,12 +16,13 @@ package mergeplan
type byLiveSizeDescending []Segment
func (a byLiveSizeDescending) Len() int { return len(a) }
func (a byLiveSizeDescending) Len() int { return len(a) }
func (a byLiveSizeDescending) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a byLiveSizeDescending) Less(i, j int) bool {
if a[i].LiveSize() != a[j].LiveSize() {
return a[i].LiveSize() < a[j].LiveSize()
return a[i].LiveSize() > a[j].LiveSize()
}
return a[i].Id() < a[j].Id()
}