diff --git a/index/scorch/scorch.go b/index/scorch/scorch.go index 08fffa25..77f982a3 100644 --- a/index/scorch/scorch.go +++ b/index/scorch/scorch.go @@ -114,6 +114,25 @@ func (s *Scorch) fireAsyncError(err error) { } func (s *Scorch) Open() error { + err := s.openBolt() + if err != nil { + return err + } + + s.asyncTasks.Add(1) + go s.mainLoop() + + if !s.readOnly && s.path != "" { + s.asyncTasks.Add(1) + go s.persisterLoop() + s.asyncTasks.Add(1) + go s.mergerLoop() + } + + return nil +} + +func (s *Scorch) openBolt() error { var ok bool s.path, ok = s.config["path"].(string) if !ok { @@ -136,6 +155,7 @@ func (s *Scorch) Open() error { } } } + rootBoltPath := s.path + string(os.PathSeparator) + "root.bolt" var err error if s.path != "" { @@ -166,16 +186,6 @@ func (s *Scorch) Open() error { } } - s.asyncTasks.Add(1) - go s.mainLoop() - - if !s.readOnly && s.path != "" { - s.asyncTasks.Add(1) - go s.persisterLoop() - s.asyncTasks.Add(1) - go s.mergerLoop() - } - return nil } diff --git a/index/scorch/snapshot_rollback_test.go b/index/scorch/snapshot_rollback_test.go index 9816a51e..42d90824 100644 --- a/index/scorch/snapshot_rollback_test.go +++ b/index/scorch/snapshot_rollback_test.go @@ -22,7 +22,12 @@ import ( ) func TestIndexRollback(t *testing.T) { + numSnapshotsToKeepOrig := NumSnapshotsToKeep + NumSnapshotsToKeep = 1000 + defer func() { + NumSnapshotsToKeep = numSnapshotsToKeepOrig + err := DestroyTest() if err != nil { t.Fatal(err) @@ -34,10 +39,6 @@ func TestIndexRollback(t *testing.T) { if err != nil { t.Fatal(err) } - err = idx.Open() - if err != nil { - t.Fatalf("error opening index: %v", err) - } defer func() { err := idx.Close() if err != nil { @@ -45,6 +46,22 @@ func TestIndexRollback(t *testing.T) { } }() + sh, ok := idx.(*Scorch) + if !ok { + t.Fatalf("Not a scorch index?") + } + + err = sh.openBolt() + if err != nil { + t.Fatalf("error opening index: %v", err) + } + + // start background goroutines except for the merger, which + // simulates a super slow merger + sh.asyncTasks.Add(2) + go sh.mainLoop() + go sh.persisterLoop() + // create a batch, insert 2 new documents batch := index.NewBatch() doc := document.NewDocument("1") @@ -59,14 +76,17 @@ func TestIndexRollback(t *testing.T) { t.Fatal(err) } - sh, ok := idx.(*Scorch) - if !ok { - t.Fatalf("Not a scorch index?") + readerSlow, err := idx.Reader() // keep snapshot around so it's not cleaned up + if err != nil { + t.Fatal(err) } + defer func() { + _ = readerSlow.Close() + }() // fetch rollback points available as of here rollbackPoints, err := sh.RollbackPoints() - if err != nil || len(rollbackPoints) == 0 { + if err != nil || len(rollbackPoints) != 1 { t.Fatal(err, len(rollbackPoints)) } @@ -88,6 +108,21 @@ func TestIndexRollback(t *testing.T) { t.Fatal(err) } + rollbackPointsB, err := sh.RollbackPoints() + if err != nil || len(rollbackPointsB) != 3 { + t.Fatal(err, len(rollbackPointsB)) + } + + found := false + for _, p := range rollbackPointsB { + if rollbackPoint.epoch == p.epoch { + found = true + } + } + if !found { + t.Fatalf("expected rollbackPoint epoch to still be available") + } + reader, err := idx.Reader() if err != nil { t.Fatal(err)