0
0
Fork 0

Merge pull request #699 from abhinavdangeti/scorch1

Wait for rollback'ed snapshot to persist
This commit is contained in:
Marty Schoch 2017-12-28 06:51:26 -08:00 committed by GitHub
commit ee9cc24a6f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 35 additions and 16 deletions

View File

@ -38,8 +38,9 @@ type epochWatcher struct {
}
type snapshotReversion struct {
snapshot *IndexSnapshot
applied chan error
snapshot *IndexSnapshot
applied chan error
persisted chan error
}
func (s *Scorch) mainLoop() {
@ -285,6 +286,10 @@ func (s *Scorch) revertToSnapshot(revertTo *snapshotReversion) error {
segmentSnapshot.segment.AddRef()
}
if revertTo.persisted != nil {
s.rootPersisted = append(s.rootPersisted, revertTo.persisted)
}
// swap in new snapshot
rootPrev := s.root
s.root = newSnapshot

View File

@ -95,8 +95,21 @@ func (s *Scorch) SnapshotRevert(revertTo *IndexSnapshot) error {
applied: make(chan error),
}
if !s.unsafeBatch {
revert.persisted = make(chan error)
}
s.revertToSnapshots <- revert
// block until this IndexSnapshot is applied
return <-revert.applied
err := <-revert.applied
if err != nil {
return err
}
if revert.persisted != nil {
err = <-revert.persisted
}
return err
}

View File

@ -92,22 +92,23 @@ func TestIndexRollback(t *testing.T) {
t.Error(err)
}
err = sh.SnapshotRevert(prev)
if err != nil {
t.Error(err)
}
if prev != nil {
err = sh.SnapshotRevert(prev)
if err != nil {
t.Error(err)
}
newRoot, err := sh.PreviousPersistedSnapshot(nil)
if err != nil {
t.Error(err)
}
if newRoot == nil {
t.Errorf("Failed to retrieve latest persisted snapshot")
}
newRoot := sh.root
if newRoot != nil && prev != nil {
if newRoot.epoch <= prev.epoch {
t.Errorf("Unexpected epoch, %v <= %v", newRoot.epoch, prev.epoch)
}
} else {
if prev == nil {
t.Errorf("The last persisted snapshot before the revert was nil!")
}
if newRoot == nil {
t.Errorf("The new root has been set to nil?")
}
}
}