fix for merger persister handshake stalemate
The slow merger, lagging behind the fast persister, was getting stuck in a persister-notify send loop while the persister waited for new introductions from the introducer, totally blocking the merger. This fix, along with the flipping of the deleted-files eligibility, brings the file count to around 6 to 11 files per shard for both the travel and beer samples.
This commit is contained in:
parent
ff210fbc6d
commit
feecce1eb2
|
@ -58,44 +58,24 @@ OUTER:
|
||||||
_ = ourSnapshot.DecRef()
|
_ = ourSnapshot.DecRef()
|
||||||
|
|
||||||
// tell the persister we're waiting for changes
|
// tell the persister we're waiting for changes
|
||||||
// first make a notification chan
|
// first make a epochWatcher chan
|
||||||
notifyUs := make(notificationChan)
|
ew := &epochWatcher{
|
||||||
|
epoch: lastEpochMergePlanned,
|
||||||
|
notifyCh: make(notificationChan, 1),
|
||||||
|
}
|
||||||
|
|
||||||
// give it to the persister
|
// give it to the persister
|
||||||
select {
|
select {
|
||||||
case <-s.closeCh:
|
case <-s.closeCh:
|
||||||
break OUTER
|
break OUTER
|
||||||
case s.persisterNotifier <- notifyUs:
|
case s.persisterNotifier <- ew:
|
||||||
}
|
}
|
||||||
|
|
||||||
// check again
|
// now wait for persister (but also detect close)
|
||||||
s.rootLock.RLock()
|
|
||||||
ourSnapshot = s.root
|
|
||||||
ourSnapshot.AddRef()
|
|
||||||
s.rootLock.RUnlock()
|
|
||||||
|
|
||||||
if ourSnapshot.epoch != lastEpochMergePlanned {
|
|
||||||
startTime := time.Now()
|
|
||||||
|
|
||||||
// lets get started
|
|
||||||
err := s.planMergeAtSnapshot(ourSnapshot)
|
|
||||||
if err != nil {
|
|
||||||
s.fireAsyncError(fmt.Errorf("merging err: %v", err))
|
|
||||||
_ = ourSnapshot.DecRef()
|
|
||||||
continue OUTER
|
|
||||||
}
|
|
||||||
lastEpochMergePlanned = ourSnapshot.epoch
|
|
||||||
|
|
||||||
s.fireEvent(EventKindMergerProgress, time.Since(startTime))
|
|
||||||
}
|
|
||||||
_ = ourSnapshot.DecRef()
|
|
||||||
|
|
||||||
// now wait for it (but also detect close)
|
|
||||||
select {
|
select {
|
||||||
case <-s.closeCh:
|
case <-s.closeCh:
|
||||||
break OUTER
|
break OUTER
|
||||||
case <-notifyUs:
|
case <-ew.notifyCh:
|
||||||
// woken up, next loop should pick up work
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -39,15 +39,29 @@ type notificationChan chan struct{}
|
||||||
func (s *Scorch) persisterLoop() {
|
func (s *Scorch) persisterLoop() {
|
||||||
defer s.asyncTasks.Done()
|
defer s.asyncTasks.Done()
|
||||||
|
|
||||||
var notifyChs []notificationChan
|
var persistWatchers []*epochWatcher
|
||||||
var lastPersistedEpoch uint64
|
var lastPersistedEpoch uint64
|
||||||
|
|
||||||
|
notifyWatchers := func() {
|
||||||
|
var watchersNext []*epochWatcher
|
||||||
|
for _, w := range persistWatchers {
|
||||||
|
if w.epoch < lastPersistedEpoch {
|
||||||
|
close(w.notifyCh)
|
||||||
|
} else {
|
||||||
|
watchersNext = append(watchersNext, w)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
persistWatchers = watchersNext
|
||||||
|
}
|
||||||
|
|
||||||
OUTER:
|
OUTER:
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-s.closeCh:
|
case <-s.closeCh:
|
||||||
break OUTER
|
break OUTER
|
||||||
case notifyCh := <-s.persisterNotifier:
|
case ew := <-s.persisterNotifier:
|
||||||
notifyChs = append(notifyChs, notifyCh)
|
persistWatchers = append(persistWatchers, ew)
|
||||||
|
notifyWatchers()
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -81,10 +95,11 @@ OUTER:
|
||||||
}
|
}
|
||||||
|
|
||||||
lastPersistedEpoch = ourSnapshot.epoch
|
lastPersistedEpoch = ourSnapshot.epoch
|
||||||
for _, notifyCh := range notifyChs {
|
for _, ew := range persistWatchers {
|
||||||
close(notifyCh)
|
close(ew.notifyCh)
|
||||||
}
|
}
|
||||||
notifyChs = nil
|
|
||||||
|
persistWatchers = nil
|
||||||
_ = ourSnapshot.DecRef()
|
_ = ourSnapshot.DecRef()
|
||||||
|
|
||||||
changed := false
|
changed := false
|
||||||
|
@ -120,6 +135,12 @@ OUTER:
|
||||||
break OUTER
|
break OUTER
|
||||||
case <-w.notifyCh:
|
case <-w.notifyCh:
|
||||||
// woken up, next loop should pick up work
|
// woken up, next loop should pick up work
|
||||||
|
continue OUTER
|
||||||
|
case ew := <-s.persisterNotifier:
|
||||||
|
// if the watchers are already caught up then let them wait,
|
||||||
|
// else let them continue to do the catch up
|
||||||
|
persistWatchers = append(persistWatchers, ew)
|
||||||
|
notifyWatchers()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -61,7 +61,7 @@ type Scorch struct {
|
||||||
merges chan *segmentMerge
|
merges chan *segmentMerge
|
||||||
introducerNotifier chan *epochWatcher
|
introducerNotifier chan *epochWatcher
|
||||||
revertToSnapshots chan *snapshotReversion
|
revertToSnapshots chan *snapshotReversion
|
||||||
persisterNotifier chan notificationChan
|
persisterNotifier chan *epochWatcher
|
||||||
rootBolt *bolt.DB
|
rootBolt *bolt.DB
|
||||||
asyncTasks sync.WaitGroup
|
asyncTasks sync.WaitGroup
|
||||||
|
|
||||||
|
@ -156,7 +156,7 @@ func (s *Scorch) Open() error {
|
||||||
s.merges = make(chan *segmentMerge)
|
s.merges = make(chan *segmentMerge)
|
||||||
s.introducerNotifier = make(chan *epochWatcher, 1)
|
s.introducerNotifier = make(chan *epochWatcher, 1)
|
||||||
s.revertToSnapshots = make(chan *snapshotReversion)
|
s.revertToSnapshots = make(chan *snapshotReversion)
|
||||||
s.persisterNotifier = make(chan notificationChan)
|
s.persisterNotifier = make(chan *epochWatcher, 1)
|
||||||
|
|
||||||
if !s.readOnly && s.path != "" {
|
if !s.readOnly && s.path != "" {
|
||||||
err := s.removeOldZapFiles() // Before persister or merger create any new files.
|
err := s.removeOldZapFiles() // Before persister or merger create any new files.
|
||||||
|
|
Loading…
Reference in New Issue