0
0
Fork 0

Fix for empty segment merge handling

Avoid creating new files with emtpy segments tasks
during the merge operation, skips the
incorrect appending of a newer segment during merge.
This commit is contained in:
Sreekanth Sivasankaran 2018-02-12 21:54:33 +05:30
parent ff210fbc6d
commit 606a270669
2 changed files with 49 additions and 27 deletions

View File

@ -193,6 +193,12 @@ func (s *Scorch) introduceMerge(nextMerge *segmentMerge) {
// prepare new index snapshot
currSize := len(s.root.segment)
newSize := currSize + 1 - len(nextMerge.old)
// empty segments deletion
if nextMerge.new == nil {
newSize--
}
newSnapshot := &IndexSnapshot{
parent: s,
segment: make([]*SegmentSnapshot, 0, newSize),
@ -210,7 +216,7 @@ func (s *Scorch) introduceMerge(nextMerge *segmentMerge) {
segmentID := s.root.segment[i].id
if segSnapAtMerge, ok := nextMerge.old[segmentID]; ok {
// this segment is going away, see if anything else was deleted since we started the merge
if s.root.segment[i].deleted != nil {
if segSnapAtMerge != nil && s.root.segment[i].deleted != nil {
// assume all these deletes are new
deletedSince := s.root.segment[i].deleted
// if we already knew about some of them, remove
@ -238,14 +244,16 @@ func (s *Scorch) introduceMerge(nextMerge *segmentMerge) {
}
}
// put new segment at end
newSnapshot.segment = append(newSnapshot.segment, &SegmentSnapshot{
id: nextMerge.id,
segment: nextMerge.new, // take ownership for nextMerge.new's ref-count
deleted: newSegmentDeleted,
cachedDocs: &cachedDocs{cache: nil},
})
newSnapshot.offsets = append(newSnapshot.offsets, running)
if nextMerge.new != nil {
// put new segment at end
newSnapshot.segment = append(newSnapshot.segment, &SegmentSnapshot{
id: nextMerge.id,
segment: nextMerge.new, // take ownership for nextMerge.new's ref-count
deleted: newSegmentDeleted,
cachedDocs: &cachedDocs{cache: nil},
})
newSnapshot.offsets = append(newSnapshot.offsets, running)
}
// swap in new segment
rootPrev := s.root

View File

@ -124,6 +124,10 @@ func (s *Scorch) planMergeAtSnapshot(ourSnapshot *IndexSnapshot) error {
// process tasks in serial for now
var notifications []notificationChan
for _, task := range resultMergePlan.Tasks {
if len(task.Segments) == 0 {
continue
}
oldMap := make(map[uint64]*SegmentSnapshot)
newSegmentID := atomic.AddUint64(&s.nextSegmentID, 1)
segmentsToMerge := make([]*zap.Segment, 0, len(task.Segments))
@ -132,36 +136,46 @@ func (s *Scorch) planMergeAtSnapshot(ourSnapshot *IndexSnapshot) error {
if segSnapshot, ok := planSegment.(*SegmentSnapshot); ok {
oldMap[segSnapshot.id] = segSnapshot
if zapSeg, ok := segSnapshot.segment.(*zap.Segment); ok {
segmentsToMerge = append(segmentsToMerge, zapSeg)
docsToDrop = append(docsToDrop, segSnapshot.deleted)
if segSnapshot.LiveSize() == 0 {
oldMap[segSnapshot.id] = nil
} else {
segmentsToMerge = append(segmentsToMerge, zapSeg)
docsToDrop = append(docsToDrop, segSnapshot.deleted)
}
}
}
}
filename := zapFileName(newSegmentID)
s.markIneligibleForRemoval(filename)
path := s.path + string(os.PathSeparator) + filename
newDocNums, err := zap.Merge(segmentsToMerge, docsToDrop, path, DefaultChunkFactor)
if err != nil {
s.unmarkIneligibleForRemoval(filename)
return fmt.Errorf("merging failed: %v", err)
}
segment, err := zap.Open(path)
if err != nil {
s.unmarkIneligibleForRemoval(filename)
return err
var oldNewDocNums map[uint64][]uint64
var segment segment.Segment
if len(segmentsToMerge) > 0 {
filename := zapFileName(newSegmentID)
s.markIneligibleForRemoval(filename)
path := s.path + string(os.PathSeparator) + filename
newDocNums, err := zap.Merge(segmentsToMerge, docsToDrop, path, 1024)
if err != nil {
s.unmarkIneligibleForRemoval(filename)
return fmt.Errorf("merging failed: %v", err)
}
segment, err = zap.Open(path)
if err != nil {
s.unmarkIneligibleForRemoval(filename)
return err
}
oldNewDocNums = make(map[uint64][]uint64)
for i, segNewDocNums := range newDocNums {
oldNewDocNums[task.Segments[i].Id()] = segNewDocNums
}
}
sm := &segmentMerge{
id: newSegmentID,
old: oldMap,
oldNewDocNums: make(map[uint64][]uint64),
oldNewDocNums: oldNewDocNums,
new: segment,
notify: make(notificationChan),
}
notifications = append(notifications, sm.notify)
for i, segNewDocNums := range newDocNums {
sm.oldNewDocNums[task.Segments[i].Id()] = segNewDocNums
}
// give it to the introducer
select {