diff --git a/index/scorch/persister.go b/index/scorch/persister.go index 61a266ad..b19c1205 100644 --- a/index/scorch/persister.go +++ b/index/scorch/persister.go @@ -34,36 +34,40 @@ import ( var DefaultChunkFactor uint32 = 1024 +// Arbitrary number, need to make it configurable. +// Lower values like 10/making persister really slow +// doesn't work well as it is creating more files to +// persist for in next persist iteration and spikes the # FDs. +// Ideal value should let persister also proceed at +// an optimum pace so that the merger can skip +// many intermediate snapshots. +// This needs to be based on empirical data. +// With high segment count with snapshots, +// doubtful on the effectiveness of this approach. +var epochDistance = uint64(100) + type notificationChan chan struct{} func (s *Scorch) persisterLoop() { defer s.asyncTasks.Done() var persistWatchers []*epochWatcher - var lastPersistedEpoch uint64 - - notifyWatchers := func() { - var watchersNext []*epochWatcher - for _, w := range persistWatchers { - if w.epoch < lastPersistedEpoch { - close(w.notifyCh) - } else { - watchersNext = append(watchersNext, w) - } - } - persistWatchers = watchersNext - } - + var lastPersistedEpoch, lastMergedEpoch uint64 + var ew *epochWatcher OUTER: for { select { case <-s.closeCh: break OUTER - case ew := <-s.persisterNotifier: + case ew = <-s.persisterNotifier: persistWatchers = append(persistWatchers, ew) - notifyWatchers() default: } + if ew != nil { + lastMergedEpoch = ew.epoch + persistWatchers = s.pausePersisterForMergerCatchUp(lastPersistedEpoch, + &lastMergedEpoch, persistWatchers) + } var ourSnapshot *IndexSnapshot var ourPersisted []chan error @@ -136,16 +140,58 @@ OUTER: case <-w.notifyCh: // woken up, next loop should pick up work continue OUTER - case ew := <-s.persisterNotifier: + case ew = <-s.persisterNotifier: // if the watchers are already caught up then let them wait, // else let them continue to do the catch up persistWatchers = append(persistWatchers, ew) - notifyWatchers() } } } -func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot) error { +func notifyMergeWatchers(lastPersistedEpoch uint64, + persistWatchers []*epochWatcher) []*epochWatcher { + var watchersNext []*epochWatcher + for _, w := range persistWatchers { + if w.epoch < lastPersistedEpoch { + close(w.notifyCh) + } else { + watchersNext = append(watchersNext, w) + } + } + return watchersNext +} + +func (s *Scorch) pausePersisterForMergerCatchUp(lastPersistedEpoch uint64, lastMergedEpoch *uint64, + persistWatchers []*epochWatcher) []*epochWatcher { + +OUTER: + for { + + // first, let the watchers proceed if they lag behind + persistWatchers = notifyMergeWatchers(lastPersistedEpoch, persistWatchers) + + // check for slow merger and pause persister until merger catch up + if lastPersistedEpoch > *lastMergedEpoch && + lastPersistedEpoch-*lastMergedEpoch > epochDistance { + + select { + case <-s.closeCh: + break OUTER + case ew := <-s.persisterNotifier: + persistWatchers = append(persistWatchers, ew) + *lastMergedEpoch = ew.epoch + log.Printf("persister waiting as lastPersistedEpoch->%d merger epoch->%d", lastPersistedEpoch, *lastMergedEpoch) + continue OUTER + } + } else { + break OUTER + } + } + + return persistWatchers +} + +func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot) (err error) { // start a write transaction tx, err := s.rootBolt.Begin(true) if err != nil {