0
0
Fork 0

adding empty segment handling during introduction

cleaning up the segment live size check
This commit is contained in:
Sreekanth Sivasankaran 2018-02-16 13:56:57 +05:30
parent 606a270669
commit 683e195ac4
1 changed files with 41 additions and 13 deletions

View File

@ -100,8 +100,8 @@ func (s *Scorch) introduceSegment(next *segmentIntroduction) error {
// prepare new index snapshot
newSnapshot := &IndexSnapshot{
parent: s,
segment: make([]*SegmentSnapshot, nsegs, nsegs+1),
offsets: make([]uint64, nsegs, nsegs+1),
segment: make([]*SegmentSnapshot, 0, nsegs+1),
offsets: make([]uint64, 0, nsegs+1),
internal: make(map[string][]byte, len(s.root.internal)),
epoch: s.nextSnapshotEpoch,
refs: 1,
@ -124,24 +124,29 @@ func (s *Scorch) introduceSegment(next *segmentIntroduction) error {
return err
}
}
newSnapshot.segment[i] = &SegmentSnapshot{
newss := &SegmentSnapshot{
id: s.root.segment[i].id,
segment: s.root.segment[i].segment,
cachedDocs: s.root.segment[i].cachedDocs,
}
s.root.segment[i].segment.AddRef()
// apply new obsoletions
if s.root.segment[i].deleted == nil {
newSnapshot.segment[i].deleted = delta
newss.deleted = delta
} else {
newSnapshot.segment[i].deleted = roaring.Or(s.root.segment[i].deleted, delta)
newss.deleted = roaring.Or(s.root.segment[i].deleted, delta)
}
// check for live size before copying
if newss.LiveSize() > 0 {
newSnapshot.segment = append(newSnapshot.segment, newss)
s.root.segment[i].segment.AddRef()
newSnapshot.offsets = append(newSnapshot.offsets, running)
running += s.root.segment[i].Count()
}
newSnapshot.offsets[i] = running
running += s.root.segment[i].Count()
}
// append new segment, if any, to end of the new index snapshot
if next.data != nil {
newSegmentSnapshot := &SegmentSnapshot{
@ -230,7 +235,13 @@ func (s *Scorch) introduceMerge(nextMerge *segmentMerge) {
newSegmentDeleted.Add(uint32(newDocNum))
}
}
} else {
// clean up the old segment map to figure out the
// obsolete segments wrt root in meantime, whatever
// segments left behind in old map after processing
// the root segments would be the obsolete segment set
delete(nextMerge.old, segmentID)
} else if s.root.segment[i].LiveSize() > 0 {
// this segment is staying
newSnapshot.segment = append(newSnapshot.segment, &SegmentSnapshot{
id: s.root.segment[i].id,
@ -244,7 +255,24 @@ func (s *Scorch) introduceMerge(nextMerge *segmentMerge) {
}
}
if nextMerge.new != nil {
// before the newMerge introduction, need to clean the newly
// merged segment wrt the current root segments, hence
// applying the obsolete segment contents to newly merged segment
for segID, ss := range nextMerge.old {
obsoleted := ss.DocNumbersLive()
if obsoleted != nil {
obsoletedIter := obsoleted.Iterator()
for obsoletedIter.HasNext() {
oldDocNum := obsoletedIter.Next()
newDocNum := nextMerge.oldNewDocNums[segID][oldDocNum]
newSegmentDeleted.Add(uint32(newDocNum))
}
}
}
// In case where all the docs in the newly merged segment getting
// deleted by the time we reach here, can skip the introduction.
if nextMerge.new != nil &&
nextMerge.new.Count() > newSegmentDeleted.GetCardinality() {
// put new segment at end
newSnapshot.segment = append(newSnapshot.segment, &SegmentSnapshot{
id: nextMerge.id,