Merge pull request #753 from steveyen/zap-rollback-test-fixes
scorch zap TestIndexRollback fixes
This commit is contained in:
commit
534bd5ef4d
|
@ -114,6 +114,25 @@ func (s *Scorch) fireAsyncError(err error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Scorch) Open() error {
|
func (s *Scorch) Open() error {
|
||||||
|
err := s.openBolt()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
s.asyncTasks.Add(1)
|
||||||
|
go s.mainLoop()
|
||||||
|
|
||||||
|
if !s.readOnly && s.path != "" {
|
||||||
|
s.asyncTasks.Add(1)
|
||||||
|
go s.persisterLoop()
|
||||||
|
s.asyncTasks.Add(1)
|
||||||
|
go s.mergerLoop()
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Scorch) openBolt() error {
|
||||||
var ok bool
|
var ok bool
|
||||||
s.path, ok = s.config["path"].(string)
|
s.path, ok = s.config["path"].(string)
|
||||||
if !ok {
|
if !ok {
|
||||||
|
@ -136,6 +155,7 @@ func (s *Scorch) Open() error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
rootBoltPath := s.path + string(os.PathSeparator) + "root.bolt"
|
rootBoltPath := s.path + string(os.PathSeparator) + "root.bolt"
|
||||||
var err error
|
var err error
|
||||||
if s.path != "" {
|
if s.path != "" {
|
||||||
|
@ -166,16 +186,6 @@ func (s *Scorch) Open() error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
s.asyncTasks.Add(1)
|
|
||||||
go s.mainLoop()
|
|
||||||
|
|
||||||
if !s.readOnly && s.path != "" {
|
|
||||||
s.asyncTasks.Add(1)
|
|
||||||
go s.persisterLoop()
|
|
||||||
s.asyncTasks.Add(1)
|
|
||||||
go s.mergerLoop()
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -22,7 +22,12 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestIndexRollback(t *testing.T) {
|
func TestIndexRollback(t *testing.T) {
|
||||||
|
numSnapshotsToKeepOrig := NumSnapshotsToKeep
|
||||||
|
NumSnapshotsToKeep = 1000
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
|
NumSnapshotsToKeep = numSnapshotsToKeepOrig
|
||||||
|
|
||||||
err := DestroyTest()
|
err := DestroyTest()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
|
@ -34,10 +39,6 @@ func TestIndexRollback(t *testing.T) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
err = idx.Open()
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("error opening index: %v", err)
|
|
||||||
}
|
|
||||||
defer func() {
|
defer func() {
|
||||||
err := idx.Close()
|
err := idx.Close()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -45,6 +46,22 @@ func TestIndexRollback(t *testing.T) {
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
sh, ok := idx.(*Scorch)
|
||||||
|
if !ok {
|
||||||
|
t.Fatalf("Not a scorch index?")
|
||||||
|
}
|
||||||
|
|
||||||
|
err = sh.openBolt()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("error opening index: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// start background goroutines except for the merger, which
|
||||||
|
// simulates a super slow merger
|
||||||
|
sh.asyncTasks.Add(2)
|
||||||
|
go sh.mainLoop()
|
||||||
|
go sh.persisterLoop()
|
||||||
|
|
||||||
// create a batch, insert 2 new documents
|
// create a batch, insert 2 new documents
|
||||||
batch := index.NewBatch()
|
batch := index.NewBatch()
|
||||||
doc := document.NewDocument("1")
|
doc := document.NewDocument("1")
|
||||||
|
@ -59,14 +76,17 @@ func TestIndexRollback(t *testing.T) {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
sh, ok := idx.(*Scorch)
|
readerSlow, err := idx.Reader() // keep snapshot around so it's not cleaned up
|
||||||
if !ok {
|
if err != nil {
|
||||||
t.Fatalf("Not a scorch index?")
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
defer func() {
|
||||||
|
_ = readerSlow.Close()
|
||||||
|
}()
|
||||||
|
|
||||||
// fetch rollback points available as of here
|
// fetch rollback points available as of here
|
||||||
rollbackPoints, err := sh.RollbackPoints()
|
rollbackPoints, err := sh.RollbackPoints()
|
||||||
if err != nil || len(rollbackPoints) == 0 {
|
if err != nil || len(rollbackPoints) != 1 {
|
||||||
t.Fatal(err, len(rollbackPoints))
|
t.Fatal(err, len(rollbackPoints))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -88,6 +108,21 @@ func TestIndexRollback(t *testing.T) {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
rollbackPointsB, err := sh.RollbackPoints()
|
||||||
|
if err != nil || len(rollbackPointsB) != 3 {
|
||||||
|
t.Fatal(err, len(rollbackPointsB))
|
||||||
|
}
|
||||||
|
|
||||||
|
found := false
|
||||||
|
for _, p := range rollbackPointsB {
|
||||||
|
if rollbackPoint.epoch == p.epoch {
|
||||||
|
found = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !found {
|
||||||
|
t.Fatalf("expected rollbackPoint epoch to still be available")
|
||||||
|
}
|
||||||
|
|
||||||
reader, err := idx.Reader()
|
reader, err := idx.Reader()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
|
|
Loading…
Reference in New Issue