Merge pull request #751 from sreekanth-cb/merger_persister_handshake_fix
fix for merger persister handshake stalemate
This commit is contained in:
commit
1af90936c4
|
@ -58,44 +58,24 @@ OUTER:
|
|||
_ = ourSnapshot.DecRef()
|
||||
|
||||
// tell the persister we're waiting for changes
|
||||
// first make a notification chan
|
||||
notifyUs := make(notificationChan)
|
||||
// first make a epochWatcher chan
|
||||
ew := &epochWatcher{
|
||||
epoch: lastEpochMergePlanned,
|
||||
notifyCh: make(notificationChan, 1),
|
||||
}
|
||||
|
||||
// give it to the persister
|
||||
select {
|
||||
case <-s.closeCh:
|
||||
break OUTER
|
||||
case s.persisterNotifier <- notifyUs:
|
||||
case s.persisterNotifier <- ew:
|
||||
}
|
||||
|
||||
// check again
|
||||
s.rootLock.RLock()
|
||||
ourSnapshot = s.root
|
||||
ourSnapshot.AddRef()
|
||||
s.rootLock.RUnlock()
|
||||
|
||||
if ourSnapshot.epoch != lastEpochMergePlanned {
|
||||
startTime := time.Now()
|
||||
|
||||
// lets get started
|
||||
err := s.planMergeAtSnapshot(ourSnapshot)
|
||||
if err != nil {
|
||||
s.fireAsyncError(fmt.Errorf("merging err: %v", err))
|
||||
_ = ourSnapshot.DecRef()
|
||||
continue OUTER
|
||||
}
|
||||
lastEpochMergePlanned = ourSnapshot.epoch
|
||||
|
||||
s.fireEvent(EventKindMergerProgress, time.Since(startTime))
|
||||
}
|
||||
_ = ourSnapshot.DecRef()
|
||||
|
||||
// now wait for it (but also detect close)
|
||||
// now wait for persister (but also detect close)
|
||||
select {
|
||||
case <-s.closeCh:
|
||||
break OUTER
|
||||
case <-notifyUs:
|
||||
// woken up, next loop should pick up work
|
||||
case <-ew.notifyCh:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -39,15 +39,29 @@ type notificationChan chan struct{}
|
|||
func (s *Scorch) persisterLoop() {
|
||||
defer s.asyncTasks.Done()
|
||||
|
||||
var notifyChs []notificationChan
|
||||
var persistWatchers []*epochWatcher
|
||||
var lastPersistedEpoch uint64
|
||||
|
||||
notifyWatchers := func() {
|
||||
var watchersNext []*epochWatcher
|
||||
for _, w := range persistWatchers {
|
||||
if w.epoch < lastPersistedEpoch {
|
||||
close(w.notifyCh)
|
||||
} else {
|
||||
watchersNext = append(watchersNext, w)
|
||||
}
|
||||
}
|
||||
persistWatchers = watchersNext
|
||||
}
|
||||
|
||||
OUTER:
|
||||
for {
|
||||
select {
|
||||
case <-s.closeCh:
|
||||
break OUTER
|
||||
case notifyCh := <-s.persisterNotifier:
|
||||
notifyChs = append(notifyChs, notifyCh)
|
||||
case ew := <-s.persisterNotifier:
|
||||
persistWatchers = append(persistWatchers, ew)
|
||||
notifyWatchers()
|
||||
default:
|
||||
}
|
||||
|
||||
|
@ -81,10 +95,11 @@ OUTER:
|
|||
}
|
||||
|
||||
lastPersistedEpoch = ourSnapshot.epoch
|
||||
for _, notifyCh := range notifyChs {
|
||||
close(notifyCh)
|
||||
for _, ew := range persistWatchers {
|
||||
close(ew.notifyCh)
|
||||
}
|
||||
notifyChs = nil
|
||||
|
||||
persistWatchers = nil
|
||||
_ = ourSnapshot.DecRef()
|
||||
|
||||
changed := false
|
||||
|
@ -120,6 +135,12 @@ OUTER:
|
|||
break OUTER
|
||||
case <-w.notifyCh:
|
||||
// woken up, next loop should pick up work
|
||||
continue OUTER
|
||||
case ew := <-s.persisterNotifier:
|
||||
// if the watchers are already caught up then let them wait,
|
||||
// else let them continue to do the catch up
|
||||
persistWatchers = append(persistWatchers, ew)
|
||||
notifyWatchers()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -61,7 +61,7 @@ type Scorch struct {
|
|||
merges chan *segmentMerge
|
||||
introducerNotifier chan *epochWatcher
|
||||
revertToSnapshots chan *snapshotReversion
|
||||
persisterNotifier chan notificationChan
|
||||
persisterNotifier chan *epochWatcher
|
||||
rootBolt *bolt.DB
|
||||
asyncTasks sync.WaitGroup
|
||||
|
||||
|
@ -176,7 +176,7 @@ func (s *Scorch) openBolt() error {
|
|||
s.merges = make(chan *segmentMerge)
|
||||
s.introducerNotifier = make(chan *epochWatcher, 1)
|
||||
s.revertToSnapshots = make(chan *snapshotReversion)
|
||||
s.persisterNotifier = make(chan notificationChan)
|
||||
s.persisterNotifier = make(chan *epochWatcher, 1)
|
||||
|
||||
if !s.readOnly && s.path != "" {
|
||||
err := s.removeOldZapFiles() // Before persister or merger create any new files.
|
||||
|
|
Loading…
Reference in New Issue