scorch: fix persister losing notifications on no-data batches
With the previous commit, there could be a scenario where batches containing only internal updates are introduced by the app in rapid succession, but only the persisted notifications on the very last IndexSnapshot would be fired; the persisted notifications on the in-between batches might be missed. The solution is to track the persisted notification channels at the higher Scorch struct level, instead of tracking them at the IndexSnapshot and SegmentSnapshot levels. Also, the persister's double-check looping was simplified, which avoids a race where the introducer might incorrectly fail to notify the persister.
This commit is contained in:
parent
ecbb3d2df4
commit
34f5e2175f
|
@ -32,16 +32,21 @@ type segmentIntroduction struct {
|
|||
persisted chan error
|
||||
}
|
||||
|
||||
type epochWatcher struct {
|
||||
epoch uint64
|
||||
notifyCh notificationChan
|
||||
}
|
||||
|
||||
func (s *Scorch) mainLoop() {
|
||||
var notify notificationChan
|
||||
var epochWatchers []*epochWatcher
|
||||
OUTER:
|
||||
for {
|
||||
select {
|
||||
case <-s.closeCh:
|
||||
break OUTER
|
||||
|
||||
case notify = <-s.introducerNotifier:
|
||||
continue
|
||||
case epochWatcher := <-s.introducerNotifier:
|
||||
epochWatchers = append(epochWatchers, epochWatcher)
|
||||
|
||||
case nextMerge := <-s.merges:
|
||||
s.introduceMerge(nextMerge)
|
||||
|
@ -52,11 +57,22 @@ OUTER:
|
|||
continue OUTER
|
||||
}
|
||||
}
|
||||
// notify persister
|
||||
if notify != nil {
|
||||
close(notify)
|
||||
notify = nil
|
||||
|
||||
var epochCurr uint64
|
||||
s.rootLock.RLock()
|
||||
if s.root != nil {
|
||||
epochCurr = s.root.epoch
|
||||
}
|
||||
s.rootLock.RUnlock()
|
||||
var epochWatchersNext []*epochWatcher
|
||||
for _, w := range epochWatchers {
|
||||
if w.epoch < epochCurr {
|
||||
close(w.notifyCh)
|
||||
} else {
|
||||
epochWatchersNext = append(epochWatchersNext, w)
|
||||
}
|
||||
}
|
||||
epochWatchers = epochWatchersNext
|
||||
}
|
||||
|
||||
s.asyncTasks.Done()
|
||||
|
@ -97,10 +113,10 @@ func (s *Scorch) introduceSegment(next *segmentIntroduction) error {
|
|||
newSnapshot.segment[i] = &SegmentSnapshot{
|
||||
id: s.root.segment[i].id,
|
||||
segment: s.root.segment[i].segment,
|
||||
persisted: s.root.segment[i].persisted,
|
||||
cachedDocs: s.root.segment[i].cachedDocs,
|
||||
}
|
||||
s.root.segment[i].segment.AddRef()
|
||||
|
||||
// apply new obsoletions
|
||||
if s.root.segment[i].deleted == nil {
|
||||
newSnapshot.segment[i].deleted = delta
|
||||
|
@ -120,14 +136,6 @@ func (s *Scorch) introduceSegment(next *segmentIntroduction) error {
|
|||
cachedDocs: &cachedDocs{cache: nil},
|
||||
})
|
||||
newSnapshot.offsets = append(newSnapshot.offsets, running)
|
||||
if next.persisted != nil {
|
||||
newSnapshot.segment[nsegs].persisted =
|
||||
append(newSnapshot.segment[nsegs].persisted, next.persisted)
|
||||
}
|
||||
} else { // new segment might be nil when it's an internal data update only
|
||||
if next.persisted != nil {
|
||||
newSnapshot.persisted = append(newSnapshot.persisted, next.persisted)
|
||||
}
|
||||
}
|
||||
// copy old values
|
||||
for key, oldVal := range s.root.internal {
|
||||
|
@ -141,7 +149,10 @@ func (s *Scorch) introduceSegment(next *segmentIntroduction) error {
|
|||
delete(newSnapshot.internal, key)
|
||||
}
|
||||
}
|
||||
// swap in new segment
|
||||
if next.persisted != nil {
|
||||
s.rootPersisted = append(s.rootPersisted, next.persisted)
|
||||
}
|
||||
// swap in new index snapshot
|
||||
rootPrev := s.root
|
||||
s.root = newSnapshot
|
||||
// release lock
|
||||
|
@ -200,7 +211,6 @@ func (s *Scorch) introduceMerge(nextMerge *segmentMerge) {
|
|||
id: s.root.segment[i].id,
|
||||
segment: s.root.segment[i].segment,
|
||||
deleted: s.root.segment[i].deleted,
|
||||
persisted: s.root.segment[i].persisted,
|
||||
cachedDocs: s.root.segment[i].cachedDocs,
|
||||
})
|
||||
s.root.segment[i].segment.AddRef()
|
||||
|
|
|
@ -35,82 +35,85 @@ import (
|
|||
type notificationChan chan struct{}
|
||||
|
||||
func (s *Scorch) persisterLoop() {
|
||||
var notify notificationChan
|
||||
defer s.asyncTasks.Done()
|
||||
|
||||
var notifyChs []notificationChan
|
||||
var lastPersistedEpoch uint64
|
||||
OUTER:
|
||||
for {
|
||||
select {
|
||||
case <-s.closeCh:
|
||||
break OUTER
|
||||
case notify = <-s.persisterNotifier:
|
||||
|
||||
case notifyCh := <-s.persisterNotifier:
|
||||
notifyChs = append(notifyChs, notifyCh)
|
||||
default:
|
||||
// check to see if there is a new snapshot to persist
|
||||
s.rootLock.RLock()
|
||||
ourSnapshot := s.root
|
||||
ourSnapshot.AddRef()
|
||||
s.rootLock.RUnlock()
|
||||
}
|
||||
|
||||
if ourSnapshot.epoch != lastPersistedEpoch {
|
||||
// lets get started
|
||||
err := s.persistSnapshot(ourSnapshot)
|
||||
if err != nil {
|
||||
log.Printf("got err persisting snapshot: %v", err)
|
||||
_ = ourSnapshot.DecRef()
|
||||
continue OUTER
|
||||
}
|
||||
lastPersistedEpoch = ourSnapshot.epoch
|
||||
if notify != nil {
|
||||
close(notify)
|
||||
notify = nil
|
||||
}
|
||||
}
|
||||
_ = ourSnapshot.DecRef()
|
||||
var ourSnapshot *IndexSnapshot
|
||||
var ourPersisted []chan error
|
||||
|
||||
// tell the introducer we're waiting for changes
|
||||
// first make a notification chan
|
||||
notifyUs := make(notificationChan)
|
||||
|
||||
// give it to the introducer
|
||||
select {
|
||||
case <-s.closeCh:
|
||||
break OUTER
|
||||
case s.introducerNotifier <- notifyUs:
|
||||
}
|
||||
|
||||
// check again
|
||||
s.rootLock.RLock()
|
||||
// check to see if there is a new snapshot to persist
|
||||
s.rootLock.Lock()
|
||||
if s.root != nil && s.root.epoch > lastPersistedEpoch {
|
||||
ourSnapshot = s.root
|
||||
ourSnapshot.AddRef()
|
||||
s.rootLock.RUnlock()
|
||||
ourPersisted = s.rootPersisted
|
||||
s.rootPersisted = nil
|
||||
}
|
||||
s.rootLock.Unlock()
|
||||
|
||||
if ourSnapshot.epoch != lastPersistedEpoch {
|
||||
// lets get started
|
||||
err := s.persistSnapshot(ourSnapshot)
|
||||
if ourSnapshot != nil {
|
||||
err := s.persistSnapshot(ourSnapshot)
|
||||
for _, ch := range ourPersisted {
|
||||
if err != nil {
|
||||
log.Printf("got err persisting snapshot: %v", err)
|
||||
_ = ourSnapshot.DecRef()
|
||||
continue OUTER
|
||||
}
|
||||
lastPersistedEpoch = ourSnapshot.epoch
|
||||
if notify != nil {
|
||||
close(notify)
|
||||
notify = nil
|
||||
ch <- err
|
||||
}
|
||||
close(ch)
|
||||
}
|
||||
if err != nil {
|
||||
log.Printf("got err persisting snapshot: %v", err)
|
||||
_ = ourSnapshot.DecRef()
|
||||
continue OUTER
|
||||
}
|
||||
lastPersistedEpoch = ourSnapshot.epoch
|
||||
for _, notifyCh := range notifyChs {
|
||||
close(notifyCh)
|
||||
}
|
||||
notifyChs = nil
|
||||
_ = ourSnapshot.DecRef()
|
||||
|
||||
// now wait for it (but also detect close)
|
||||
select {
|
||||
case <-s.closeCh:
|
||||
break OUTER
|
||||
case <-notifyUs:
|
||||
// woken up, next loop should pick up work
|
||||
changed := false
|
||||
s.rootLock.RLock()
|
||||
if s.root != nil && s.root.epoch != lastPersistedEpoch {
|
||||
changed = true
|
||||
}
|
||||
s.rootLock.RUnlock()
|
||||
if changed {
|
||||
continue OUTER
|
||||
}
|
||||
}
|
||||
s.removeOldData()
|
||||
|
||||
// tell the introducer we're waiting for changes
|
||||
w := &epochWatcher{
|
||||
epoch: lastPersistedEpoch,
|
||||
notifyCh: make(notificationChan, 1),
|
||||
}
|
||||
|
||||
select {
|
||||
case <-s.closeCh:
|
||||
break OUTER
|
||||
case s.introducerNotifier <- w:
|
||||
}
|
||||
|
||||
s.removeOldData() // might as well cleanup while waiting
|
||||
|
||||
select {
|
||||
case <-s.closeCh:
|
||||
break OUTER
|
||||
case <-w.notifyCh:
|
||||
// woken up, next loop should pick up work
|
||||
}
|
||||
}
|
||||
s.asyncTasks.Done()
|
||||
}
|
||||
|
||||
func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot) error {
|
||||
|
@ -221,12 +224,6 @@ func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot) error {
|
|||
}
|
||||
}
|
||||
|
||||
// get write lock and update the current snapshot with disk-based versions
|
||||
snapshot.m.Lock()
|
||||
notifications := snapshot.persisted
|
||||
snapshot.persisted = nil
|
||||
snapshot.m.Unlock()
|
||||
|
||||
s.rootLock.Lock()
|
||||
newIndexSnapshot := &IndexSnapshot{
|
||||
parent: s,
|
||||
|
@ -240,16 +237,12 @@ func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot) error {
|
|||
// see if this segment has been replaced
|
||||
if replacement, ok := newSegments[segmentSnapshot.id]; ok {
|
||||
newSegmentSnapshot := &SegmentSnapshot{
|
||||
id: segmentSnapshot.id,
|
||||
segment: replacement,
|
||||
deleted: segmentSnapshot.deleted,
|
||||
id: segmentSnapshot.id,
|
||||
cachedDocs: segmentSnapshot.cachedDocs,
|
||||
}
|
||||
newIndexSnapshot.segment[i] = newSegmentSnapshot
|
||||
// add the old segment snapshots notifications to the list
|
||||
for _, notification := range segmentSnapshot.persisted {
|
||||
notifications = append(notifications, notification)
|
||||
}
|
||||
} else {
|
||||
newIndexSnapshot.segment[i] = s.root.segment[i]
|
||||
newIndexSnapshot.segment[i].segment.AddRef()
|
||||
|
@ -270,12 +263,6 @@ func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot) error {
|
|||
_ = rootPrev.DecRef()
|
||||
}
|
||||
|
||||
// now that we've given up the lock, notify everyone that we've safely
|
||||
// persisted their data
|
||||
for _, notification := range notifications {
|
||||
close(notification)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -49,13 +49,14 @@ type Scorch struct {
|
|||
|
||||
unsafeBatch bool
|
||||
|
||||
rootLock sync.RWMutex
|
||||
root *IndexSnapshot // holds 1 ref-count on the root
|
||||
rootLock sync.RWMutex
|
||||
root *IndexSnapshot // holds 1 ref-count on the root
|
||||
rootPersisted []chan error // closed when root is persisted
|
||||
|
||||
closeCh chan struct{}
|
||||
introductions chan *segmentIntroduction
|
||||
merges chan *segmentMerge
|
||||
introducerNotifier chan notificationChan
|
||||
introducerNotifier chan *epochWatcher
|
||||
persisterNotifier chan notificationChan
|
||||
rootBolt *bolt.DB
|
||||
asyncTasks sync.WaitGroup
|
||||
|
@ -127,7 +128,7 @@ func (s *Scorch) Open() error {
|
|||
|
||||
s.introductions = make(chan *segmentIntroduction)
|
||||
s.merges = make(chan *segmentMerge)
|
||||
s.introducerNotifier = make(chan notificationChan)
|
||||
s.introducerNotifier = make(chan *epochWatcher, 1)
|
||||
s.persisterNotifier = make(chan notificationChan)
|
||||
|
||||
if !s.readOnly && s.path != "" {
|
||||
|
@ -251,7 +252,7 @@ func (s *Scorch) prepareSegment(newSegment segment.Segment, ids []string,
|
|||
}
|
||||
|
||||
if !s.unsafeBatch {
|
||||
introduction.persisted = make(chan error)
|
||||
introduction.persisted = make(chan error, 1)
|
||||
}
|
||||
|
||||
// get read lock, to optimistically prepare obsoleted info
|
||||
|
|
|
@ -48,8 +48,6 @@ type IndexSnapshot struct {
|
|||
|
||||
m sync.Mutex // Protects the fields that follow.
|
||||
refs int64
|
||||
|
||||
persisted []chan error
|
||||
}
|
||||
|
||||
func (i *IndexSnapshot) AddRef() {
|
||||
|
|
|
@ -52,8 +52,6 @@ type SegmentSnapshot struct {
|
|||
segment segment.Segment
|
||||
deleted *roaring.Bitmap
|
||||
|
||||
persisted []chan error // closed when the segment is persisted
|
||||
|
||||
cachedDocs *cachedDocs
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue