0
0
Fork 0

lowering epochDistance to 5,

fixing the lastMergedEpoch value updates
This commit is contained in:
Sreekanth Sivasankaran 2018-02-21 17:25:14 +05:30
parent 35611f4287
commit a8ebf2a553
1 changed files with 6 additions and 7 deletions

View File

@ -44,7 +44,7 @@ var DefaultChunkFactor uint32 = 1024
// This needs to be based on empirical data.
// With high segment count with snapshots,
// doubtful on the effectiveness of this approach.
var epochDistance = uint64(100)
var epochDistance = uint64(5)
type notificationChan chan struct{}
@ -63,11 +63,12 @@ OUTER:
persistWatchers = append(persistWatchers, ew)
default:
}
if ew != nil {
if ew != nil && ew.epoch > lastMergedEpoch {
lastMergedEpoch = ew.epoch
persistWatchers = s.pausePersisterForMergerCatchUp(lastPersistedEpoch,
&lastMergedEpoch, persistWatchers)
}
persistWatchers = s.pausePersisterForMergerCatchUp(lastPersistedEpoch,
&lastMergedEpoch, persistWatchers)
var ourSnapshot *IndexSnapshot
var ourPersisted []chan error
@ -171,8 +172,7 @@ OUTER:
persistWatchers = notifyMergeWatchers(lastPersistedEpoch, persistWatchers)
// check for slow merger and pause persister until merger catch up
if lastPersistedEpoch > *lastMergedEpoch &&
lastPersistedEpoch-*lastMergedEpoch > epochDistance {
if lastPersistedEpoch > *lastMergedEpoch+epochDistance {
select {
case <-s.closeCh:
@ -180,7 +180,6 @@ OUTER:
case ew := <-s.persisterNotifier:
persistWatchers = append(persistWatchers, ew)
*lastMergedEpoch = ew.epoch
log.Printf("persister waiting as lastPersistedEpoch->%d merger epoch->%d", lastPersistedEpoch, *lastMergedEpoch)
continue OUTER
}
} else {