Updated Rollback APIs
New APIs: + RollbackPoints() - Retrieves the available list of rollback points: epoch+meta. - The application will need to check with the meta to decide on the rollback point. + Rollback() - API requires a rollback point identified by the first API. - Atomically & Durably rolls back the index to specified point, provided the specified rollback point is still available. + Unit test: TestIndexRollback - Writes a batch. - Sets the rollback point. - Writes second batch. - Rollback to previously decided point. - Ensure that data is as is before the second batch.
This commit is contained in:
parent
6ad679bbb5
commit
111f0d0721
|
@ -289,7 +289,11 @@ func (s *Scorch) revertToSnapshot(revertTo *snapshotReversion) error {
|
|||
deleted: segmentSnapshot.deleted,
|
||||
cachedDocs: segmentSnapshot.cachedDocs,
|
||||
}
|
||||
segmentSnapshot.segment.AddRef()
|
||||
newSnapshot.segment[i].segment.AddRef()
|
||||
|
||||
// remove segment from ineligibleForRemoval map
|
||||
filename := zapFileName(segmentSnapshot.id)
|
||||
delete(s.ineligibleForRemoval, filename)
|
||||
}
|
||||
|
||||
if revertTo.persisted != nil {
|
||||
|
|
|
@ -15,98 +15,164 @@
|
|||
package scorch
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"log"
|
||||
|
||||
"github.com/blevesearch/bleve/index/scorch/segment"
|
||||
"github.com/boltdb/bolt"
|
||||
)
|
||||
|
||||
// PreviousPersistedSnapshot returns the next older, previous
|
||||
// IndexSnapshot based on the provided IndexSnapshot. If the provided
|
||||
// argument is nil, the most recently persisted IndexSnapshot is returned.
|
||||
// This API allows the application to walk backwards into the history
|
||||
// of a store to previous points in time. A nil return value indicates
|
||||
// that no previous snapshots are available.
|
||||
func (s *Scorch) PreviousPersistedSnapshot(is *IndexSnapshot) (*IndexSnapshot, error) {
|
||||
type RollbackPoint struct {
|
||||
epoch uint64
|
||||
meta map[string][]byte
|
||||
}
|
||||
|
||||
func (r *RollbackPoint) GetInternal(key []byte) []byte {
|
||||
return r.meta[string(key)]
|
||||
}
|
||||
|
||||
// RollbackPoints returns an array of rollback points available
|
||||
// for the application to make a decision on where to rollback
|
||||
// to. A nil return value indicates that there are no available
|
||||
// rollback points.
|
||||
func (s *Scorch) RollbackPoints() ([]*RollbackPoint, error) {
|
||||
if s.rootBolt == nil {
|
||||
return nil, nil
|
||||
return nil, fmt.Errorf("RollbackPoints: root is nil")
|
||||
}
|
||||
|
||||
// start a read-only transaction
|
||||
// start a read-only bolt transaction
|
||||
tx, err := s.rootBolt.Begin(false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, fmt.Errorf("RollbackPoints: failed to start" +
|
||||
" read-only transaction")
|
||||
}
|
||||
|
||||
// Read-only bolt transactions to be rolled back.
|
||||
// read-only bolt transactions to be rolled back
|
||||
defer func() {
|
||||
_ = tx.Rollback()
|
||||
}()
|
||||
|
||||
snapshots := tx.Bucket(boltSnapshotsBucket)
|
||||
if snapshots == nil {
|
||||
return nil, nil
|
||||
return nil, fmt.Errorf("RollbackPoints: no snapshots available")
|
||||
}
|
||||
|
||||
pos := []byte(nil)
|
||||
rollbackPoints := []*RollbackPoint{}
|
||||
|
||||
if is != nil {
|
||||
pos = segment.EncodeUvarintAscending(nil, is.epoch)
|
||||
}
|
||||
|
||||
c := snapshots.Cursor()
|
||||
for k, _ := c.Last(); k != nil; k, _ = c.Prev() {
|
||||
if pos == nil || bytes.Compare(k, pos) < 0 {
|
||||
_, snapshotEpoch, err := segment.DecodeUvarintAscending(k)
|
||||
if err != nil {
|
||||
log.Printf("PreviousPersistedSnapshot:"+
|
||||
" unable to parse segment epoch %x, continuing", k)
|
||||
continue
|
||||
}
|
||||
|
||||
snapshot := snapshots.Bucket(k)
|
||||
if snapshot == nil {
|
||||
log.Printf("PreviousPersistedSnapshot:"+
|
||||
" snapshot key, but bucket missing %x, continuing", k)
|
||||
continue
|
||||
}
|
||||
|
||||
indexSnapshot, err := s.loadSnapshot(snapshot)
|
||||
if err != nil {
|
||||
log.Printf("PreviousPersistedSnapshot:"+
|
||||
" unable to load snapshot, %v, continuing", err)
|
||||
continue
|
||||
}
|
||||
|
||||
indexSnapshot.epoch = snapshotEpoch
|
||||
return indexSnapshot, nil
|
||||
c1 := snapshots.Cursor()
|
||||
for k, _ := c1.Last(); k != nil; k, _ = c1.Prev() {
|
||||
_, snapshotEpoch, err := segment.DecodeUvarintAscending(k)
|
||||
if err != nil {
|
||||
log.Printf("RollbackPoints:"+
|
||||
" unable to parse segment epoch %x, continuing", k)
|
||||
continue
|
||||
}
|
||||
|
||||
snapshot := snapshots.Bucket(k)
|
||||
if snapshot == nil {
|
||||
log.Printf("RollbackPoints:"+
|
||||
" snapshot key, but bucket missing %x, continuing", k)
|
||||
continue
|
||||
}
|
||||
|
||||
meta := map[string][]byte{}
|
||||
c2 := snapshot.Cursor()
|
||||
for j, _ := c2.First(); j != nil; j, _ = c2.Next() {
|
||||
if j[0] == boltInternalKey[0] {
|
||||
internalBucket := snapshot.Bucket(j)
|
||||
err = internalBucket.ForEach(func(key []byte, val []byte) error {
|
||||
copiedVal := append([]byte(nil), val...)
|
||||
meta[string(key)] = copiedVal
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
log.Printf("RollbackPoints:"+
|
||||
" failed in fetching internal data: %v", err)
|
||||
continue
|
||||
}
|
||||
|
||||
rollbackPoints = append(rollbackPoints, &RollbackPoint{
|
||||
epoch: snapshotEpoch,
|
||||
meta: meta,
|
||||
})
|
||||
}
|
||||
|
||||
return nil, nil
|
||||
return rollbackPoints, nil
|
||||
}
|
||||
|
||||
// SnapshotRevert atomically brings the store back to the point in time
|
||||
// as represented by the revertTo IndexSnapshot. SnapshotRevert() should
|
||||
// only be passed an IndexSnapshot that came from the same store.
|
||||
func (s *Scorch) SnapshotRevert(revertTo *IndexSnapshot) error {
|
||||
revert := &snapshotReversion{
|
||||
snapshot: revertTo,
|
||||
applied: make(chan error),
|
||||
// Rollback atomically and durably (if unsafeBatch is unset) brings
|
||||
// the store back to the point in time as represented by the
|
||||
// RollbackPoint. Rollback() should only be passed a RollbackPoint
|
||||
// that came from the same store using the RollbackPoints() API.
|
||||
func (s *Scorch) Rollback(to *RollbackPoint) error {
|
||||
if to == nil {
|
||||
return fmt.Errorf("Rollback: RollbackPoint is nil")
|
||||
}
|
||||
|
||||
if !s.unsafeBatch {
|
||||
revert.persisted = make(chan error)
|
||||
if s.rootBolt == nil {
|
||||
return fmt.Errorf("Rollback: root is nil")
|
||||
}
|
||||
|
||||
s.revertToSnapshots <- revert
|
||||
revert := &snapshotReversion{}
|
||||
|
||||
s.rootLock.Lock()
|
||||
|
||||
err := s.rootBolt.View(func(tx *bolt.Tx) error {
|
||||
snapshots := tx.Bucket(boltSnapshotsBucket)
|
||||
if snapshots == nil {
|
||||
return fmt.Errorf("Rollback: no snapshots available")
|
||||
}
|
||||
|
||||
pos := segment.EncodeUvarintAscending(nil, to.epoch)
|
||||
|
||||
snapshot := snapshots.Bucket(pos)
|
||||
if snapshot == nil {
|
||||
return fmt.Errorf("Rollback: snapshot not found")
|
||||
}
|
||||
|
||||
indexSnapshot, err := s.loadSnapshot(snapshot)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Rollback: unable to load snapshot: %v", err)
|
||||
}
|
||||
|
||||
// add segments referenced by loaded index snapshot to the
|
||||
// ineligibleForRemoval map
|
||||
for _, segSnap := range indexSnapshot.segment {
|
||||
filename := zapFileName(segSnap.id)
|
||||
s.ineligibleForRemoval[filename] = true
|
||||
}
|
||||
|
||||
revert.snapshot = indexSnapshot
|
||||
revert.applied = make(chan error)
|
||||
|
||||
if !s.unsafeBatch {
|
||||
revert.persisted = make(chan error)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
s.rootLock.Unlock()
|
||||
|
||||
// block until this IndexSnapshot is applied
|
||||
err := <-revert.applied
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// introduce the reversion
|
||||
s.revertToSnapshots <- revert
|
||||
|
||||
// block until this snapshot is applied
|
||||
err = <-revert.applied
|
||||
if err != nil {
|
||||
return fmt.Errorf("Rollback: failed with err: %v", err)
|
||||
}
|
||||
|
||||
if revert.persisted != nil {
|
||||
err = <-revert.persisted
|
||||
}
|
||||
|
|
|
@ -45,70 +45,124 @@ func TestIndexRollback(t *testing.T) {
|
|||
}
|
||||
}()
|
||||
|
||||
// create 2 docs
|
||||
// create a batch, insert 2 new documents
|
||||
batch := index.NewBatch()
|
||||
doc := document.NewDocument("1")
|
||||
doc.AddField(document.NewTextField("name", []uint64{}, []byte("test1")))
|
||||
err = idx.Update(doc)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
batch.Update(doc)
|
||||
doc = document.NewDocument("2")
|
||||
doc.AddField(document.NewTextField("name", []uint64{}, []byte("test2")))
|
||||
err = idx.Update(doc)
|
||||
batch.Update(doc)
|
||||
|
||||
err = idx.Batch(batch)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// create a batch, insert new doc, update existing doc, delete existing doc
|
||||
batch := index.NewBatch()
|
||||
sh, ok := idx.(*Scorch)
|
||||
if !ok {
|
||||
t.Fatalf("Not a scorch index?")
|
||||
}
|
||||
|
||||
// fetch rollback points available as of here
|
||||
rollbackPoints, err := sh.RollbackPoints()
|
||||
if err != nil || len(rollbackPoints) == 0 {
|
||||
t.Fatal(err, len(rollbackPoints))
|
||||
}
|
||||
|
||||
// set this as a rollback point for the future
|
||||
rollbackPoint := rollbackPoints[0]
|
||||
|
||||
// create another batch, insert 2 new documents, and delete an existing one
|
||||
batch = index.NewBatch()
|
||||
doc = document.NewDocument("3")
|
||||
doc.AddField(document.NewTextField("name", []uint64{}, []byte("test3")))
|
||||
batch.Update(doc)
|
||||
doc = document.NewDocument("2")
|
||||
doc.AddField(document.NewTextField("name", []uint64{}, []byte("test2updated")))
|
||||
doc = document.NewDocument("4")
|
||||
doc.AddField(document.NewTextField("name", []uint64{}, []byte("test4")))
|
||||
batch.Update(doc)
|
||||
batch.Delete("1")
|
||||
|
||||
err = idx.Batch(batch)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
sh, ok := idx.(*Scorch)
|
||||
if !ok {
|
||||
t.Errorf("Not a scorch index?")
|
||||
}
|
||||
|
||||
// Get Last persisted snapshot
|
||||
ss, err := sh.PreviousPersistedSnapshot(nil)
|
||||
reader, err := idx.Reader()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Retrieve the snapshot earlier
|
||||
prev, err := sh.PreviousPersistedSnapshot(ss)
|
||||
docCount, err := reader.DocCount()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if prev != nil {
|
||||
err = sh.SnapshotRevert(prev)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
// expect docs 2, 3, 4
|
||||
if docCount != 3 {
|
||||
t.Fatalf("unexpected doc count: %v", docCount)
|
||||
}
|
||||
ret, err := reader.Document("1")
|
||||
if err != nil || ret != nil {
|
||||
t.Fatal(ret, err)
|
||||
}
|
||||
ret, err = reader.Document("2")
|
||||
if err != nil || ret == nil {
|
||||
t.Fatal(ret, err)
|
||||
}
|
||||
ret, err = reader.Document("3")
|
||||
if err != nil || ret == nil {
|
||||
t.Fatal(ret, err)
|
||||
}
|
||||
ret, err = reader.Document("4")
|
||||
if err != nil || ret == nil {
|
||||
t.Fatal(ret, err)
|
||||
}
|
||||
|
||||
newRoot, err := sh.PreviousPersistedSnapshot(nil)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
err = reader.Close()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if newRoot == nil {
|
||||
t.Errorf("Failed to retrieve latest persisted snapshot")
|
||||
}
|
||||
// rollback to the selected rollback point
|
||||
err = sh.Rollback(rollbackPoint)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if newRoot.epoch <= prev.epoch {
|
||||
t.Errorf("Unexpected epoch, %v <= %v", newRoot.epoch, prev.epoch)
|
||||
}
|
||||
reader, err = idx.Reader()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
docCount, err = reader.DocCount()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// expect only docs 1, 2
|
||||
if docCount != 2 {
|
||||
t.Fatalf("unexpected doc count: %v", docCount)
|
||||
}
|
||||
ret, err = reader.Document("1")
|
||||
if err != nil || ret == nil {
|
||||
t.Fatal(ret, err)
|
||||
}
|
||||
ret, err = reader.Document("2")
|
||||
if err != nil || ret == nil {
|
||||
t.Fatal(ret, err)
|
||||
}
|
||||
ret, err = reader.Document("3")
|
||||
if err != nil || ret != nil {
|
||||
t.Fatal(ret, err)
|
||||
}
|
||||
ret, err = reader.Document("4")
|
||||
if err != nil || ret != nil {
|
||||
t.Fatal(ret, err)
|
||||
}
|
||||
|
||||
err = reader.Close()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue