scorch cleanup of *.zap files not listed in the rootBolt
This commit is contained in:
parent
c0cc46a2be
commit
b7dff6669f
|
@ -17,8 +17,10 @@ package scorch
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io/ioutil"
|
||||||
"log"
|
"log"
|
||||||
"os"
|
"os"
|
||||||
|
"path/filepath"
|
||||||
"sort"
|
"sort"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
|
@ -32,15 +34,12 @@ import (
|
||||||
type notificationChan chan struct{}
|
type notificationChan chan struct{}
|
||||||
|
|
||||||
func (s *Scorch) persisterLoop() {
|
func (s *Scorch) persisterLoop() {
|
||||||
|
s.removeOldData(true)
|
||||||
|
|
||||||
var notify notificationChan
|
var notify notificationChan
|
||||||
var lastPersistedEpoch uint64
|
var lastPersistedEpoch uint64
|
||||||
OUTER:
|
OUTER:
|
||||||
for {
|
for {
|
||||||
err := s.removeOldSnapshots()
|
|
||||||
if err != nil {
|
|
||||||
log.Printf("got err removing old snapshots: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-s.closeCh:
|
case <-s.closeCh:
|
||||||
break OUTER
|
break OUTER
|
||||||
|
@ -56,7 +55,7 @@ OUTER:
|
||||||
//for ourSnapshot.epoch != lastPersistedEpoch {
|
//for ourSnapshot.epoch != lastPersistedEpoch {
|
||||||
if ourSnapshot.epoch != lastPersistedEpoch {
|
if ourSnapshot.epoch != lastPersistedEpoch {
|
||||||
// lets get started
|
// lets get started
|
||||||
err = s.persistSnapshot(ourSnapshot)
|
err := s.persistSnapshot(ourSnapshot)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("got err persisting snapshot: %v", err)
|
log.Printf("got err persisting snapshot: %v", err)
|
||||||
_ = ourSnapshot.DecRef()
|
_ = ourSnapshot.DecRef()
|
||||||
|
@ -111,6 +110,7 @@ OUTER:
|
||||||
// woken up, next loop should pick up work
|
// woken up, next loop should pick up work
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
s.removeOldData(false)
|
||||||
}
|
}
|
||||||
s.asyncTasks.Done()
|
s.asyncTasks.Done()
|
||||||
}
|
}
|
||||||
|
@ -405,6 +405,20 @@ func (p uint64Descending) Len() int { return len(p) }
|
||||||
func (p uint64Descending) Less(i, j int) bool { return p[i] > p[j] }
|
func (p uint64Descending) Less(i, j int) bool { return p[i] > p[j] }
|
||||||
func (p uint64Descending) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
|
func (p uint64Descending) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
|
||||||
|
|
||||||
|
func (s *Scorch) removeOldData(force bool) {
|
||||||
|
removed, err := s.removeOldBoltSnapshots()
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("got err removing old bolt snapshots: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if force || removed > 0 {
|
||||||
|
err = s.removeOldZapFiles()
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("go err removing old zap files: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// NumSnapshotsToKeep represents how many recent, old snapshots to
|
// NumSnapshotsToKeep represents how many recent, old snapshots to
|
||||||
// keep around per Scorch instance. Useful for apps that require
|
// keep around per Scorch instance. Useful for apps that require
|
||||||
// rollback'ability.
|
// rollback'ability.
|
||||||
|
@ -412,7 +426,7 @@ var NumSnapshotsToKeep int
|
||||||
|
|
||||||
// Removes enough snapshots from the rootBolt so that the
|
// Removes enough snapshots from the rootBolt so that the
|
||||||
// s.eligibleForRemoval stays under the NumSnapshotsToKeep policy.
|
// s.eligibleForRemoval stays under the NumSnapshotsToKeep policy.
|
||||||
func (s *Scorch) removeOldSnapshots() error {
|
func (s *Scorch) removeOldBoltSnapshots() (numRemoved int, err error) {
|
||||||
var epochsToRemove []uint64
|
var epochsToRemove []uint64
|
||||||
|
|
||||||
s.rootLock.Lock()
|
s.rootLock.Lock()
|
||||||
|
@ -424,12 +438,12 @@ func (s *Scorch) removeOldSnapshots() error {
|
||||||
s.rootLock.Unlock()
|
s.rootLock.Unlock()
|
||||||
|
|
||||||
if len(epochsToRemove) <= 0 {
|
if len(epochsToRemove) <= 0 {
|
||||||
return nil
|
return 0, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
tx, err := s.rootBolt.Begin(true)
|
tx, err := s.rootBolt.Begin(true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return 0, err
|
||||||
}
|
}
|
||||||
defer func() {
|
defer func() {
|
||||||
if err == nil {
|
if err == nil {
|
||||||
|
@ -450,8 +464,73 @@ func (s *Scorch) removeOldSnapshots() error {
|
||||||
if err == bolt.ErrBucketNotFound {
|
if err == bolt.ErrBucketNotFound {
|
||||||
err = nil
|
err = nil
|
||||||
}
|
}
|
||||||
|
if err == nil {
|
||||||
|
numRemoved++
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return err
|
return numRemoved, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Removes any *.zap files which aren't listed in the rootBolt.
|
||||||
|
func (s *Scorch) removeOldZapFiles() error {
|
||||||
|
liveFileNames, err := s.loadZapFileNames()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
currFileInfos, err := ioutil.ReadDir(s.path)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, finfo := range currFileInfos {
|
||||||
|
fname := finfo.Name()
|
||||||
|
if filepath.Ext(fname) == ".zap" {
|
||||||
|
if _, exists := liveFileNames[fname]; !exists {
|
||||||
|
err := os.Remove(s.path + string(os.PathSeparator) + fname)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("got err removing file: %s, err: %v", fname, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Returns the *.zap file names that are listed in the rootBolt.
|
||||||
|
func (s *Scorch) loadZapFileNames() (map[string]struct{}, error) {
|
||||||
|
rv := map[string]struct{}{}
|
||||||
|
err := s.rootBolt.View(func(tx *bolt.Tx) error {
|
||||||
|
snapshots := tx.Bucket(boltSnapshotsBucket)
|
||||||
|
if snapshots == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
sc := snapshots.Cursor()
|
||||||
|
for sk, _ := sc.First(); sk != nil; sk, _ = sc.Next() {
|
||||||
|
snapshot := snapshots.Bucket(sk)
|
||||||
|
if snapshot == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
segc := snapshot.Cursor()
|
||||||
|
for segk, _ := segc.First(); segk != nil; segk, _ = segc.Next() {
|
||||||
|
if segk[0] == boltInternalKey[0] {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
segmentBucket := snapshot.Bucket(segk)
|
||||||
|
if segmentBucket == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
pathBytes := segmentBucket.Get(boltPathKey)
|
||||||
|
if pathBytes == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
rv[string(pathBytes)] = struct{}{}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
return rv, err
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue
Block a user