From 6f5f90cd41720665b04e36805605638fef3120f9 Mon Sep 17 00:00:00 2001
From: Steve Yen
Date: Wed, 7 Feb 2018 16:54:58 -0800
Subject: [PATCH] scorch zap segment cleanup handling for some edge cases

Two cases are handled in this commit...

If we're shutting down, the merger might not have handed off its
latest merged segment to the introducer yet, so the merger still owns
the segment and needs to Close() that segment itself.

In persistSnapshot(), there might be cases where the persister is not
able to swap in its newly persisted segments -- so persistSnapshot()
needs to Close() those segments itself.
---
 index/scorch/merge.go     |  1 +
 index/scorch/persister.go | 15 ++++++++++-----
 2 files changed, 11 insertions(+), 5 deletions(-)

diff --git a/index/scorch/merge.go b/index/scorch/merge.go
index 41abe065..fb4e80d2 100644
--- a/index/scorch/merge.go
+++ b/index/scorch/merge.go
@@ -146,6 +146,7 @@ func (s *Scorch) planMergeAtSnapshot(ourSnapshot *IndexSnapshot) error {
 		// give it to the introducer
 		select {
 		case <-s.closeCh:
+			_ = segment.Close()
 			return nil
 		case s.merges <- sm:
 		}
diff --git a/index/scorch/persister.go b/index/scorch/persister.go
index dab753d7..83909a88 100644
--- a/index/scorch/persister.go
+++ b/index/scorch/persister.go
@@ -241,14 +241,18 @@ func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot) (err error) {
 	if len(newSegmentPaths) > 0 {
 		// now try to open all the new snapshots
 		newSegments := make(map[uint64]segment.Segment)
+		defer func() {
+			for _, s := range newSegments {
+				if s != nil {
+					// cleanup segments that were opened but not
+					// swapped into the new root
+					_ = s.Close()
+				}
+			}
+		}()
 		for segmentID, path := range newSegmentPaths {
 			newSegments[segmentID], err = zap.Open(path)
 			if err != nil {
-				for _, s := range newSegments {
-					if s != nil {
-						_ = s.Close() // cleanup segments that were successfully opened
-					}
-				}
 				return fmt.Errorf("error opening new segment at %s, %v", path, err)
 			}
 		}
@@ -273,6 +277,7 @@ func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot) (err error) {
 				cachedDocs: segmentSnapshot.cachedDocs,
 			}
 			newIndexSnapshot.segment[i] = newSegmentSnapshot
+			delete(newSegments, segmentSnapshot.id)
 			// update items persisted incase of a new segment snapshot
 			atomic.AddUint64(&s.stats.numItemsPersisted, newSegmentSnapshot.Count())
 		} else {
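
The ownership idiom both hunks rely on -- whichever component still owns a
segment when the handoff cannot happen is the one that must Close() it -- can
be sketched in isolation roughly as follows. This is a minimal standalone
illustration; the names (fakeSegment, openAndHandOff, accept) are made up for
the sketch and are not bleve's actual types or API.

package main

import "fmt"

// fakeSegment stands in for an opened resource that must be Close()d
// by whichever component currently owns it.
type fakeSegment struct{ path string }

func (s *fakeSegment) Close() error {
	fmt.Println("closed", s.path)
	return nil
}

// openAndHandOff mirrors the shape of the cleanup in persistSnapshot():
// open everything into a map, defer a sweep that closes whatever is
// still in the map, and delete entries from the map at the moment
// ownership is transferred elsewhere.
func openAndHandOff(paths map[uint64]string, accept func(uint64, *fakeSegment) bool) (err error) {
	opened := make(map[uint64]*fakeSegment)
	defer func() {
		for _, s := range opened {
			if s != nil {
				_ = s.Close() // still owned here, so close it
			}
		}
	}()

	for id, path := range paths {
		opened[id] = &fakeSegment{path: path} // a real open could fail here and simply return
	}

	for id, s := range opened {
		if accept(id, s) {
			delete(opened, id) // ownership transferred; the deferred sweep skips it
		}
	}
	return nil
}

func main() {
	paths := map[uint64]string{1: "a.zap", 2: "b.zap"}
	// accept only segment 1; segment 2 stays owned here and is closed by the defer
	_ = openAndHandOff(paths, func(id uint64, s *fakeSegment) bool { return id == 1 })
}

The same reasoning applies to the merger shutdown path: once the select on
s.closeCh wins, the merged segment will never reach the introducer, so the
merger is still the owner and closes it before returning.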