0
0
Fork 0

Merge pull request #720 from abhinavdangeti/scorch

Updated Rollback APIs
This commit is contained in:
Abhinav Dangeti 2018-01-04 14:51:33 -08:00 committed by GitHub
commit dee1dd9bc8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 221 additions and 97 deletions

View File

@ -289,7 +289,11 @@ func (s *Scorch) revertToSnapshot(revertTo *snapshotReversion) error {
deleted: segmentSnapshot.deleted,
cachedDocs: segmentSnapshot.cachedDocs,
}
segmentSnapshot.segment.AddRef()
newSnapshot.segment[i].segment.AddRef()
// remove segment from ineligibleForRemoval map
filename := zapFileName(segmentSnapshot.id)
delete(s.ineligibleForRemoval, filename)
}
if revertTo.persisted != nil {

View File

@ -15,98 +15,164 @@
package scorch
import (
"bytes"
"fmt"
"log"
"github.com/blevesearch/bleve/index/scorch/segment"
"github.com/boltdb/bolt"
)
// PreviousPersistedSnapshot returns the next older, previous
// IndexSnapshot based on the provided IndexSnapshot. If the provided
// argument is nil, the most recently persisted IndexSnapshot is returned.
// This API allows the application to walk backwards into the history
// of a store to previous points in time. A nil return value indicates
// that no previous snapshots are available.
func (s *Scorch) PreviousPersistedSnapshot(is *IndexSnapshot) (*IndexSnapshot, error) {
type RollbackPoint struct {
epoch uint64
meta map[string][]byte
}
func (r *RollbackPoint) GetInternal(key []byte) []byte {
return r.meta[string(key)]
}
// RollbackPoints returns an array of rollback points available
// for the application to make a decision on where to rollback
// to. A nil return value indicates that there are no available
// rollback points.
func (s *Scorch) RollbackPoints() ([]*RollbackPoint, error) {
if s.rootBolt == nil {
return nil, nil
return nil, fmt.Errorf("RollbackPoints: root is nil")
}
// start a read-only transaction
// start a read-only bolt transaction
tx, err := s.rootBolt.Begin(false)
if err != nil {
return nil, err
return nil, fmt.Errorf("RollbackPoints: failed to start" +
" read-only transaction")
}
// Read-only bolt transactions to be rolled back.
// read-only bolt transactions to be rolled back
defer func() {
_ = tx.Rollback()
}()
snapshots := tx.Bucket(boltSnapshotsBucket)
if snapshots == nil {
return nil, nil
return nil, fmt.Errorf("RollbackPoints: no snapshots available")
}
pos := []byte(nil)
rollbackPoints := []*RollbackPoint{}
if is != nil {
pos = segment.EncodeUvarintAscending(nil, is.epoch)
}
c := snapshots.Cursor()
for k, _ := c.Last(); k != nil; k, _ = c.Prev() {
if pos == nil || bytes.Compare(k, pos) < 0 {
_, snapshotEpoch, err := segment.DecodeUvarintAscending(k)
if err != nil {
log.Printf("PreviousPersistedSnapshot:"+
" unable to parse segment epoch %x, continuing", k)
continue
}
snapshot := snapshots.Bucket(k)
if snapshot == nil {
log.Printf("PreviousPersistedSnapshot:"+
" snapshot key, but bucket missing %x, continuing", k)
continue
}
indexSnapshot, err := s.loadSnapshot(snapshot)
if err != nil {
log.Printf("PreviousPersistedSnapshot:"+
" unable to load snapshot, %v, continuing", err)
continue
}
indexSnapshot.epoch = snapshotEpoch
return indexSnapshot, nil
c1 := snapshots.Cursor()
for k, _ := c1.Last(); k != nil; k, _ = c1.Prev() {
_, snapshotEpoch, err := segment.DecodeUvarintAscending(k)
if err != nil {
log.Printf("RollbackPoints:"+
" unable to parse segment epoch %x, continuing", k)
continue
}
snapshot := snapshots.Bucket(k)
if snapshot == nil {
log.Printf("RollbackPoints:"+
" snapshot key, but bucket missing %x, continuing", k)
continue
}
meta := map[string][]byte{}
c2 := snapshot.Cursor()
for j, _ := c2.First(); j != nil; j, _ = c2.Next() {
if j[0] == boltInternalKey[0] {
internalBucket := snapshot.Bucket(j)
err = internalBucket.ForEach(func(key []byte, val []byte) error {
copiedVal := append([]byte(nil), val...)
meta[string(key)] = copiedVal
return nil
})
if err != nil {
break
}
}
}
if err != nil {
log.Printf("RollbackPoints:"+
" failed in fetching internal data: %v", err)
continue
}
rollbackPoints = append(rollbackPoints, &RollbackPoint{
epoch: snapshotEpoch,
meta: meta,
})
}
return nil, nil
return rollbackPoints, nil
}
// SnapshotRevert atomically brings the store back to the point in time
// as represented by the revertTo IndexSnapshot. SnapshotRevert() should
// only be passed an IndexSnapshot that came from the same store.
func (s *Scorch) SnapshotRevert(revertTo *IndexSnapshot) error {
revert := &snapshotReversion{
snapshot: revertTo,
applied: make(chan error),
// Rollback atomically and durably (if unsafeBatch is unset) brings
// the store back to the point in time as represented by the
// RollbackPoint. Rollback() should only be passed a RollbackPoint
// that came from the same store using the RollbackPoints() API.
func (s *Scorch) Rollback(to *RollbackPoint) error {
if to == nil {
return fmt.Errorf("Rollback: RollbackPoint is nil")
}
if !s.unsafeBatch {
revert.persisted = make(chan error)
if s.rootBolt == nil {
return fmt.Errorf("Rollback: root is nil")
}
s.revertToSnapshots <- revert
revert := &snapshotReversion{}
s.rootLock.Lock()
err := s.rootBolt.View(func(tx *bolt.Tx) error {
snapshots := tx.Bucket(boltSnapshotsBucket)
if snapshots == nil {
return fmt.Errorf("Rollback: no snapshots available")
}
pos := segment.EncodeUvarintAscending(nil, to.epoch)
snapshot := snapshots.Bucket(pos)
if snapshot == nil {
return fmt.Errorf("Rollback: snapshot not found")
}
indexSnapshot, err := s.loadSnapshot(snapshot)
if err != nil {
return fmt.Errorf("Rollback: unable to load snapshot: %v", err)
}
// add segments referenced by loaded index snapshot to the
// ineligibleForRemoval map
for _, segSnap := range indexSnapshot.segment {
filename := zapFileName(segSnap.id)
s.ineligibleForRemoval[filename] = true
}
revert.snapshot = indexSnapshot
revert.applied = make(chan error)
if !s.unsafeBatch {
revert.persisted = make(chan error)
}
return nil
})
s.rootLock.Unlock()
// block until this IndexSnapshot is applied
err := <-revert.applied
if err != nil {
return err
}
// introduce the reversion
s.revertToSnapshots <- revert
// block until this snapshot is applied
err = <-revert.applied
if err != nil {
return fmt.Errorf("Rollback: failed with err: %v", err)
}
if revert.persisted != nil {
err = <-revert.persisted
}

View File

@ -45,70 +45,124 @@ func TestIndexRollback(t *testing.T) {
}
}()
// create 2 docs
// create a batch, insert 2 new documents
batch := index.NewBatch()
doc := document.NewDocument("1")
doc.AddField(document.NewTextField("name", []uint64{}, []byte("test1")))
err = idx.Update(doc)
if err != nil {
t.Error(err)
}
batch.Update(doc)
doc = document.NewDocument("2")
doc.AddField(document.NewTextField("name", []uint64{}, []byte("test2")))
err = idx.Update(doc)
batch.Update(doc)
err = idx.Batch(batch)
if err != nil {
t.Error(err)
t.Fatal(err)
}
// create a batch, insert new doc, update existing doc, delete existing doc
batch := index.NewBatch()
sh, ok := idx.(*Scorch)
if !ok {
t.Fatalf("Not a scorch index?")
}
// fetch rollback points available as of here
rollbackPoints, err := sh.RollbackPoints()
if err != nil || len(rollbackPoints) == 0 {
t.Fatal(err, len(rollbackPoints))
}
// set this as a rollback point for the future
rollbackPoint := rollbackPoints[0]
// create another batch, insert 2 new documents, and delete an existing one
batch = index.NewBatch()
doc = document.NewDocument("3")
doc.AddField(document.NewTextField("name", []uint64{}, []byte("test3")))
batch.Update(doc)
doc = document.NewDocument("2")
doc.AddField(document.NewTextField("name", []uint64{}, []byte("test2updated")))
doc = document.NewDocument("4")
doc.AddField(document.NewTextField("name", []uint64{}, []byte("test4")))
batch.Update(doc)
batch.Delete("1")
err = idx.Batch(batch)
if err != nil {
t.Error(err)
t.Fatal(err)
}
sh, ok := idx.(*Scorch)
if !ok {
t.Errorf("Not a scorch index?")
}
// Get Last persisted snapshot
ss, err := sh.PreviousPersistedSnapshot(nil)
reader, err := idx.Reader()
if err != nil {
t.Error(err)
t.Fatal(err)
}
// Retrieve the snapshot earlier
prev, err := sh.PreviousPersistedSnapshot(ss)
docCount, err := reader.DocCount()
if err != nil {
t.Error(err)
t.Fatal(err)
}
if prev != nil {
err = sh.SnapshotRevert(prev)
if err != nil {
t.Error(err)
}
// expect docs 2, 3, 4
if docCount != 3 {
t.Fatalf("unexpected doc count: %v", docCount)
}
ret, err := reader.Document("1")
if err != nil || ret != nil {
t.Fatal(ret, err)
}
ret, err = reader.Document("2")
if err != nil || ret == nil {
t.Fatal(ret, err)
}
ret, err = reader.Document("3")
if err != nil || ret == nil {
t.Fatal(ret, err)
}
ret, err = reader.Document("4")
if err != nil || ret == nil {
t.Fatal(ret, err)
}
newRoot, err := sh.PreviousPersistedSnapshot(nil)
if err != nil {
t.Error(err)
}
err = reader.Close()
if err != nil {
t.Fatal(err)
}
if newRoot == nil {
t.Errorf("Failed to retrieve latest persisted snapshot")
}
// rollback to the selected rollback point
err = sh.Rollback(rollbackPoint)
if err != nil {
t.Fatal(err)
}
if newRoot.epoch <= prev.epoch {
t.Errorf("Unexpected epoch, %v <= %v", newRoot.epoch, prev.epoch)
}
reader, err = idx.Reader()
if err != nil {
t.Fatal(err)
}
docCount, err = reader.DocCount()
if err != nil {
t.Fatal(err)
}
// expect only docs 1, 2
if docCount != 2 {
t.Fatalf("unexpected doc count: %v", docCount)
}
ret, err = reader.Document("1")
if err != nil || ret == nil {
t.Fatal(ret, err)
}
ret, err = reader.Document("2")
if err != nil || ret == nil {
t.Fatal(ret, err)
}
ret, err = reader.Document("3")
if err != nil || ret != nil {
t.Fatal(ret, err)
}
ret, err = reader.Document("4")
if err != nil || ret != nil {
t.Fatal(ret, err)
}
err = reader.Close()
if err != nil {
t.Fatal(err)
}
}