diff --git a/index/scorch/merge.go b/index/scorch/merge.go index 41abe065..fb4e80d2 100644 --- a/index/scorch/merge.go +++ b/index/scorch/merge.go @@ -146,6 +146,7 @@ func (s *Scorch) planMergeAtSnapshot(ourSnapshot *IndexSnapshot) error { // give it to the introducer select { case <-s.closeCh: + _ = segment.Close() return nil case s.merges <- sm: } diff --git a/index/scorch/persister.go b/index/scorch/persister.go index dab753d7..83909a88 100644 --- a/index/scorch/persister.go +++ b/index/scorch/persister.go @@ -241,14 +241,18 @@ func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot) (err error) { if len(newSegmentPaths) > 0 { // now try to open all the new snapshots newSegments := make(map[uint64]segment.Segment) + defer func() { + for _, s := range newSegments { + if s != nil { + // cleanup segments that were opened but not + // swapped into the new root + _ = s.Close() + } + } + }() for segmentID, path := range newSegmentPaths { newSegments[segmentID], err = zap.Open(path) if err != nil { - for _, s := range newSegments { - if s != nil { - _ = s.Close() // cleanup segments that were successfully opened - } - } return fmt.Errorf("error opening new segment at %s, %v", path, err) } } @@ -273,6 +277,7 @@ func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot) (err error) { cachedDocs: segmentSnapshot.cachedDocs, } newIndexSnapshot.segment[i] = newSegmentSnapshot + delete(newSegments, segmentSnapshot.id) // update items persisted incase of a new segment snapshot atomic.AddUint64(&s.stats.numItemsPersisted, newSegmentSnapshot.Count()) } else {