From 18cfcd11d102a83271ab1d96b9035bdfe0e3f019 Mon Sep 17 00:00:00 2001 From: abhinavdangeti Date: Wed, 21 Mar 2018 12:12:49 -0700 Subject: [PATCH] MB-28782: Error handling in merger/persister when index is closed When the index is closed, do not fire an AsyncError (fatal) from either the merger or the persister that is actively working. This is quite a probable situation, so exit the loop within the goroutine. --- index/scorch/merge.go | 14 +++++++++----- index/scorch/persister.go | 5 +++++ 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/index/scorch/merge.go b/index/scorch/merge.go index 41086ad3..42b5e950 100644 --- a/index/scorch/merge.go +++ b/index/scorch/merge.go @@ -17,7 +17,6 @@ package scorch import ( "bytes" "encoding/json" - "fmt" "os" "sync/atomic" @@ -59,6 +58,11 @@ OUTER: // lets get started err := s.planMergeAtSnapshot(ourSnapshot, mergePlannerOptions) if err != nil { + if err == ErrClosed { + // index has been closed + _ = ourSnapshot.DecRef() + break OUTER + } s.fireAsyncError(fmt.Errorf("merging err: %v", err)) _ = ourSnapshot.DecRef() atomic.AddUint64(&s.stats.TotFileMergeLoopErr, 1) @@ -231,7 +235,7 @@ func (s *Scorch) planMergeAtSnapshot(ourSnapshot *IndexSnapshot, select { case <-s.closeCh: _ = segment.Close() - return nil + return ErrClosed case s.merges <- sm: atomic.AddUint64(&s.stats.TotFileMergeIntroductions, 1) } @@ -242,7 +246,7 @@ func (s *Scorch) planMergeAtSnapshot(ourSnapshot *IndexSnapshot, for _, notification := range notifications { select { case <-s.closeCh: - return nil + return ErrClosed case newSnapshot := <-notification: atomic.AddUint64(&s.stats.TotFileMergeIntroductionsDone, 1) if newSnapshot != nil { @@ -338,13 +342,13 @@ func (s *Scorch) mergeSegmentBases(snapshot *IndexSnapshot, select { // send to introducer case <-s.closeCh: _ = segment.DecRef() - return 0, nil, 0, nil // TODO: return ErrInterruptedClosed? + return 0, nil, 0, ErrClosed case s.merges <- sm: } select { // wait for introduction to complete case <-s.closeCh: - return 0, nil, 0, nil // TODO: return ErrInterruptedClosed? + return 0, nil, 0, ErrClosed case newSnapshot := <-sm.notify: atomic.AddUint64(&s.stats.TotMemMergeSegments, uint64(len(sbs))) atomic.AddUint64(&s.stats.TotMemMergeDone, 1) diff --git a/index/scorch/persister.go b/index/scorch/persister.go index dda4bdfb..2fab5324 100644 --- a/index/scorch/persister.go +++ b/index/scorch/persister.go @@ -94,6 +94,11 @@ OUTER: close(ch) } if err != nil { + if err == ErrClosed { + // index has been closed + _ = ourSnapshot.DecRef() + break OUTER + } s.fireAsyncError(fmt.Errorf("got err persisting snapshot: %v", err)) _ = ourSnapshot.DecRef() atomic.AddUint64(&s.stats.TotPersistLoopErr, 1)