Merge pull request #653 from steveyen/scorch
scorch cleanup of the rootBolt of old snapshots
This commit is contained in:
commit
2b92e5ff99
|
@ -68,6 +68,7 @@ func (s *Scorch) introduceSegment(next *segmentIntroduction) error {
|
||||||
|
|
||||||
// prepare new index snapshot, with curr size + 1
|
// prepare new index snapshot, with curr size + 1
|
||||||
newSnapshot := &IndexSnapshot{
|
newSnapshot := &IndexSnapshot{
|
||||||
|
parent: s,
|
||||||
segment: make([]*SegmentSnapshot, len(s.root.segment)+1),
|
segment: make([]*SegmentSnapshot, len(s.root.segment)+1),
|
||||||
offsets: make([]uint64, len(s.root.segment)+1),
|
offsets: make([]uint64, len(s.root.segment)+1),
|
||||||
internal: make(map[string][]byte, len(s.root.segment)),
|
internal: make(map[string][]byte, len(s.root.segment)),
|
||||||
|
@ -155,6 +156,7 @@ func (s *Scorch) introduceMerge(nextMerge *segmentMerge) {
|
||||||
currSize := len(s.root.segment)
|
currSize := len(s.root.segment)
|
||||||
newSize := currSize + 1 - len(nextMerge.old)
|
newSize := currSize + 1 - len(nextMerge.old)
|
||||||
newSnapshot := &IndexSnapshot{
|
newSnapshot := &IndexSnapshot{
|
||||||
|
parent: s,
|
||||||
segment: make([]*SegmentSnapshot, 0, newSize),
|
segment: make([]*SegmentSnapshot, 0, newSize),
|
||||||
offsets: make([]uint64, 0, newSize),
|
offsets: make([]uint64, 0, newSize),
|
||||||
internal: make(map[string][]byte, len(s.root.segment)),
|
internal: make(map[string][]byte, len(s.root.segment)),
|
||||||
|
|
|
@ -17,8 +17,11 @@ package scorch
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io/ioutil"
|
||||||
"log"
|
"log"
|
||||||
"os"
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"sort"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"github.com/RoaringBitmap/roaring"
|
"github.com/RoaringBitmap/roaring"
|
||||||
|
@ -31,6 +34,8 @@ import (
|
||||||
type notificationChan chan struct{}
|
type notificationChan chan struct{}
|
||||||
|
|
||||||
func (s *Scorch) persisterLoop() {
|
func (s *Scorch) persisterLoop() {
|
||||||
|
s.removeOldData(true)
|
||||||
|
|
||||||
var notify notificationChan
|
var notify notificationChan
|
||||||
var lastPersistedEpoch uint64
|
var lastPersistedEpoch uint64
|
||||||
OUTER:
|
OUTER:
|
||||||
|
@ -105,6 +110,7 @@ OUTER:
|
||||||
// woken up, next loop should pick up work
|
// woken up, next loop should pick up work
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
s.removeOldData(false)
|
||||||
}
|
}
|
||||||
s.asyncTasks.Done()
|
s.asyncTasks.Done()
|
||||||
}
|
}
|
||||||
|
@ -217,6 +223,7 @@ func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot) error {
|
||||||
|
|
||||||
s.rootLock.Lock()
|
s.rootLock.Lock()
|
||||||
newIndexSnapshot := &IndexSnapshot{
|
newIndexSnapshot := &IndexSnapshot{
|
||||||
|
parent: s,
|
||||||
epoch: s.root.epoch,
|
epoch: s.root.epoch,
|
||||||
segment: make([]*SegmentSnapshot, len(s.root.segment)),
|
segment: make([]*SegmentSnapshot, len(s.root.segment)),
|
||||||
offsets: make([]uint64, len(s.root.offsets)),
|
offsets: make([]uint64, len(s.root.offsets)),
|
||||||
|
@ -275,6 +282,7 @@ func (s *Scorch) loadFromBolt() error {
|
||||||
if snapshots == nil {
|
if snapshots == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
foundRoot := false
|
||||||
c := snapshots.Cursor()
|
c := snapshots.Cursor()
|
||||||
for k, _ := c.Last(); k != nil; k, _ = c.Prev() {
|
for k, _ := c.Last(); k != nil; k, _ = c.Prev() {
|
||||||
_, snapshotEpoch, err := segment.DecodeUvarintAscending(k)
|
_, snapshotEpoch, err := segment.DecodeUvarintAscending(k)
|
||||||
|
@ -282,14 +290,20 @@ func (s *Scorch) loadFromBolt() error {
|
||||||
log.Printf("unable to parse segment epoch %x, continuing", k)
|
log.Printf("unable to parse segment epoch %x, continuing", k)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
if foundRoot {
|
||||||
|
s.eligibleForRemoval = append(s.eligibleForRemoval, snapshotEpoch)
|
||||||
|
continue
|
||||||
|
}
|
||||||
snapshot := snapshots.Bucket(k)
|
snapshot := snapshots.Bucket(k)
|
||||||
if snapshot == nil {
|
if snapshot == nil {
|
||||||
log.Printf("snapshot key, but bucket missing %x, continuing", k)
|
log.Printf("snapshot key, but bucket missing %x, continuing", k)
|
||||||
|
s.eligibleForRemoval = append(s.eligibleForRemoval, snapshotEpoch)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
indexSnapshot, err := s.loadSnapshot(snapshot)
|
indexSnapshot, err := s.loadSnapshot(snapshot)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("unable to load snapshot, %v, continuing", err)
|
log.Printf("unable to load snapshot, %v, continuing", err)
|
||||||
|
s.eligibleForRemoval = append(s.eligibleForRemoval, snapshotEpoch)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
indexSnapshot.epoch = snapshotEpoch
|
indexSnapshot.epoch = snapshotEpoch
|
||||||
|
@ -301,8 +315,11 @@ func (s *Scorch) loadFromBolt() error {
|
||||||
}
|
}
|
||||||
s.nextSegmentID++
|
s.nextSegmentID++
|
||||||
s.nextSnapshotEpoch = snapshotEpoch + 1
|
s.nextSnapshotEpoch = snapshotEpoch + 1
|
||||||
|
if s.root != nil {
|
||||||
|
_ = s.root.DecRef()
|
||||||
|
}
|
||||||
s.root = indexSnapshot
|
s.root = indexSnapshot
|
||||||
break
|
foundRoot = true
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
@ -311,6 +328,7 @@ func (s *Scorch) loadFromBolt() error {
|
||||||
func (s *Scorch) loadSnapshot(snapshot *bolt.Bucket) (*IndexSnapshot, error) {
|
func (s *Scorch) loadSnapshot(snapshot *bolt.Bucket) (*IndexSnapshot, error) {
|
||||||
|
|
||||||
rv := &IndexSnapshot{
|
rv := &IndexSnapshot{
|
||||||
|
parent: s,
|
||||||
internal: make(map[string][]byte),
|
internal: make(map[string][]byte),
|
||||||
refs: 1,
|
refs: 1,
|
||||||
}
|
}
|
||||||
|
@ -380,3 +398,139 @@ func (s *Scorch) loadSegment(segmentBucket *bolt.Bucket) (*SegmentSnapshot, erro
|
||||||
|
|
||||||
return rv, nil
|
return rv, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// uint64Descending implements sort.Interface over a []uint64 so that
// sorting places the largest values first.
type uint64Descending []uint64

// Len reports the number of elements.
func (d uint64Descending) Len() int {
	return len(d)
}

// Less orders element i before element j when i holds the larger value.
func (d uint64Descending) Less(i, j int) bool {
	return d[j] < d[i]
}

// Swap exchanges the elements at positions i and j.
func (d uint64Descending) Swap(i, j int) {
	d[j], d[i] = d[i], d[j]
}
|
||||||
|
|
||||||
|
func (s *Scorch) removeOldData(force bool) {
|
||||||
|
removed, err := s.removeOldBoltSnapshots()
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("got err removing old bolt snapshots: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if force || removed > 0 {
|
||||||
|
err = s.removeOldZapFiles()
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("go err removing old zap files: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// NumSnapshotsToKeep represents how many recent, old snapshots to
|
||||||
|
// keep around per Scorch instance. Useful for apps that require
|
||||||
|
// the ability to roll back.
|
||||||
|
var NumSnapshotsToKeep int
|
||||||
|
|
||||||
|
// Removes enough snapshots from the rootBolt so that the
|
||||||
|
// s.eligibleForRemoval stays under the NumSnapshotsToKeep policy.
|
||||||
|
func (s *Scorch) removeOldBoltSnapshots() (numRemoved int, err error) {
|
||||||
|
var epochsToRemove []uint64
|
||||||
|
|
||||||
|
s.rootLock.Lock()
|
||||||
|
if len(s.eligibleForRemoval) > NumSnapshotsToKeep {
|
||||||
|
sort.Sort(uint64Descending(s.eligibleForRemoval))
|
||||||
|
epochsToRemove = append([]uint64(nil), s.eligibleForRemoval[NumSnapshotsToKeep:]...) // Copy.
|
||||||
|
s.eligibleForRemoval = s.eligibleForRemoval[0:NumSnapshotsToKeep]
|
||||||
|
}
|
||||||
|
s.rootLock.Unlock()
|
||||||
|
|
||||||
|
if len(epochsToRemove) <= 0 {
|
||||||
|
return 0, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
tx, err := s.rootBolt.Begin(true)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
if err == nil {
|
||||||
|
err = s.rootBolt.Sync()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
defer func() {
|
||||||
|
if err == nil {
|
||||||
|
err = tx.Commit()
|
||||||
|
} else {
|
||||||
|
_ = tx.Rollback()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
for _, epochToRemove := range epochsToRemove {
|
||||||
|
k := segment.EncodeUvarintAscending(nil, epochToRemove)
|
||||||
|
err = tx.DeleteBucket(k)
|
||||||
|
if err == bolt.ErrBucketNotFound {
|
||||||
|
err = nil
|
||||||
|
}
|
||||||
|
if err == nil {
|
||||||
|
numRemoved++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return numRemoved, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Removes any *.zap files which aren't listed in the rootBolt.
|
||||||
|
func (s *Scorch) removeOldZapFiles() error {
|
||||||
|
liveFileNames, err := s.loadZapFileNames()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
currFileInfos, err := ioutil.ReadDir(s.path)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, finfo := range currFileInfos {
|
||||||
|
fname := finfo.Name()
|
||||||
|
if filepath.Ext(fname) == ".zap" {
|
||||||
|
if _, exists := liveFileNames[fname]; !exists {
|
||||||
|
err := os.Remove(s.path + string(os.PathSeparator) + fname)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("got err removing file: %s, err: %v", fname, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Returns the *.zap file names that are listed in the rootBolt.
|
||||||
|
func (s *Scorch) loadZapFileNames() (map[string]struct{}, error) {
|
||||||
|
rv := map[string]struct{}{}
|
||||||
|
err := s.rootBolt.View(func(tx *bolt.Tx) error {
|
||||||
|
snapshots := tx.Bucket(boltSnapshotsBucket)
|
||||||
|
if snapshots == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
sc := snapshots.Cursor()
|
||||||
|
for sk, _ := sc.First(); sk != nil; sk, _ = sc.Next() {
|
||||||
|
snapshot := snapshots.Bucket(sk)
|
||||||
|
if snapshot == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
segc := snapshot.Cursor()
|
||||||
|
for segk, _ := segc.First(); segk != nil; segk, _ = segc.Next() {
|
||||||
|
if segk[0] == boltInternalKey[0] {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
segmentBucket := snapshot.Bucket(segk)
|
||||||
|
if segmentBucket == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
pathBytes := segmentBucket.Get(boltPathKey)
|
||||||
|
if pathBytes == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
rv[string(pathBytes)] = struct{}{}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
return rv, err
|
||||||
|
}
|
||||||
|
|
|
@ -59,6 +59,8 @@ type Scorch struct {
|
||||||
persisterNotifier chan notificationChan
|
persisterNotifier chan notificationChan
|
||||||
rootBolt *bolt.DB
|
rootBolt *bolt.DB
|
||||||
asyncTasks sync.WaitGroup
|
asyncTasks sync.WaitGroup
|
||||||
|
|
||||||
|
eligibleForRemoval []uint64 // Index snapshot epoch's that are safe to GC.
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewScorch(storeName string, config map[string]interface{}, analysisQueue *index.AnalysisQueue) (index.Index, error) {
|
func NewScorch(storeName string, config map[string]interface{}, analysisQueue *index.AnalysisQueue) (index.Index, error) {
|
||||||
|
@ -67,9 +69,9 @@ func NewScorch(storeName string, config map[string]interface{}, analysisQueue *i
|
||||||
config: config,
|
config: config,
|
||||||
analysisQueue: analysisQueue,
|
analysisQueue: analysisQueue,
|
||||||
stats: &Stats{},
|
stats: &Stats{},
|
||||||
root: &IndexSnapshot{refs: 1},
|
|
||||||
nextSnapshotEpoch: 1,
|
nextSnapshotEpoch: 1,
|
||||||
}
|
}
|
||||||
|
rv.root = &IndexSnapshot{parent: rv, refs: 1}
|
||||||
ro, ok := config["read_only"].(bool)
|
ro, ok := config["read_only"].(bool)
|
||||||
if ok {
|
if ok {
|
||||||
rv.readOnly = ro
|
rv.readOnly = ro
|
||||||
|
@ -324,6 +326,14 @@ func (s *Scorch) Advanced() (store.KVStore, error) {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Scorch) AddEligibleForRemoval(epoch uint64) {
|
||||||
|
s.rootLock.Lock()
|
||||||
|
if s.root == nil || s.root.epoch != epoch {
|
||||||
|
s.eligibleForRemoval = append(s.eligibleForRemoval, epoch)
|
||||||
|
}
|
||||||
|
s.rootLock.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
registry.RegisterIndexType(Name, NewScorch)
|
registry.RegisterIndexType(Name, NewScorch)
|
||||||
}
|
}
|
||||||
|
|
|
@ -40,6 +40,7 @@ type asynchSegmentResult struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
type IndexSnapshot struct {
|
type IndexSnapshot struct {
|
||||||
|
parent *Scorch
|
||||||
segment []*SegmentSnapshot
|
segment []*SegmentSnapshot
|
||||||
offsets []uint64
|
offsets []uint64
|
||||||
internal map[string][]byte
|
internal map[string][]byte
|
||||||
|
@ -67,6 +68,9 @@ func (i *IndexSnapshot) DecRef() (err error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if i.parent != nil {
|
||||||
|
go i.parent.AddEligibleForRemoval(i.epoch)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
i.m.Unlock()
|
i.m.Unlock()
|
||||||
return err
|
return err
|
||||||
|
|
Loading…
Reference in New Issue
Block a user