diff --git a/index/scorch/introducer.go b/index/scorch/introducer.go index 9402e5a4..a80e5ac1 100644 --- a/index/scorch/introducer.go +++ b/index/scorch/introducer.go @@ -68,6 +68,7 @@ func (s *Scorch) introduceSegment(next *segmentIntroduction) error { // prepare new index snapshot, with curr size + 1 newSnapshot := &IndexSnapshot{ + parent: s, segment: make([]*SegmentSnapshot, len(s.root.segment)+1), offsets: make([]uint64, len(s.root.segment)+1), internal: make(map[string][]byte, len(s.root.segment)), @@ -155,6 +156,7 @@ func (s *Scorch) introduceMerge(nextMerge *segmentMerge) { currSize := len(s.root.segment) newSize := currSize + 1 - len(nextMerge.old) newSnapshot := &IndexSnapshot{ + parent: s, segment: make([]*SegmentSnapshot, 0, newSize), offsets: make([]uint64, 0, newSize), internal: make(map[string][]byte, len(s.root.segment)), diff --git a/index/scorch/persister.go b/index/scorch/persister.go index 964e1db9..d66bdea0 100644 --- a/index/scorch/persister.go +++ b/index/scorch/persister.go @@ -17,8 +17,11 @@ package scorch import ( "bytes" "fmt" + "io/ioutil" "log" "os" + "path/filepath" + "sort" "strings" "github.com/RoaringBitmap/roaring" @@ -31,6 +34,8 @@ import ( type notificationChan chan struct{} func (s *Scorch) persisterLoop() { + s.removeOldData(true) + var notify notificationChan var lastPersistedEpoch uint64 OUTER: @@ -105,6 +110,7 @@ OUTER: // woken up, next loop should pick up work } } + s.removeOldData(false) } s.asyncTasks.Done() } @@ -217,6 +223,7 @@ func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot) error { s.rootLock.Lock() newIndexSnapshot := &IndexSnapshot{ + parent: s, epoch: s.root.epoch, segment: make([]*SegmentSnapshot, len(s.root.segment)), offsets: make([]uint64, len(s.root.offsets)), @@ -275,6 +282,7 @@ func (s *Scorch) loadFromBolt() error { if snapshots == nil { return nil } + foundRoot := false c := snapshots.Cursor() for k, _ := c.Last(); k != nil; k, _ = c.Prev() { _, snapshotEpoch, err := segment.DecodeUvarintAscending(k) @@ -282,14 +290,20 @@ func (s *Scorch) loadFromBolt() error { log.Printf("unable to parse segment epoch %x, continuing", k) continue } + if foundRoot { + s.eligibleForRemoval = append(s.eligibleForRemoval, snapshotEpoch) + continue + } snapshot := snapshots.Bucket(k) if snapshot == nil { log.Printf("snapshot key, but bucket missing %x, continuing", k) + s.eligibleForRemoval = append(s.eligibleForRemoval, snapshotEpoch) continue } indexSnapshot, err := s.loadSnapshot(snapshot) if err != nil { log.Printf("unable to load snapshot, %v, continuing", err) + s.eligibleForRemoval = append(s.eligibleForRemoval, snapshotEpoch) continue } indexSnapshot.epoch = snapshotEpoch @@ -301,8 +315,11 @@ func (s *Scorch) loadFromBolt() error { } s.nextSegmentID++ s.nextSnapshotEpoch = snapshotEpoch + 1 + if s.root != nil { + _ = s.root.DecRef() + } s.root = indexSnapshot - break + foundRoot = true } return nil }) @@ -311,6 +328,7 @@ func (s *Scorch) loadFromBolt() error { func (s *Scorch) loadSnapshot(snapshot *bolt.Bucket) (*IndexSnapshot, error) { rv := &IndexSnapshot{ + parent: s, internal: make(map[string][]byte), refs: 1, } @@ -380,3 +398,139 @@ func (s *Scorch) loadSegment(segmentBucket *bolt.Bucket) (*SegmentSnapshot, erro return rv, nil } + +type uint64Descending []uint64 + +func (p uint64Descending) Len() int { return len(p) } +func (p uint64Descending) Less(i, j int) bool { return p[i] > p[j] } +func (p uint64Descending) Swap(i, j int) { p[i], p[j] = p[j], p[i] } + +func (s *Scorch) removeOldData(force bool) { + removed, err := s.removeOldBoltSnapshots() + if err != nil { + log.Printf("got err removing old bolt snapshots: %v", err) + } + + if force || removed > 0 { + err = s.removeOldZapFiles() + if err != nil { + log.Printf("go err removing old zap files: %v", err) + } + } +} + +// NumSnapshotsToKeep represents how many recent, old snapshots to +// keep around per Scorch instance. Useful for apps that require +// rollback'ability. +var NumSnapshotsToKeep int + +// Removes enough snapshots from the rootBolt so that the +// s.eligibleForRemoval stays under the NumSnapshotsToKeep policy. +func (s *Scorch) removeOldBoltSnapshots() (numRemoved int, err error) { + var epochsToRemove []uint64 + + s.rootLock.Lock() + if len(s.eligibleForRemoval) > NumSnapshotsToKeep { + sort.Sort(uint64Descending(s.eligibleForRemoval)) + epochsToRemove = append([]uint64(nil), s.eligibleForRemoval[NumSnapshotsToKeep:]...) // Copy. + s.eligibleForRemoval = s.eligibleForRemoval[0:NumSnapshotsToKeep] + } + s.rootLock.Unlock() + + if len(epochsToRemove) <= 0 { + return 0, nil + } + + tx, err := s.rootBolt.Begin(true) + if err != nil { + return 0, err + } + defer func() { + if err == nil { + err = s.rootBolt.Sync() + } + }() + defer func() { + if err == nil { + err = tx.Commit() + } else { + _ = tx.Rollback() + } + }() + + for _, epochToRemove := range epochsToRemove { + k := segment.EncodeUvarintAscending(nil, epochToRemove) + err = tx.DeleteBucket(k) + if err == bolt.ErrBucketNotFound { + err = nil + } + if err == nil { + numRemoved++ + } + } + + return numRemoved, err +} + +// Removes any *.zap files which aren't listed in the rootBolt. +func (s *Scorch) removeOldZapFiles() error { + liveFileNames, err := s.loadZapFileNames() + if err != nil { + return err + } + + currFileInfos, err := ioutil.ReadDir(s.path) + if err != nil { + return err + } + + for _, finfo := range currFileInfos { + fname := finfo.Name() + if filepath.Ext(fname) == ".zap" { + if _, exists := liveFileNames[fname]; !exists { + err := os.Remove(s.path + string(os.PathSeparator) + fname) + if err != nil { + log.Printf("got err removing file: %s, err: %v", fname, err) + } + } + } + } + + return nil +} + +// Returns the *.zap file names that are listed in the rootBolt. +func (s *Scorch) loadZapFileNames() (map[string]struct{}, error) { + rv := map[string]struct{}{} + err := s.rootBolt.View(func(tx *bolt.Tx) error { + snapshots := tx.Bucket(boltSnapshotsBucket) + if snapshots == nil { + return nil + } + sc := snapshots.Cursor() + for sk, _ := sc.First(); sk != nil; sk, _ = sc.Next() { + snapshot := snapshots.Bucket(sk) + if snapshot == nil { + continue + } + segc := snapshot.Cursor() + for segk, _ := segc.First(); segk != nil; segk, _ = segc.Next() { + if segk[0] == boltInternalKey[0] { + continue + } + segmentBucket := snapshot.Bucket(segk) + if segmentBucket == nil { + continue + } + pathBytes := segmentBucket.Get(boltPathKey) + if pathBytes == nil { + continue + } + rv[string(pathBytes)] = struct{}{} + } + } + return nil + }) + + return rv, err +} diff --git a/index/scorch/scorch.go b/index/scorch/scorch.go index d59563be..06e788ca 100644 --- a/index/scorch/scorch.go +++ b/index/scorch/scorch.go @@ -59,6 +59,8 @@ type Scorch struct { persisterNotifier chan notificationChan rootBolt *bolt.DB asyncTasks sync.WaitGroup + + eligibleForRemoval []uint64 // Index snapshot epoch's that are safe to GC. } func NewScorch(storeName string, config map[string]interface{}, analysisQueue *index.AnalysisQueue) (index.Index, error) { @@ -67,9 +69,9 @@ func NewScorch(storeName string, config map[string]interface{}, analysisQueue *i config: config, analysisQueue: analysisQueue, stats: &Stats{}, - root: &IndexSnapshot{refs: 1}, nextSnapshotEpoch: 1, } + rv.root = &IndexSnapshot{parent: rv, refs: 1} ro, ok := config["read_only"].(bool) if ok { rv.readOnly = ro @@ -324,6 +326,14 @@ func (s *Scorch) Advanced() (store.KVStore, error) { return nil, nil } +func (s *Scorch) AddEligibleForRemoval(epoch uint64) { + s.rootLock.Lock() + if s.root == nil || s.root.epoch != epoch { + s.eligibleForRemoval = append(s.eligibleForRemoval, epoch) + } + s.rootLock.Unlock() +} + func init() { registry.RegisterIndexType(Name, NewScorch) } diff --git a/index/scorch/snapshot_index.go b/index/scorch/snapshot_index.go index 19581f75..6dd77ff4 100644 --- a/index/scorch/snapshot_index.go +++ b/index/scorch/snapshot_index.go @@ -40,6 +40,7 @@ type asynchSegmentResult struct { } type IndexSnapshot struct { + parent *Scorch segment []*SegmentSnapshot offsets []uint64 internal map[string][]byte @@ -67,6 +68,9 @@ func (i *IndexSnapshot) DecRef() (err error) { } } } + if i.parent != nil { + go i.parent.AddEligibleForRemoval(i.epoch) + } } i.m.Unlock() return err