0
0
Fork 0

Merge pull request #771 from sreekanth-cb/merge_handling_empty_seg_tasks

Fix for empty segment merge handling
This commit is contained in:
Sreekanth Sivasankaran 2018-02-24 10:48:31 +05:30 committed by GitHub
commit 4109e327ff
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 89 additions and 39 deletions

View File

@ -100,8 +100,8 @@ func (s *Scorch) introduceSegment(next *segmentIntroduction) error {
// prepare new index snapshot
newSnapshot := &IndexSnapshot{
parent: s,
segment: make([]*SegmentSnapshot, nsegs, nsegs+1),
offsets: make([]uint64, nsegs, nsegs+1),
segment: make([]*SegmentSnapshot, 0, nsegs+1),
offsets: make([]uint64, 0, nsegs+1),
internal: make(map[string][]byte, len(s.root.internal)),
epoch: s.nextSnapshotEpoch,
refs: 1,
@ -124,24 +124,29 @@ func (s *Scorch) introduceSegment(next *segmentIntroduction) error {
return err
}
}
newSnapshot.segment[i] = &SegmentSnapshot{
newss := &SegmentSnapshot{
id: s.root.segment[i].id,
segment: s.root.segment[i].segment,
cachedDocs: s.root.segment[i].cachedDocs,
}
s.root.segment[i].segment.AddRef()
// apply new obsoletions
if s.root.segment[i].deleted == nil {
newSnapshot.segment[i].deleted = delta
newss.deleted = delta
} else {
newSnapshot.segment[i].deleted = roaring.Or(s.root.segment[i].deleted, delta)
newss.deleted = roaring.Or(s.root.segment[i].deleted, delta)
}
// check for live size before copying
if newss.LiveSize() > 0 {
newSnapshot.segment = append(newSnapshot.segment, newss)
s.root.segment[i].segment.AddRef()
newSnapshot.offsets = append(newSnapshot.offsets, running)
running += s.root.segment[i].Count()
}
newSnapshot.offsets[i] = running
running += s.root.segment[i].Count()
}
// append new segment, if any, to end of the new index snapshot
if next.data != nil {
newSegmentSnapshot := &SegmentSnapshot{
@ -193,6 +198,12 @@ func (s *Scorch) introduceMerge(nextMerge *segmentMerge) {
// prepare new index snapshot
currSize := len(s.root.segment)
newSize := currSize + 1 - len(nextMerge.old)
// empty segments deletion
if nextMerge.new == nil {
newSize--
}
newSnapshot := &IndexSnapshot{
parent: s,
segment: make([]*SegmentSnapshot, 0, newSize),
@ -210,7 +221,7 @@ func (s *Scorch) introduceMerge(nextMerge *segmentMerge) {
segmentID := s.root.segment[i].id
if segSnapAtMerge, ok := nextMerge.old[segmentID]; ok {
// this segment is going away, see if anything else was deleted since we started the merge
if s.root.segment[i].deleted != nil {
if segSnapAtMerge != nil && s.root.segment[i].deleted != nil {
// assume all these deletes are new
deletedSince := s.root.segment[i].deleted
// if we already knew about some of them, remove
@ -224,7 +235,13 @@ func (s *Scorch) introduceMerge(nextMerge *segmentMerge) {
newSegmentDeleted.Add(uint32(newDocNum))
}
}
} else {
// clean up the old segment map to figure out the
// obsolete segments wrt root in meantime, whatever
// segments left behind in old map after processing
// the root segments would be the obsolete segment set
delete(nextMerge.old, segmentID)
} else if s.root.segment[i].LiveSize() > 0 {
// this segment is staying
newSnapshot.segment = append(newSnapshot.segment, &SegmentSnapshot{
id: s.root.segment[i].id,
@ -238,14 +255,33 @@ func (s *Scorch) introduceMerge(nextMerge *segmentMerge) {
}
}
// put new segment at end
newSnapshot.segment = append(newSnapshot.segment, &SegmentSnapshot{
id: nextMerge.id,
segment: nextMerge.new, // take ownership for nextMerge.new's ref-count
deleted: newSegmentDeleted,
cachedDocs: &cachedDocs{cache: nil},
})
newSnapshot.offsets = append(newSnapshot.offsets, running)
// before the newMerge introduction, need to clean the newly
// merged segment wrt the current root segments, hence
// applying the obsolete segment contents to newly merged segment
for segID, ss := range nextMerge.old {
obsoleted := ss.DocNumbersLive()
if obsoleted != nil {
obsoletedIter := obsoleted.Iterator()
for obsoletedIter.HasNext() {
oldDocNum := obsoletedIter.Next()
newDocNum := nextMerge.oldNewDocNums[segID][oldDocNum]
newSegmentDeleted.Add(uint32(newDocNum))
}
}
}
// In case where all the docs in the newly merged segment getting
// deleted by the time we reach here, can skip the introduction.
if nextMerge.new != nil &&
nextMerge.new.Count() > newSegmentDeleted.GetCardinality() {
// put new segment at end
newSnapshot.segment = append(newSnapshot.segment, &SegmentSnapshot{
id: nextMerge.id,
segment: nextMerge.new, // take ownership for nextMerge.new's ref-count
deleted: newSegmentDeleted,
cachedDocs: &cachedDocs{cache: nil},
})
newSnapshot.offsets = append(newSnapshot.offsets, running)
}
newSnapshot.AddRef() // 1 ref for the nextMerge.notify response

View File

@ -105,6 +105,10 @@ func (s *Scorch) planMergeAtSnapshot(ourSnapshot *IndexSnapshot) error {
// process tasks in serial for now
var notifications []chan *IndexSnapshot
for _, task := range resultMergePlan.Tasks {
if len(task.Segments) == 0 {
continue
}
oldMap := make(map[uint64]*SegmentSnapshot)
newSegmentID := atomic.AddUint64(&s.nextSegmentID, 1)
segmentsToMerge := make([]*zap.Segment, 0, len(task.Segments))
@ -113,36 +117,46 @@ func (s *Scorch) planMergeAtSnapshot(ourSnapshot *IndexSnapshot) error {
if segSnapshot, ok := planSegment.(*SegmentSnapshot); ok {
oldMap[segSnapshot.id] = segSnapshot
if zapSeg, ok := segSnapshot.segment.(*zap.Segment); ok {
segmentsToMerge = append(segmentsToMerge, zapSeg)
docsToDrop = append(docsToDrop, segSnapshot.deleted)
if segSnapshot.LiveSize() == 0 {
oldMap[segSnapshot.id] = nil
} else {
segmentsToMerge = append(segmentsToMerge, zapSeg)
docsToDrop = append(docsToDrop, segSnapshot.deleted)
}
}
}
}
filename := zapFileName(newSegmentID)
s.markIneligibleForRemoval(filename)
path := s.path + string(os.PathSeparator) + filename
newDocNums, err := zap.Merge(segmentsToMerge, docsToDrop, path, DefaultChunkFactor)
if err != nil {
s.unmarkIneligibleForRemoval(filename)
return fmt.Errorf("merging failed: %v", err)
}
segment, err := zap.Open(path)
if err != nil {
s.unmarkIneligibleForRemoval(filename)
return err
var oldNewDocNums map[uint64][]uint64
var segment segment.Segment
if len(segmentsToMerge) > 0 {
filename := zapFileName(newSegmentID)
s.markIneligibleForRemoval(filename)
path := s.path + string(os.PathSeparator) + filename
newDocNums, err := zap.Merge(segmentsToMerge, docsToDrop, path, 1024)
if err != nil {
s.unmarkIneligibleForRemoval(filename)
return fmt.Errorf("merging failed: %v", err)
}
segment, err = zap.Open(path)
if err != nil {
s.unmarkIneligibleForRemoval(filename)
return err
}
oldNewDocNums = make(map[uint64][]uint64)
for i, segNewDocNums := range newDocNums {
oldNewDocNums[task.Segments[i].Id()] = segNewDocNums
}
}
sm := &segmentMerge{
id: newSegmentID,
old: oldMap,
oldNewDocNums: make(map[uint64][]uint64),
oldNewDocNums: oldNewDocNums,
new: segment,
notify: make(chan *IndexSnapshot, 1),
}
notifications = append(notifications, sm.notify)
for i, segNewDocNums := range newDocNums {
sm.oldNewDocNums[task.Segments[i].Id()] = segNewDocNums
}
// give it to the introducer
select {