0
0
Fork 0

Merge pull request #777 from sreekanth-cb/persister_pause

pausing persister until merging catches up
This commit is contained in:
Marty Schoch 2018-02-26 14:36:07 -05:00 committed by GitHub
commit eca31dfd27
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 59 additions and 18 deletions

View File

@ -34,36 +34,40 @@ import (
var DefaultChunkFactor uint32 = 1024
// Arbitrary number, need to make it configurable.
// Lower values like 10/making persister really slow
// doesn't work well as it is creating more files to
// persist for in next persist iteration and spikes the # FDs.
// Ideal value should let persister also proceed at
// an optimum pace so that the merger can skip
// many intermediate snapshots.
// This needs to be based on empirical data.
// TODO - may need to revisit this approach/value.
var epochDistance = uint64(5)
type notificationChan chan struct{}
func (s *Scorch) persisterLoop() {
defer s.asyncTasks.Done()
var persistWatchers []*epochWatcher
var lastPersistedEpoch uint64
notifyWatchers := func() {
var watchersNext []*epochWatcher
for _, w := range persistWatchers {
if w.epoch < lastPersistedEpoch {
close(w.notifyCh)
} else {
watchersNext = append(watchersNext, w)
}
}
persistWatchers = watchersNext
}
var lastPersistedEpoch, lastMergedEpoch uint64
var ew *epochWatcher
OUTER:
for {
select {
case <-s.closeCh:
break OUTER
case ew := <-s.persisterNotifier:
case ew = <-s.persisterNotifier:
persistWatchers = append(persistWatchers, ew)
notifyWatchers()
default:
}
if ew != nil && ew.epoch > lastMergedEpoch {
lastMergedEpoch = ew.epoch
}
persistWatchers = s.pausePersisterForMergerCatchUp(lastPersistedEpoch,
&lastMergedEpoch, persistWatchers)
var ourSnapshot *IndexSnapshot
var ourPersisted []chan error
@ -136,15 +140,52 @@ OUTER:
case <-w.notifyCh:
// woken up, next loop should pick up work
continue OUTER
case ew := <-s.persisterNotifier:
case ew = <-s.persisterNotifier:
// if the watchers are already caught up then let them wait,
// else let them continue to do the catch up
persistWatchers = append(persistWatchers, ew)
notifyWatchers()
}
}
}
func notifyMergeWatchers(lastPersistedEpoch uint64,
persistWatchers []*epochWatcher) []*epochWatcher {
var watchersNext []*epochWatcher
for _, w := range persistWatchers {
if w.epoch < lastPersistedEpoch {
close(w.notifyCh)
} else {
watchersNext = append(watchersNext, w)
}
}
return watchersNext
}
func (s *Scorch) pausePersisterForMergerCatchUp(lastPersistedEpoch uint64, lastMergedEpoch *uint64,
persistWatchers []*epochWatcher) []*epochWatcher {
// first, let the watchers proceed if they lag behind
persistWatchers = notifyMergeWatchers(lastPersistedEpoch, persistWatchers)
OUTER:
// check for slow merger and await until the merger catch up
for lastPersistedEpoch > *lastMergedEpoch+epochDistance {
select {
case <-s.closeCh:
break OUTER
case ew := <-s.persisterNotifier:
persistWatchers = append(persistWatchers, ew)
*lastMergedEpoch = ew.epoch
}
// let the watchers proceed if they lag behind
persistWatchers = notifyMergeWatchers(lastPersistedEpoch, persistWatchers)
}
return persistWatchers
}
func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot) error {
persisted, err := s.persistSnapshotMaybeMerge(snapshot)
if err != nil {