diff --git a/index/scorch/merge.go b/index/scorch/merge.go index 5ded29b5..41abe065 100644 --- a/index/scorch/merge.go +++ b/index/scorch/merge.go @@ -58,44 +58,24 @@ OUTER: _ = ourSnapshot.DecRef() // tell the persister we're waiting for changes - // first make a notification chan - notifyUs := make(notificationChan) + // first make a epochWatcher chan + ew := &epochWatcher{ + epoch: lastEpochMergePlanned, + notifyCh: make(notificationChan, 1), + } // give it to the persister select { case <-s.closeCh: break OUTER - case s.persisterNotifier <- notifyUs: + case s.persisterNotifier <- ew: } - // check again - s.rootLock.RLock() - ourSnapshot = s.root - ourSnapshot.AddRef() - s.rootLock.RUnlock() - - if ourSnapshot.epoch != lastEpochMergePlanned { - startTime := time.Now() - - // lets get started - err := s.planMergeAtSnapshot(ourSnapshot) - if err != nil { - s.fireAsyncError(fmt.Errorf("merging err: %v", err)) - _ = ourSnapshot.DecRef() - continue OUTER - } - lastEpochMergePlanned = ourSnapshot.epoch - - s.fireEvent(EventKindMergerProgress, time.Since(startTime)) - } - _ = ourSnapshot.DecRef() - - // now wait for it (but also detect close) + // now wait for persister (but also detect close) select { case <-s.closeCh: break OUTER - case <-notifyUs: - // woken up, next loop should pick up work + case <-ew.notifyCh: } } } diff --git a/index/scorch/persister.go b/index/scorch/persister.go index cdcee37c..c9d91dc5 100644 --- a/index/scorch/persister.go +++ b/index/scorch/persister.go @@ -39,15 +39,29 @@ type notificationChan chan struct{} func (s *Scorch) persisterLoop() { defer s.asyncTasks.Done() - var notifyChs []notificationChan + var persistWatchers []*epochWatcher var lastPersistedEpoch uint64 + + notifyWatchers := func() { + var watchersNext []*epochWatcher + for _, w := range persistWatchers { + if w.epoch < lastPersistedEpoch { + close(w.notifyCh) + } else { + watchersNext = append(watchersNext, w) + } + } + persistWatchers = watchersNext + } + OUTER: for { select { case <-s.closeCh: break OUTER - case notifyCh := <-s.persisterNotifier: - notifyChs = append(notifyChs, notifyCh) + case ew := <-s.persisterNotifier: + persistWatchers = append(persistWatchers, ew) + notifyWatchers() default: } @@ -81,10 +95,11 @@ OUTER: } lastPersistedEpoch = ourSnapshot.epoch - for _, notifyCh := range notifyChs { - close(notifyCh) + for _, ew := range persistWatchers { + close(ew.notifyCh) } - notifyChs = nil + + persistWatchers = nil _ = ourSnapshot.DecRef() changed := false @@ -120,6 +135,12 @@ OUTER: break OUTER case <-w.notifyCh: // woken up, next loop should pick up work + continue OUTER + case ew := <-s.persisterNotifier: + // if the watchers are already caught up then let them wait, + // else let them continue to do the catch up + persistWatchers = append(persistWatchers, ew) + notifyWatchers() } } } diff --git a/index/scorch/scorch.go b/index/scorch/scorch.go index 31107765..b4e06fc2 100644 --- a/index/scorch/scorch.go +++ b/index/scorch/scorch.go @@ -61,7 +61,7 @@ type Scorch struct { merges chan *segmentMerge introducerNotifier chan *epochWatcher revertToSnapshots chan *snapshotReversion - persisterNotifier chan notificationChan + persisterNotifier chan *epochWatcher rootBolt *bolt.DB asyncTasks sync.WaitGroup @@ -156,7 +156,7 @@ func (s *Scorch) Open() error { s.merges = make(chan *segmentMerge) s.introducerNotifier = make(chan *epochWatcher, 1) s.revertToSnapshots = make(chan *snapshotReversion) - s.persisterNotifier = make(chan notificationChan) + s.persisterNotifier = make(chan *epochWatcher, 1) if !s.readOnly && s.path != "" { err := s.removeOldZapFiles() // Before persister or merger create any new files.