
scorch zap segment cleanup handling for some edge cases

Two cases in this commit...

If we're shutting down, the merger might not have handed off its
latest merged segment to the introducer yet, so the merger still owns
the segment and needs to Close() that segment itself.
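
As a minimal, self-contained sketch of that ownership rule (not the bleve code itself -- handOff, introductions, and fakeSegment are illustrative names), whichever side still holds the resource when shutdown wins the race must release it:

package main

import (
	"fmt"
	"io"
)

type fakeSegment struct{ id int }

func (s *fakeSegment) Close() error {
	fmt.Println("closed segment", s.id)
	return nil
}

// handOff tries to give seg to a receiver; if shutdown wins the race,
// the handoff never happens, so the caller still owns seg and must
// Close() it before returning.
func handOff(seg io.Closer, introductions chan<- io.Closer, closeCh <-chan struct{}) {
	select {
	case <-closeCh:
		_ = seg.Close() // never handed off; release it ourselves
	case introductions <- seg:
		// ownership has transferred to the receiver
	}
}

func main() {
	closeCh := make(chan struct{})
	close(closeCh) // simulate a shutdown already in progress
	handOff(&fakeSegment{id: 1}, make(chan io.Closer), closeCh)
}

The first hunk below applies the same rule inside planMergeAtSnapshot().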

In persistSnapshot(), there might be cases where the persister is
unable to swap in its newly persisted segments -- so
persistSnapshot() needs to Close() those segments itself.
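
A minimal sketch of the cleanup shape the persister hunks below adopt (illustrative only -- swapIn and fakeSegment are hypothetical names, and the stand-in constructor takes the place of zap.Open): open everything into a map, let a single defer close whatever is still in the map on any return path, and delete entries from the map as ownership transfers to the new root:

package main

import (
	"fmt"
	"io"
)

type fakeSegment struct{ id uint64 }

func (s *fakeSegment) Close() error {
	fmt.Println("closed segment", s.id)
	return nil
}

// swapIn opens segments into a map, defers a sweep that closes anything
// still in the map, and deletes entries whose ownership has moved on.
func swapIn(paths map[uint64]string, swapFails bool) error {
	opened := make(map[uint64]io.Closer)
	defer func() {
		for _, c := range opened {
			if c != nil {
				_ = c.Close() // opened but never swapped in
			}
		}
	}()
	for id := range paths {
		opened[id] = &fakeSegment{id: id} // stand-in for zap.Open(path)
	}
	for id := range paths {
		if swapFails {
			return fmt.Errorf("could not swap in segment %d", id)
		}
		delete(opened, id) // the new root owns it; the defer must skip it
	}
	return nil
}

func main() {
	// with swapFails=true, the deferred sweep closes both opened segments
	_ = swapIn(map[uint64]string{1: "a.zap", 2: "b.zap"}, true)
}

Compared with closing inline at each error return, the deferred sweep covers every exit path, and the delete keeps successfully swapped-in segments out of the sweep.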
commit 6f5f90cd41
parent 83272a9629
Author: Steve Yen
Date: 2018-02-07 16:54:58 -08:00
2 changed files with 11 additions and 5 deletions


@@ -146,6 +146,7 @@ func (s *Scorch) planMergeAtSnapshot(ourSnapshot *IndexSnapshot) error {
 		// give it to the introducer
 		select {
 		case <-s.closeCh:
+			_ = segment.Close()
 			return nil
 		case s.merges <- sm:
 		}


@@ -241,14 +241,18 @@ func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot) (err error) {
 	if len(newSegmentPaths) > 0 {
 		// now try to open all the new snapshots
 		newSegments := make(map[uint64]segment.Segment)
+		defer func() {
+			for _, s := range newSegments {
+				if s != nil {
+					// cleanup segments that were opened but not
+					// swapped into the new root
+					_ = s.Close()
+				}
+			}
+		}()
 		for segmentID, path := range newSegmentPaths {
 			newSegments[segmentID], err = zap.Open(path)
 			if err != nil {
-				for _, s := range newSegments {
-					if s != nil {
-						_ = s.Close() // cleanup segments that were successfully opened
-					}
-				}
 				return fmt.Errorf("error opening new segment at %s, %v", path, err)
 			}
 		}
@@ -273,6 +277,7 @@ func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot) (err error) {
 				cachedDocs: segmentSnapshot.cachedDocs,
 			}
 			newIndexSnapshot.segment[i] = newSegmentSnapshot
+			delete(newSegments, segmentSnapshot.id)
 			// update items persisted incase of a new segment snapshot
 			atomic.AddUint64(&s.stats.numItemsPersisted, newSegmentSnapshot.Count())
 		} else {