0
0
Fork 0

scorch APIs to support rollback

- PreviousPersistedSnapshot
- SnapshotRevert

+ unit test
This commit is contained in:
abhinavdangeti 2017-12-15 14:49:33 -08:00
parent bf833a5eb8
commit 679f1ce9c3
4 changed files with 275 additions and 0 deletions

View File

@ -37,6 +37,11 @@ type epochWatcher struct {
notifyCh notificationChan
}
type snapshotReversion struct {
snapshot *IndexSnapshot
applied chan error
}
func (s *Scorch) mainLoop() {
var epochWatchers []*epochWatcher
OUTER:
@ -56,6 +61,12 @@ OUTER:
if err != nil {
continue OUTER
}
case revertTo := <-s.revertToSnapshots:
err := s.revertToSnapshot(revertTo)
if err != nil {
continue OUTER
}
}
var epochCurr uint64
@ -241,3 +252,50 @@ func (s *Scorch) introduceMerge(nextMerge *segmentMerge) {
// notify merger we incorporated this
close(nextMerge.notify)
}
func (s *Scorch) revertToSnapshot(revertTo *snapshotReversion) error {
if revertTo.snapshot == nil {
err := fmt.Errorf("Cannot revert to a nil snapshot")
revertTo.applied <- err
return err
}
// acquire lock
s.rootLock.Lock()
// prepare a new index snapshot, based on next snapshot
newSnapshot := &IndexSnapshot{
parent: s,
segment: make([]*SegmentSnapshot, len(revertTo.snapshot.segment)),
offsets: revertTo.snapshot.offsets,
internal: revertTo.snapshot.internal,
epoch: s.nextSnapshotEpoch,
refs: 1,
}
s.nextSnapshotEpoch++
// iterate through segments
for i, segmentSnapshot := range revertTo.snapshot.segment {
newSnapshot.segment[i] = &SegmentSnapshot{
id: segmentSnapshot.id,
segment: segmentSnapshot.segment,
deleted: segmentSnapshot.deleted,
cachedDocs: segmentSnapshot.cachedDocs,
}
segmentSnapshot.segment.AddRef()
}
// swap in new snapshot
rootPrev := s.root
s.root = newSnapshot
// release lock
s.rootLock.Unlock()
if rootPrev != nil {
_ = rootPrev.DecRef()
}
close(revertTo.applied)
return nil
}

View File

@ -57,6 +57,7 @@ type Scorch struct {
introductions chan *segmentIntroduction
merges chan *segmentMerge
introducerNotifier chan *epochWatcher
revertToSnapshots chan *snapshotReversion
persisterNotifier chan notificationChan
rootBolt *bolt.DB
asyncTasks sync.WaitGroup
@ -129,6 +130,7 @@ func (s *Scorch) Open() error {
s.introductions = make(chan *segmentIntroduction)
s.merges = make(chan *segmentMerge)
s.introducerNotifier = make(chan *epochWatcher, 1)
s.revertToSnapshots = make(chan *snapshotReversion)
s.persisterNotifier = make(chan notificationChan)
if !s.readOnly && s.path != "" {

View File

@ -0,0 +1,102 @@
// Copyright (c) 2017 Couchbase, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package scorch
import (
"bytes"
"log"
"github.com/blevesearch/bleve/index/scorch/segment"
)
// PreviousPersistedSnapshot returns the next older, previous
// IndexSnapshot based on the provided IndexSnapshot. If the provided
// argument is nil, the most recently persisted IndexSnapshot is returned.
// This API allows the application to walk backwards into the history
// of a store to previous points in time. A nil return value indicates
// that no previous snapshots are available.
func (s *Scorch) PreviousPersistedSnapshot(is *IndexSnapshot) (*IndexSnapshot, error) {
if s.rootBolt == nil {
return nil, nil
}
// start a read-only transaction
tx, err := s.rootBolt.Begin(false)
if err != nil {
return nil, err
}
// Read-only bolt transactions to be rolled back.
defer func() {
_ = tx.Rollback()
}()
snapshots := tx.Bucket(boltSnapshotsBucket)
if snapshots == nil {
return nil, nil
}
pos := []byte(nil)
if is != nil {
pos = segment.EncodeUvarintAscending(nil, is.epoch)
}
c := snapshots.Cursor()
for k, _ := c.Last(); k != nil; k, _ = c.Prev() {
if pos == nil || bytes.Compare(k, pos) < 0 {
_, snapshotEpoch, err := segment.DecodeUvarintAscending(k)
if err != nil {
log.Printf("PreviousPersistedSnapshot:"+
" unable to parse segment epoch %x, continuing", k)
continue
}
snapshot := snapshots.Bucket(k)
if snapshot == nil {
log.Printf("PreviousPersistedSnapshot:"+
" snapshot key, but bucket missing %x, continuing", k)
continue
}
indexSnapshot, err := s.loadSnapshot(snapshot)
if err != nil {
log.Printf("PreviousPersistedSnapshot:"+
" unable to load snapshot, %v, continuing", err)
continue
}
indexSnapshot.epoch = snapshotEpoch
return indexSnapshot, nil
}
}
return nil, nil
}
// SnapshotRevert atomically brings the store back to the point in time
// as represented by the revertTo IndexSnapshot. SnapshotRevert() should
// only be passed an IndexSnapshot that came from the same store.
func (s *Scorch) SnapshotRevert(revertTo *IndexSnapshot) error {
revert := &snapshotReversion{
snapshot: revertTo,
applied: make(chan error),
}
s.revertToSnapshots <- revert
// block until this IndexSnapshot is applied
return <-revert.applied
}

View File

@ -0,0 +1,113 @@
// Copyright (c) 2017 Couchbase, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package scorch
import (
"testing"
"github.com/blevesearch/bleve/document"
"github.com/blevesearch/bleve/index"
)
func TestIndexRollback(t *testing.T) {
defer func() {
err := DestroyTest()
if err != nil {
t.Fatal(err)
}
}()
analysisQueue := index.NewAnalysisQueue(1)
idx, err := NewScorch(Name, testConfig, analysisQueue)
if err != nil {
t.Fatal(err)
}
err = idx.Open()
if err != nil {
t.Fatalf("error opening index: %v", err)
}
defer func() {
err := idx.Close()
if err != nil {
t.Fatal(err)
}
}()
// create 2 docs
doc := document.NewDocument("1")
doc.AddField(document.NewTextField("name", []uint64{}, []byte("test1")))
err = idx.Update(doc)
if err != nil {
t.Error(err)
}
doc = document.NewDocument("2")
doc.AddField(document.NewTextField("name", []uint64{}, []byte("test2")))
err = idx.Update(doc)
if err != nil {
t.Error(err)
}
// create a batch, insert new doc, update existing doc, delete existing doc
batch := index.NewBatch()
doc = document.NewDocument("3")
doc.AddField(document.NewTextField("name", []uint64{}, []byte("test3")))
batch.Update(doc)
doc = document.NewDocument("2")
doc.AddField(document.NewTextField("name", []uint64{}, []byte("test2updated")))
batch.Update(doc)
batch.Delete("1")
err = idx.Batch(batch)
if err != nil {
t.Error(err)
}
sh, ok := idx.(*Scorch)
if !ok {
t.Errorf("Not a scorch index?")
}
// Get Last persisted snapshot
ss, err := sh.PreviousPersistedSnapshot(nil)
if err != nil {
t.Error(err)
}
// Retrieve the snapshot earlier
prev, err := sh.PreviousPersistedSnapshot(ss)
if err != nil {
t.Error(err)
}
err = sh.SnapshotRevert(prev)
if err != nil {
t.Error(err)
}
newRoot := sh.root
if newRoot != nil && prev != nil {
if newRoot.epoch <= prev.epoch {
t.Errorf("Unexpected epoch, %v <= %v", newRoot.epoch, prev.epoch)
}
} else {
if prev == nil {
t.Errorf("The last persisted snapshot before the revert was nil!")
}
if newRoot == nil {
t.Errorf("The new root has been set to nil?")
}
}
}