diff --git a/index/scorch/persister.go b/index/scorch/persister.go index 3a6ce85b..d66bdea0 100644 --- a/index/scorch/persister.go +++ b/index/scorch/persister.go @@ -17,8 +17,10 @@ package scorch import ( "bytes" "fmt" + "io/ioutil" "log" "os" + "path/filepath" "sort" "strings" @@ -32,15 +34,12 @@ import ( type notificationChan chan struct{} func (s *Scorch) persisterLoop() { + s.removeOldData(true) + var notify notificationChan var lastPersistedEpoch uint64 OUTER: for { - err := s.removeOldSnapshots() - if err != nil { - log.Printf("got err removing old snapshots: %v", err) - } - select { case <-s.closeCh: break OUTER @@ -56,7 +55,7 @@ OUTER: //for ourSnapshot.epoch != lastPersistedEpoch { if ourSnapshot.epoch != lastPersistedEpoch { // lets get started - err = s.persistSnapshot(ourSnapshot) + err := s.persistSnapshot(ourSnapshot) if err != nil { log.Printf("got err persisting snapshot: %v", err) _ = ourSnapshot.DecRef() @@ -111,6 +110,7 @@ OUTER: // woken up, next loop should pick up work } } + s.removeOldData(false) } s.asyncTasks.Done() } @@ -405,6 +405,20 @@ func (p uint64Descending) Len() int { return len(p) } func (p uint64Descending) Less(i, j int) bool { return p[i] > p[j] } func (p uint64Descending) Swap(i, j int) { p[i], p[j] = p[j], p[i] } +func (s *Scorch) removeOldData(force bool) { + removed, err := s.removeOldBoltSnapshots() + if err != nil { + log.Printf("got err removing old bolt snapshots: %v", err) + } + + if force || removed > 0 { + err = s.removeOldZapFiles() + if err != nil { + log.Printf("go err removing old zap files: %v", err) + } + } +} + // NumSnapshotsToKeep represents how many recent, old snapshots to // keep around per Scorch instance. Useful for apps that require // rollback'ability. @@ -412,7 +426,7 @@ var NumSnapshotsToKeep int // Removes enough snapshots from the rootBolt so that the // s.eligibleForRemoval stays under the NumSnapshotsToKeep policy. 
-func (s *Scorch) removeOldSnapshots() error { +func (s *Scorch) removeOldBoltSnapshots() (numRemoved int, err error) { var epochsToRemove []uint64 s.rootLock.Lock() @@ -424,12 +438,12 @@ func (s *Scorch) removeOldSnapshots() error { s.rootLock.Unlock() if len(epochsToRemove) <= 0 { - return nil + return 0, nil } tx, err := s.rootBolt.Begin(true) if err != nil { - return err + return 0, err } defer func() { if err == nil { @@ -450,8 +464,73 @@ func (s *Scorch) removeOldSnapshots() error { if err == bolt.ErrBucketNotFound { err = nil } + if err == nil { + numRemoved++ + } } - return err + return numRemoved, err } +// Removes any *.zap files which aren't listed in the rootBolt. +func (s *Scorch) removeOldZapFiles() error { + liveFileNames, err := s.loadZapFileNames() + if err != nil { + return err + } + + currFileInfos, err := ioutil.ReadDir(s.path) + if err != nil { + return err + } + + for _, finfo := range currFileInfos { + fname := finfo.Name() + if filepath.Ext(fname) == ".zap" { + if _, exists := liveFileNames[fname]; !exists { + err := os.Remove(s.path + string(os.PathSeparator) + fname) + if err != nil { + log.Printf("got err removing file: %s, err: %v", fname, err) + } + } + } + } + + return nil +} + +// Returns the *.zap file names that are listed in the rootBolt. 
+func (s *Scorch) loadZapFileNames() (map[string]struct{}, error) { // collects, as a set, the value stored under boltPathKey for every segment bucket of every snapshot bucket in rootBolt
+	rv := map[string]struct{}{} // result set; the empty struct value carries no data
+	err := s.rootBolt.View(func(tx *bolt.Tx) error { // NOTE(review): presumably a read-only bolt transaction — confirm against the bolt docs
+		snapshots := tx.Bucket(boltSnapshotsBucket)
+		if snapshots == nil { // no snapshots bucket: nothing to collect, and not treated as an error
+			return nil
+		}
+		sc := snapshots.Cursor()
+		for sk, _ := sc.First(); sk != nil; sk, _ = sc.Next() { // visit every key in the snapshots bucket
+			snapshot := snapshots.Bucket(sk)
+			if snapshot == nil { // key does not resolve to a nested bucket; skip it
+				continue
+			}
+			segc := snapshot.Cursor()
+			for segk, _ := segc.First(); segk != nil; segk, _ = segc.Next() { // visit every key inside this snapshot bucket
+				if segk[0] == boltInternalKey[0] { // skip keys sharing boltInternalKey's first byte (presumably non-segment entries — TODO confirm)
+					continue
+				}
+				segmentBucket := snapshot.Bucket(segk)
+				if segmentBucket == nil { // key is not a nested bucket, so there is no path to read
+					continue
+				}
+				pathBytes := segmentBucket.Get(boltPathKey)
+				if pathBytes == nil { // this bucket has no boltPathKey entry
+					continue
+				}
+				rv[string(pathBytes)] = struct{}{} // string() copies the bytes, so the map key does not alias pathBytes
+			}
+		}
+		return nil
+	})
+
+	return rv, err // rv is returned as-is even when View reports an error
+}