From c0cc46a2be80bdf4538d1de27fb8683892f85abf Mon Sep 17 00:00:00 2001 From: Steve Yen Date: Wed, 13 Dec 2017 14:54:58 -0800 Subject: [PATCH 1/2] scorch cleanup of the rootBolt of old snapshots A new global variable, NumSnapshotsToKeep, represents the default number of old snapshots that each scorch instance should maintain -- 0 is the default. Apps that need rollback'ability may want to increase this value in early initialization. The Scorch.eligibleForRemoval field tracks epoches which are safe to delete from the rootBolt. The eligibleForRemoval is appended to whenever the ref-count on an IndexSnapshot drops to 0. On startup, eligibleForRemoval is also initialized with any older epoch's found in the rootBolt. The newly introduced Scorch.removeOldSnapshots() method is called on every cycle of the persisterLoop(), where it maintains the eligibleForRemoval slice to under a size defined by the NumSnapshotsToKeep. A future commit will remove actual storage files in order to match the "source of truth" information found in the rootBolt. --- index/scorch/introducer.go | 2 + index/scorch/persister.go | 79 +++++++++++++++++++++++++++++++++- index/scorch/scorch.go | 12 +++++- index/scorch/snapshot_index.go | 4 ++ 4 files changed, 94 insertions(+), 3 deletions(-) diff --git a/index/scorch/introducer.go b/index/scorch/introducer.go index 9402e5a4..a80e5ac1 100644 --- a/index/scorch/introducer.go +++ b/index/scorch/introducer.go @@ -68,6 +68,7 @@ func (s *Scorch) introduceSegment(next *segmentIntroduction) error { // prepare new index snapshot, with curr size + 1 newSnapshot := &IndexSnapshot{ + parent: s, segment: make([]*SegmentSnapshot, len(s.root.segment)+1), offsets: make([]uint64, len(s.root.segment)+1), internal: make(map[string][]byte, len(s.root.segment)), @@ -155,6 +156,7 @@ func (s *Scorch) introduceMerge(nextMerge *segmentMerge) { currSize := len(s.root.segment) newSize := currSize + 1 - len(nextMerge.old) newSnapshot := &IndexSnapshot{ + parent: s, segment: make([]*SegmentSnapshot, 0, newSize), offsets: make([]uint64, 0, newSize), internal: make(map[string][]byte, len(s.root.segment)), diff --git a/index/scorch/persister.go b/index/scorch/persister.go index 964e1db9..3a6ce85b 100644 --- a/index/scorch/persister.go +++ b/index/scorch/persister.go @@ -19,6 +19,7 @@ import ( "fmt" "log" "os" + "sort" "strings" "github.com/RoaringBitmap/roaring" @@ -35,6 +36,11 @@ func (s *Scorch) persisterLoop() { var lastPersistedEpoch uint64 OUTER: for { + err := s.removeOldSnapshots() + if err != nil { + log.Printf("got err removing old snapshots: %v", err) + } + select { case <-s.closeCh: break OUTER @@ -50,7 +56,7 @@ OUTER: //for ourSnapshot.epoch != lastPersistedEpoch { if ourSnapshot.epoch != lastPersistedEpoch { // lets get started - err := s.persistSnapshot(ourSnapshot) + err = s.persistSnapshot(ourSnapshot) if err != nil { log.Printf("got err persisting snapshot: %v", err) _ = ourSnapshot.DecRef() @@ -217,6 +223,7 @@ func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot) error { s.rootLock.Lock() newIndexSnapshot := &IndexSnapshot{ + parent: s, epoch: s.root.epoch, segment: make([]*SegmentSnapshot, len(s.root.segment)), offsets: make([]uint64, len(s.root.offsets)), @@ -275,6 +282,7 @@ func (s *Scorch) loadFromBolt() error { if snapshots == nil { return nil } + foundRoot := false c := snapshots.Cursor() for k, _ := c.Last(); k != nil; k, _ = c.Prev() { _, snapshotEpoch, err := segment.DecodeUvarintAscending(k) @@ -282,14 +290,20 @@ func (s *Scorch) loadFromBolt() error { log.Printf("unable to parse segment epoch %x, continuing", k) continue } + if foundRoot { + s.eligibleForRemoval = append(s.eligibleForRemoval, snapshotEpoch) + continue + } snapshot := snapshots.Bucket(k) if snapshot == nil { log.Printf("snapshot key, but bucket missing %x, continuing", k) + s.eligibleForRemoval = append(s.eligibleForRemoval, snapshotEpoch) continue } indexSnapshot, err := s.loadSnapshot(snapshot) if err != nil { log.Printf("unable to load snapshot, %v, continuing", err) + s.eligibleForRemoval = append(s.eligibleForRemoval, snapshotEpoch) continue } indexSnapshot.epoch = snapshotEpoch @@ -301,8 +315,11 @@ func (s *Scorch) loadFromBolt() error { } s.nextSegmentID++ s.nextSnapshotEpoch = snapshotEpoch + 1 + if s.root != nil { + _ = s.root.DecRef() + } s.root = indexSnapshot - break + foundRoot = true } return nil }) @@ -311,6 +328,7 @@ func (s *Scorch) loadFromBolt() error { func (s *Scorch) loadSnapshot(snapshot *bolt.Bucket) (*IndexSnapshot, error) { rv := &IndexSnapshot{ + parent: s, internal: make(map[string][]byte), refs: 1, } @@ -380,3 +398,60 @@ func (s *Scorch) loadSegment(segmentBucket *bolt.Bucket) (*SegmentSnapshot, erro return rv, nil } + +type uint64Descending []uint64 + +func (p uint64Descending) Len() int { return len(p) } +func (p uint64Descending) Less(i, j int) bool { return p[i] > p[j] } +func (p uint64Descending) Swap(i, j int) { p[i], p[j] = p[j], p[i] } + +// NumSnapshotsToKeep represents how many recent, old snapshots to +// keep around per Scorch instance. Useful for apps that require +// rollback'ability. +var NumSnapshotsToKeep int + +// Removes enough snapshots from the rootBolt so that the +// s.eligibleForRemoval stays under the NumSnapshotsToKeep policy. +func (s *Scorch) removeOldSnapshots() error { + var epochsToRemove []uint64 + + s.rootLock.Lock() + if len(s.eligibleForRemoval) > NumSnapshotsToKeep { + sort.Sort(uint64Descending(s.eligibleForRemoval)) + epochsToRemove = append([]uint64(nil), s.eligibleForRemoval[NumSnapshotsToKeep:]...) // Copy. + s.eligibleForRemoval = s.eligibleForRemoval[0:NumSnapshotsToKeep] + } + s.rootLock.Unlock() + + if len(epochsToRemove) <= 0 { + return nil + } + + tx, err := s.rootBolt.Begin(true) + if err != nil { + return err + } + defer func() { + if err == nil { + err = s.rootBolt.Sync() + } + }() + defer func() { + if err == nil { + err = tx.Commit() + } else { + _ = tx.Rollback() + } + }() + + for _, epochToRemove := range epochsToRemove { + k := segment.EncodeUvarintAscending(nil, epochToRemove) + err = tx.DeleteBucket(k) + if err == bolt.ErrBucketNotFound { + err = nil + } + } + + return err +} + diff --git a/index/scorch/scorch.go b/index/scorch/scorch.go index d59563be..06e788ca 100644 --- a/index/scorch/scorch.go +++ b/index/scorch/scorch.go @@ -59,6 +59,8 @@ type Scorch struct { persisterNotifier chan notificationChan rootBolt *bolt.DB asyncTasks sync.WaitGroup + + eligibleForRemoval []uint64 // Index snapshot epoch's that are safe to GC. } func NewScorch(storeName string, config map[string]interface{}, analysisQueue *index.AnalysisQueue) (index.Index, error) { @@ -67,9 +69,9 @@ func NewScorch(storeName string, config map[string]interface{}, analysisQueue *i config: config, analysisQueue: analysisQueue, stats: &Stats{}, - root: &IndexSnapshot{refs: 1}, nextSnapshotEpoch: 1, } + rv.root = &IndexSnapshot{parent: rv, refs: 1} ro, ok := config["read_only"].(bool) if ok { rv.readOnly = ro @@ -324,6 +326,14 @@ func (s *Scorch) Advanced() (store.KVStore, error) { return nil, nil } +func (s *Scorch) AddEligibleForRemoval(epoch uint64) { + s.rootLock.Lock() + if s.root == nil || s.root.epoch != epoch { + s.eligibleForRemoval = append(s.eligibleForRemoval, epoch) + } + s.rootLock.Unlock() +} + func init() { registry.RegisterIndexType(Name, NewScorch) } diff --git a/index/scorch/snapshot_index.go b/index/scorch/snapshot_index.go index 19581f75..6dd77ff4 100644 --- a/index/scorch/snapshot_index.go +++ b/index/scorch/snapshot_index.go @@ -40,6 +40,7 @@ type asynchSegmentResult struct { } type IndexSnapshot struct { + parent *Scorch segment []*SegmentSnapshot offsets []uint64 internal map[string][]byte @@ -67,6 +68,9 @@ func (i *IndexSnapshot) DecRef() (err error) { } } } + if i.parent != nil { + go i.parent.AddEligibleForRemoval(i.epoch) + } } i.m.Unlock() return err From b7dff6669f0770eb5b4115f19d1f973f5f08a58a Mon Sep 17 00:00:00 2001 From: Steve Yen Date: Wed, 13 Dec 2017 16:58:36 -0800 Subject: [PATCH 2/2] scorch cleanup of *.zap files not listed in the rootBolt --- index/scorch/persister.go | 99 +++++++++++++++++++++++++++++++++++---- 1 file changed, 89 insertions(+), 10 deletions(-) diff --git a/index/scorch/persister.go b/index/scorch/persister.go index 3a6ce85b..d66bdea0 100644 --- a/index/scorch/persister.go +++ b/index/scorch/persister.go @@ -17,8 +17,10 @@ package scorch import ( "bytes" "fmt" + "io/ioutil" "log" "os" + "path/filepath" "sort" "strings" @@ -32,15 +34,12 @@ import ( type notificationChan chan struct{} func (s *Scorch) persisterLoop() { + s.removeOldData(true) + var notify notificationChan var lastPersistedEpoch uint64 OUTER: for { - err := s.removeOldSnapshots() - if err != nil { - log.Printf("got err removing old snapshots: %v", err) - } - select { case <-s.closeCh: break OUTER @@ -56,7 +55,7 @@ OUTER: //for ourSnapshot.epoch != lastPersistedEpoch { if ourSnapshot.epoch != lastPersistedEpoch { // lets get started - err = s.persistSnapshot(ourSnapshot) + err := s.persistSnapshot(ourSnapshot) if err != nil { log.Printf("got err persisting snapshot: %v", err) _ = ourSnapshot.DecRef() @@ -111,6 +110,7 @@ OUTER: // woken up, next loop should pick up work } } + s.removeOldData(false) } s.asyncTasks.Done() } @@ -405,6 +405,20 @@ func (p uint64Descending) Len() int { return len(p) } func (p uint64Descending) Less(i, j int) bool { return p[i] > p[j] } func (p uint64Descending) Swap(i, j int) { p[i], p[j] = p[j], p[i] } +func (s *Scorch) removeOldData(force bool) { + removed, err := s.removeOldBoltSnapshots() + if err != nil { + log.Printf("got err removing old bolt snapshots: %v", err) + } + + if force || removed > 0 { + err = s.removeOldZapFiles() + if err != nil { + log.Printf("go err removing old zap files: %v", err) + } + } +} + // NumSnapshotsToKeep represents how many recent, old snapshots to // keep around per Scorch instance. Useful for apps that require // rollback'ability. @@ -412,7 +426,7 @@ var NumSnapshotsToKeep int // Removes enough snapshots from the rootBolt so that the // s.eligibleForRemoval stays under the NumSnapshotsToKeep policy. -func (s *Scorch) removeOldSnapshots() error { +func (s *Scorch) removeOldBoltSnapshots() (numRemoved int, err error) { var epochsToRemove []uint64 s.rootLock.Lock() @@ -424,12 +438,12 @@ func (s *Scorch) removeOldSnapshots() error { s.rootLock.Unlock() if len(epochsToRemove) <= 0 { - return nil + return 0, nil } tx, err := s.rootBolt.Begin(true) if err != nil { - return err + return 0, err } defer func() { if err == nil { @@ -450,8 +464,73 @@ func (s *Scorch) removeOldSnapshots() error { if err == bolt.ErrBucketNotFound { err = nil } + if err == nil { + numRemoved++ + } } - return err + return numRemoved, err } +// Removes any *.zap files which aren't listed in the rootBolt. +func (s *Scorch) removeOldZapFiles() error { + liveFileNames, err := s.loadZapFileNames() + if err != nil { + return err + } + + currFileInfos, err := ioutil.ReadDir(s.path) + if err != nil { + return err + } + + for _, finfo := range currFileInfos { + fname := finfo.Name() + if filepath.Ext(fname) == ".zap" { + if _, exists := liveFileNames[fname]; !exists { + err := os.Remove(s.path + string(os.PathSeparator) + fname) + if err != nil { + log.Printf("got err removing file: %s, err: %v", fname, err) + } + } + } + } + + return nil +} + +// Returns the *.zap file names that are listed in the rootBolt. +func (s *Scorch) loadZapFileNames() (map[string]struct{}, error) { + rv := map[string]struct{}{} + err := s.rootBolt.View(func(tx *bolt.Tx) error { + snapshots := tx.Bucket(boltSnapshotsBucket) + if snapshots == nil { + return nil + } + sc := snapshots.Cursor() + for sk, _ := sc.First(); sk != nil; sk, _ = sc.Next() { + snapshot := snapshots.Bucket(sk) + if snapshot == nil { + continue + } + segc := snapshot.Cursor() + for segk, _ := segc.First(); segk != nil; segk, _ = segc.Next() { + if segk[0] == boltInternalKey[0] { + continue + } + segmentBucket := snapshot.Bucket(segk) + if segmentBucket == nil { + continue + } + pathBytes := segmentBucket.Get(boltPathKey) + if pathBytes == nil { + continue + } + rv[string(pathBytes)] = struct{}{} + } + } + return nil + }) + + return rv, err +}