// Copyright (c) 2017 Couchbase, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package mergeplan import ( "math" "sort" ) // A Segment represents the information that the planner needs to // calculate segment merging. type Segment interface { // Unique id of the segment -- used for sorting. Id() uint64 // Full segment size (the size before any logical deletions). FullSize() int64 // Size of the live data of the segment; i.e., FullSize() minus // any logical deletions. LiveSize() int64 } // Plan() will functionally compute a merge plan. A segment will be // assigned to at most a single MergeTask in the output MergePlan. A // segment not assigned to any MergeTask means the segment should // remain unmerged. func Plan(segments []Segment, o *MergePlanOptions) (*MergePlan, error) { return plan(segments, o) } // A MergePlan is the result of the Plan() API. // // The planner doesn’t know how or whether these tasks are executed -- // that’s up to a separate merge execution system, which might execute // these tasks concurrently or not, and which might execute all the // tasks or not. type MergePlan struct { Tasks []*MergeTask } // A MergeTask represents several segments that should be merged // together into a single segment. type MergeTask struct { Segments []Segment } // The MergePlanOptions is designed to be reusable between planning calls. type MergePlanOptions struct { // Max # segments per logarithmic tier, or max width of any // logarithmic “step”. Smaller values mean more merging but fewer // segments. Should be >= SegmentsPerMergeTask, else you'll have // too much merging. MaxSegmentsPerTier int // Max size of any segment produced after merging. Actual // merging, however, may produce segment sizes different than the // planner’s predicted sizes. MaxSegmentSize int64 // The number of segments in any resulting MergeTask. e.g., // len(result.Tasks[ * ].Segments) == SegmentsPerMergeTask. SegmentsPerMergeTask int // Small segments are rounded up to this size, i.e., treated as // equal (floor) size for consideration. This is to prevent lots // of tiny segments from resulting in a long tail in the index. FloorSegmentSize int64 // Controls how aggressively merges that reclaim more deletions // are favored. Higher values will more aggressively target // merges that reclaim deletions, but be careful not to go so high // that way too much merging takes place; a value of 3.0 is // probably nearly too high. A value of 0.0 means deletions don't // impact merge selection. ReclaimDeletesWeight float64 // Optional, defaults to mergeplan.CalcBudget(). CalcBudget func(totalSize int64, firstTierSize int64, o *MergePlanOptions) (budgetNumSegments int) // Optional, defaults to mergeplan.ScoreSegments(). ScoreSegments func(segments []Segment, o *MergePlanOptions) float64 // Optional. Logger func(string) } // Returns the higher of the input or FloorSegmentSize. func (o *MergePlanOptions) RaiseToFloorSegmentSize(s int64) int64 { if s > o.FloorSegmentSize { return s } return o.FloorSegmentSize } // Suggested default options. var DefaultMergePlanOptions = MergePlanOptions{ MaxSegmentsPerTier: 10, MaxSegmentSize: 5000000, SegmentsPerMergeTask: 10, FloorSegmentSize: 2000, ReclaimDeletesWeight: 2.0, } // ------------------------------------------- func plan(segmentsIn []Segment, o *MergePlanOptions) (*MergePlan, error) { if len(segmentsIn) <= 1 { return nil, nil } if o == nil { o = &DefaultMergePlanOptions } segments := append([]Segment(nil), segmentsIn...) // Copy. sort.Sort(byLiveSizeDescending(segments)) var segmentsLiveSize int64 var minLiveSize int64 = math.MaxInt64 var eligibles []Segment var eligiblesLiveSize int64 for _, segment := range segments { segmentsLiveSize += segment.LiveSize() if minLiveSize > segment.LiveSize() { minLiveSize = segment.LiveSize() } // Only small-enough segments are eligible. if segment.LiveSize() < o.MaxSegmentSize/2 { eligibles = append(eligibles, segment) eligiblesLiveSize += segment.LiveSize() } } minLiveSize = o.RaiseToFloorSegmentSize(minLiveSize) calcBudget := o.CalcBudget if calcBudget == nil { calcBudget = CalcBudget } budgetNumSegments := CalcBudget(eligiblesLiveSize, minLiveSize, o) scoreSegments := o.ScoreSegments if scoreSegments == nil { scoreSegments = ScoreSegments } rv := &MergePlan{} // While we’re over budget, keep looping, which might produce // another MergeTask. for len(eligibles) > budgetNumSegments { // Track a current best roster as we examine and score // potential rosters of merges. var bestRoster []Segment var bestRosterScore float64 // Lower score is better. for startIdx := 0; startIdx < len(eligibles)-o.SegmentsPerMergeTask; startIdx++ { var roster []Segment var rosterLiveSize int64 for idx := startIdx; idx < len(eligibles) && len(roster) < o.SegmentsPerMergeTask; idx++ { eligible := eligibles[idx] if rosterLiveSize+eligible.LiveSize() < o.MaxSegmentSize { roster = append(roster, eligible) rosterLiveSize += eligible.LiveSize() } } if len(roster) > 0 { rosterScore := scoreSegments(roster, o) if len(bestRoster) <= 0 || rosterScore < bestRosterScore { bestRoster = roster bestRosterScore = rosterScore } } } if len(bestRoster) <= 0 { return rv, nil } rv.Tasks = append(rv.Tasks, &MergeTask{Segments: bestRoster}) eligibles = removeSegments(eligibles, bestRoster) } return rv, nil } // Compute the number of segments that would be needed to cover the // totalSize, by climbing up a logarithmic staircase of segment tiers. func CalcBudget(totalSize int64, firstTierSize int64, o *MergePlanOptions) ( budgetNumSegments int) { tierSize := firstTierSize if tierSize < 1 { tierSize = 1 } maxSegmentsPerTier := o.MaxSegmentsPerTier if maxSegmentsPerTier < 1 { maxSegmentsPerTier = 1 } segmentsPerMergeTask := int64(o.SegmentsPerMergeTask) if segmentsPerMergeTask < 2 { segmentsPerMergeTask = 2 } for totalSize > 0 { segmentsInTier := float64(totalSize) / float64(tierSize) if segmentsInTier < float64(maxSegmentsPerTier) { budgetNumSegments += int(math.Ceil(segmentsInTier)) break } budgetNumSegments += maxSegmentsPerTier totalSize -= int64(maxSegmentsPerTier) * tierSize tierSize *= segmentsPerMergeTask } return budgetNumSegments } // Of note, removeSegments() keeps the ordering of the results stable. func removeSegments(segments []Segment, toRemove []Segment) []Segment { rv := make([]Segment, 0, len(segments)-len(toRemove)) OUTER: for _, segment := range segments { for _, r := range toRemove { if segment == r { continue OUTER } } rv = append(rv, segment) } return rv } // Smaller result score is better. func ScoreSegments(segments []Segment, o *MergePlanOptions) float64 { var totBeforeSize int64 var totAfterSize int64 var totAfterSizeFloored int64 for _, segment := range segments { totBeforeSize += segment.FullSize() totAfterSize += segment.LiveSize() totAfterSizeFloored += o.RaiseToFloorSegmentSize(segment.LiveSize()) } // Roughly guess the "balance" of the segments -- whether the // segments are about the same size. balance := float64(o.RaiseToFloorSegmentSize(segments[0].LiveSize())) / float64(totAfterSizeFloored) // Gently favor smaller merges over bigger ones. We don't want to // make the exponent too large else we end up with poor merges of // small segments in order to avoid the large merges. score := balance * math.Pow(float64(totAfterSize), 0.05) // Strongly favor merges that reclaim deletes. nonDelRatio := float64(totAfterSize) / float64(totBeforeSize) score *= math.Pow(nonDelRatio, o.ReclaimDeletesWeight) return score }