diff --git a/index/scorch/merge.go b/index/scorch/merge.go index 41086ad3..42b5e950 100644 --- a/index/scorch/merge.go +++ b/index/scorch/merge.go @@ -17,7 +17,6 @@ package scorch import ( "bytes" "encoding/json" - "fmt" "os" "sync/atomic" @@ -59,6 +58,11 @@ OUTER: // lets get started err := s.planMergeAtSnapshot(ourSnapshot, mergePlannerOptions) if err != nil { + if err == ErrClosed { + // index has been closed + _ = ourSnapshot.DecRef() + break OUTER + } s.fireAsyncError(fmt.Errorf("merging err: %v", err)) _ = ourSnapshot.DecRef() atomic.AddUint64(&s.stats.TotFileMergeLoopErr, 1) @@ -231,7 +235,7 @@ func (s *Scorch) planMergeAtSnapshot(ourSnapshot *IndexSnapshot, select { case <-s.closeCh: _ = segment.Close() - return nil + return ErrClosed case s.merges <- sm: atomic.AddUint64(&s.stats.TotFileMergeIntroductions, 1) } @@ -242,7 +246,7 @@ func (s *Scorch) planMergeAtSnapshot(ourSnapshot *IndexSnapshot, for _, notification := range notifications { select { case <-s.closeCh: - return nil + return ErrClosed case newSnapshot := <-notification: atomic.AddUint64(&s.stats.TotFileMergeIntroductionsDone, 1) if newSnapshot != nil { @@ -338,13 +342,13 @@ func (s *Scorch) mergeSegmentBases(snapshot *IndexSnapshot, select { // send to introducer case <-s.closeCh: _ = segment.DecRef() - return 0, nil, 0, nil // TODO: return ErrInterruptedClosed? + return 0, nil, 0, ErrClosed case s.merges <- sm: } select { // wait for introduction to complete case <-s.closeCh: - return 0, nil, 0, nil // TODO: return ErrInterruptedClosed? + return 0, nil, 0, ErrClosed case newSnapshot := <-sm.notify: atomic.AddUint64(&s.stats.TotMemMergeSegments, uint64(len(sbs))) atomic.AddUint64(&s.stats.TotMemMergeDone, 1) diff --git a/index/scorch/persister.go b/index/scorch/persister.go index dda4bdfb..2fab5324 100644 --- a/index/scorch/persister.go +++ b/index/scorch/persister.go @@ -94,6 +94,11 @@ OUTER: close(ch) } if err != nil { + if err == ErrClosed { + // index has been closed + _ = ourSnapshot.DecRef() + break OUTER + } s.fireAsyncError(fmt.Errorf("got err persisting snapshot: %v", err)) _ = ourSnapshot.DecRef() atomic.AddUint64(&s.stats.TotPersistLoopErr, 1)