a5253bfe2b
This change allows the introducer to become the only goroutine to modify the root, which in turn allows the introducer to greatly reduce its root lock holding surface area.
809 lines
20 KiB
Go
809 lines
20 KiB
Go
// Copyright (c) 2017 Couchbase, Inc.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package scorch
|
|
|
|
import (
|
|
"bytes"
|
|
"fmt"
|
|
"io/ioutil"
|
|
"log"
|
|
"os"
|
|
"path/filepath"
|
|
"strconv"
|
|
"strings"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/RoaringBitmap/roaring"
|
|
"github.com/blevesearch/bleve/index/scorch/segment"
|
|
"github.com/blevesearch/bleve/index/scorch/segment/zap"
|
|
"github.com/boltdb/bolt"
|
|
)
|
|
|
|
var DefaultChunkFactor uint32 = 1024
|
|
|
|
// Arbitrary number, need to make it configurable.
|
|
// Lower values like 10/making persister really slow
|
|
// doesn't work well as it is creating more files to
|
|
// persist for in next persist iteration and spikes the # FDs.
|
|
// Ideal value should let persister also proceed at
|
|
// an optimum pace so that the merger can skip
|
|
// many intermediate snapshots.
|
|
// This needs to be based on empirical data.
|
|
// TODO - may need to revisit this approach/value.
|
|
var epochDistance = uint64(5)
|
|
|
|
type notificationChan chan struct{}
|
|
|
|
func (s *Scorch) persisterLoop() {
|
|
defer s.asyncTasks.Done()
|
|
|
|
var persistWatchers []*epochWatcher
|
|
var lastPersistedEpoch, lastMergedEpoch uint64
|
|
var ew *epochWatcher
|
|
OUTER:
|
|
for {
|
|
atomic.AddUint64(&s.stats.TotPersistLoopBeg, 1)
|
|
|
|
select {
|
|
case <-s.closeCh:
|
|
break OUTER
|
|
case ew = <-s.persisterNotifier:
|
|
persistWatchers = append(persistWatchers, ew)
|
|
default:
|
|
}
|
|
if ew != nil && ew.epoch > lastMergedEpoch {
|
|
lastMergedEpoch = ew.epoch
|
|
}
|
|
lastMergedEpoch, persistWatchers = s.pausePersisterForMergerCatchUp(lastPersistedEpoch,
|
|
lastMergedEpoch, persistWatchers)
|
|
|
|
var ourSnapshot *IndexSnapshot
|
|
var ourPersisted []chan error
|
|
|
|
// check to see if there is a new snapshot to persist
|
|
s.rootLock.Lock()
|
|
if s.root != nil && s.root.epoch > lastPersistedEpoch {
|
|
ourSnapshot = s.root
|
|
ourSnapshot.AddRef()
|
|
ourPersisted = s.rootPersisted
|
|
s.rootPersisted = nil
|
|
}
|
|
s.rootLock.Unlock()
|
|
|
|
if ourSnapshot != nil {
|
|
startTime := time.Now()
|
|
|
|
err := s.persistSnapshot(ourSnapshot)
|
|
for _, ch := range ourPersisted {
|
|
if err != nil {
|
|
ch <- err
|
|
}
|
|
close(ch)
|
|
}
|
|
if err != nil {
|
|
s.fireAsyncError(fmt.Errorf("got err persisting snapshot: %v", err))
|
|
_ = ourSnapshot.DecRef()
|
|
atomic.AddUint64(&s.stats.TotPersistLoopErr, 1)
|
|
continue OUTER
|
|
}
|
|
|
|
lastPersistedEpoch = ourSnapshot.epoch
|
|
for _, ew := range persistWatchers {
|
|
close(ew.notifyCh)
|
|
}
|
|
|
|
persistWatchers = nil
|
|
_ = ourSnapshot.DecRef()
|
|
|
|
changed := false
|
|
s.rootLock.RLock()
|
|
if s.root != nil && s.root.epoch != lastPersistedEpoch {
|
|
changed = true
|
|
}
|
|
s.rootLock.RUnlock()
|
|
|
|
s.fireEvent(EventKindPersisterProgress, time.Since(startTime))
|
|
|
|
if changed {
|
|
atomic.AddUint64(&s.stats.TotPersistLoopProgress, 1)
|
|
continue OUTER
|
|
}
|
|
}
|
|
|
|
// tell the introducer we're waiting for changes
|
|
w := &epochWatcher{
|
|
epoch: lastPersistedEpoch,
|
|
notifyCh: make(notificationChan, 1),
|
|
}
|
|
|
|
select {
|
|
case <-s.closeCh:
|
|
break OUTER
|
|
case s.introducerNotifier <- w:
|
|
}
|
|
|
|
s.removeOldData() // might as well cleanup while waiting
|
|
|
|
atomic.AddUint64(&s.stats.TotPersistLoopWait, 1)
|
|
|
|
select {
|
|
case <-s.closeCh:
|
|
break OUTER
|
|
case <-w.notifyCh:
|
|
// woken up, next loop should pick up work
|
|
atomic.AddUint64(&s.stats.TotPersistLoopWaitNotified, 1)
|
|
case ew = <-s.persisterNotifier:
|
|
// if the watchers are already caught up then let them wait,
|
|
// else let them continue to do the catch up
|
|
persistWatchers = append(persistWatchers, ew)
|
|
}
|
|
|
|
atomic.AddUint64(&s.stats.TotPersistLoopEnd, 1)
|
|
}
|
|
}
|
|
|
|
func notifyMergeWatchers(lastPersistedEpoch uint64,
|
|
persistWatchers []*epochWatcher) []*epochWatcher {
|
|
var watchersNext []*epochWatcher
|
|
for _, w := range persistWatchers {
|
|
if w.epoch < lastPersistedEpoch {
|
|
close(w.notifyCh)
|
|
} else {
|
|
watchersNext = append(watchersNext, w)
|
|
}
|
|
}
|
|
return watchersNext
|
|
}
|
|
|
|
func (s *Scorch) pausePersisterForMergerCatchUp(lastPersistedEpoch uint64, lastMergedEpoch uint64,
|
|
persistWatchers []*epochWatcher) (uint64, []*epochWatcher) {
|
|
|
|
// first, let the watchers proceed if they lag behind
|
|
persistWatchers = notifyMergeWatchers(lastPersistedEpoch, persistWatchers)
|
|
|
|
OUTER:
|
|
// check for slow merger and await until the merger catch up
|
|
for lastPersistedEpoch > lastMergedEpoch+epochDistance {
|
|
atomic.AddUint64(&s.stats.TotPersisterSlowMergerPause, 1)
|
|
|
|
select {
|
|
case <-s.closeCh:
|
|
break OUTER
|
|
case ew := <-s.persisterNotifier:
|
|
persistWatchers = append(persistWatchers, ew)
|
|
lastMergedEpoch = ew.epoch
|
|
}
|
|
|
|
atomic.AddUint64(&s.stats.TotPersisterSlowMergerResume, 1)
|
|
|
|
// let the watchers proceed if they lag behind
|
|
persistWatchers = notifyMergeWatchers(lastPersistedEpoch, persistWatchers)
|
|
}
|
|
|
|
return lastMergedEpoch, persistWatchers
|
|
}
|
|
|
|
func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot) error {
|
|
persisted, err := s.persistSnapshotMaybeMerge(snapshot)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if persisted {
|
|
return nil
|
|
}
|
|
|
|
return s.persistSnapshotDirect(snapshot)
|
|
}
|
|
|
|
// DefaultMinSegmentsForInMemoryMerge represents the default number of
|
|
// in-memory zap segments that persistSnapshotMaybeMerge() needs to
|
|
// see in an IndexSnapshot before it decides to merge and persist
|
|
// those segments
|
|
var DefaultMinSegmentsForInMemoryMerge = 2
|
|
|
|
// persistSnapshotMaybeMerge examines the snapshot and might merge and
|
|
// persist the in-memory zap segments if there are enough of them
|
|
func (s *Scorch) persistSnapshotMaybeMerge(snapshot *IndexSnapshot) (
|
|
bool, error) {
|
|
// collect the in-memory zap segments (SegmentBase instances)
|
|
var sbs []*zap.SegmentBase
|
|
var sbsDrops []*roaring.Bitmap
|
|
var sbsIndexes []int
|
|
|
|
for i, segmentSnapshot := range snapshot.segment {
|
|
if sb, ok := segmentSnapshot.segment.(*zap.SegmentBase); ok {
|
|
sbs = append(sbs, sb)
|
|
sbsDrops = append(sbsDrops, segmentSnapshot.deleted)
|
|
sbsIndexes = append(sbsIndexes, i)
|
|
}
|
|
}
|
|
|
|
if len(sbs) < DefaultMinSegmentsForInMemoryMerge {
|
|
return false, nil
|
|
}
|
|
|
|
_, newSnapshot, newSegmentID, err := s.mergeSegmentBases(
|
|
snapshot, sbs, sbsDrops, sbsIndexes, DefaultChunkFactor)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
if newSnapshot == nil {
|
|
return false, nil
|
|
}
|
|
|
|
defer func() {
|
|
_ = newSnapshot.DecRef()
|
|
}()
|
|
|
|
mergedSegmentIDs := map[uint64]struct{}{}
|
|
for _, idx := range sbsIndexes {
|
|
mergedSegmentIDs[snapshot.segment[idx].id] = struct{}{}
|
|
}
|
|
|
|
// construct a snapshot that's logically equivalent to the input
|
|
// snapshot, but with merged segments replaced by the new segment
|
|
equiv := &IndexSnapshot{
|
|
parent: snapshot.parent,
|
|
segment: make([]*SegmentSnapshot, 0, len(snapshot.segment)),
|
|
internal: snapshot.internal,
|
|
epoch: snapshot.epoch,
|
|
}
|
|
|
|
// copy to the equiv the segments that weren't replaced
|
|
for _, segment := range snapshot.segment {
|
|
if _, wasMerged := mergedSegmentIDs[segment.id]; !wasMerged {
|
|
equiv.segment = append(equiv.segment, segment)
|
|
}
|
|
}
|
|
|
|
// append to the equiv the new segment
|
|
for _, segment := range newSnapshot.segment {
|
|
if segment.id == newSegmentID {
|
|
equiv.segment = append(equiv.segment, &SegmentSnapshot{
|
|
id: newSegmentID,
|
|
segment: segment.segment,
|
|
deleted: nil, // nil since merging handled deletions
|
|
})
|
|
break
|
|
}
|
|
}
|
|
|
|
err = s.persistSnapshotDirect(equiv)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
return true, nil
|
|
}
|
|
|
|
func (s *Scorch) persistSnapshotDirect(snapshot *IndexSnapshot) (err error) {
|
|
// start a write transaction
|
|
tx, err := s.rootBolt.Begin(true)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
// defer rollback on error
|
|
defer func() {
|
|
if err != nil {
|
|
_ = tx.Rollback()
|
|
}
|
|
}()
|
|
|
|
snapshotsBucket, err := tx.CreateBucketIfNotExists(boltSnapshotsBucket)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
newSnapshotKey := segment.EncodeUvarintAscending(nil, snapshot.epoch)
|
|
snapshotBucket, err := snapshotsBucket.CreateBucketIfNotExists(newSnapshotKey)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// persist internal values
|
|
internalBucket, err := snapshotBucket.CreateBucketIfNotExists(boltInternalKey)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
// TODO optimize writing these in order?
|
|
for k, v := range snapshot.internal {
|
|
err = internalBucket.Put([]byte(k), v)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
var filenames []string
|
|
newSegmentPaths := make(map[uint64]string)
|
|
|
|
// first ensure that each segment in this snapshot has been persisted
|
|
for _, segmentSnapshot := range snapshot.segment {
|
|
snapshotSegmentKey := segment.EncodeUvarintAscending(nil, segmentSnapshot.id)
|
|
snapshotSegmentBucket, err := snapshotBucket.CreateBucketIfNotExists(snapshotSegmentKey)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
switch seg := segmentSnapshot.segment.(type) {
|
|
case *zap.SegmentBase:
|
|
// need to persist this to disk
|
|
filename := zapFileName(segmentSnapshot.id)
|
|
path := s.path + string(os.PathSeparator) + filename
|
|
err = zap.PersistSegmentBase(seg, path)
|
|
if err != nil {
|
|
return fmt.Errorf("error persisting segment: %v", err)
|
|
}
|
|
newSegmentPaths[segmentSnapshot.id] = path
|
|
err = snapshotSegmentBucket.Put(boltPathKey, []byte(filename))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
filenames = append(filenames, filename)
|
|
case *zap.Segment:
|
|
path := seg.Path()
|
|
filename := strings.TrimPrefix(path, s.path+string(os.PathSeparator))
|
|
err = snapshotSegmentBucket.Put(boltPathKey, []byte(filename))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
filenames = append(filenames, filename)
|
|
default:
|
|
return fmt.Errorf("unknown segment type: %T", seg)
|
|
}
|
|
// store current deleted bits
|
|
var roaringBuf bytes.Buffer
|
|
if segmentSnapshot.deleted != nil {
|
|
_, err = segmentSnapshot.deleted.WriteTo(&roaringBuf)
|
|
if err != nil {
|
|
return fmt.Errorf("error persisting roaring bytes: %v", err)
|
|
}
|
|
err = snapshotSegmentBucket.Put(boltDeletedKey, roaringBuf.Bytes())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
// we need to swap in a new root only when we've persisted 1 or
|
|
// more segments -- whereby the new root would have 1-for-1
|
|
// replacements of in-memory segments with file-based segments
|
|
//
|
|
// other cases like updates to internal values only, and/or when
|
|
// there are only deletions, are already covered and persisted by
|
|
// the newly populated boltdb snapshotBucket above
|
|
if len(newSegmentPaths) > 0 {
|
|
// now try to open all the new snapshots
|
|
newSegments := make(map[uint64]segment.Segment)
|
|
defer func() {
|
|
for _, s := range newSegments {
|
|
if s != nil {
|
|
// cleanup segments that were opened but not
|
|
// swapped into the new root
|
|
_ = s.Close()
|
|
}
|
|
}
|
|
}()
|
|
for segmentID, path := range newSegmentPaths {
|
|
newSegments[segmentID], err = zap.Open(path)
|
|
if err != nil {
|
|
return fmt.Errorf("error opening new segment at %s, %v", path, err)
|
|
}
|
|
}
|
|
|
|
persist := &persistIntroduction{
|
|
persisted: newSegments,
|
|
applied: make(notificationChan),
|
|
}
|
|
|
|
select {
|
|
case <-s.closeCh:
|
|
err = ErrClosed
|
|
return err
|
|
case s.persists <- persist:
|
|
}
|
|
|
|
select {
|
|
case <-s.closeCh:
|
|
err = ErrClosed
|
|
return err
|
|
case <-persist.applied:
|
|
}
|
|
}
|
|
|
|
err = tx.Commit()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = s.rootBolt.Sync()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// allow files to become eligible for removal after commit, such
|
|
// as file segments from snapshots that came from the merger
|
|
s.rootLock.Lock()
|
|
for _, filename := range filenames {
|
|
delete(s.ineligibleForRemoval, filename)
|
|
}
|
|
s.rootLock.Unlock()
|
|
|
|
return nil
|
|
}
|
|
|
|
func zapFileName(epoch uint64) string {
|
|
return fmt.Sprintf("%012x.zap", epoch)
|
|
}
|
|
|
|
// bolt snapshot code
|
|
|
|
var boltSnapshotsBucket = []byte{'s'}
|
|
var boltPathKey = []byte{'p'}
|
|
var boltDeletedKey = []byte{'d'}
|
|
var boltInternalKey = []byte{'i'}
|
|
|
|
func (s *Scorch) loadFromBolt() error {
|
|
return s.rootBolt.View(func(tx *bolt.Tx) error {
|
|
snapshots := tx.Bucket(boltSnapshotsBucket)
|
|
if snapshots == nil {
|
|
return nil
|
|
}
|
|
foundRoot := false
|
|
c := snapshots.Cursor()
|
|
for k, _ := c.Last(); k != nil; k, _ = c.Prev() {
|
|
_, snapshotEpoch, err := segment.DecodeUvarintAscending(k)
|
|
if err != nil {
|
|
log.Printf("unable to parse segment epoch %x, continuing", k)
|
|
continue
|
|
}
|
|
if foundRoot {
|
|
s.eligibleForRemoval = append(s.eligibleForRemoval, snapshotEpoch)
|
|
continue
|
|
}
|
|
snapshot := snapshots.Bucket(k)
|
|
if snapshot == nil {
|
|
log.Printf("snapshot key, but bucket missing %x, continuing", k)
|
|
s.eligibleForRemoval = append(s.eligibleForRemoval, snapshotEpoch)
|
|
continue
|
|
}
|
|
indexSnapshot, err := s.loadSnapshot(snapshot)
|
|
if err != nil {
|
|
log.Printf("unable to load snapshot, %v, continuing", err)
|
|
s.eligibleForRemoval = append(s.eligibleForRemoval, snapshotEpoch)
|
|
continue
|
|
}
|
|
indexSnapshot.epoch = snapshotEpoch
|
|
// set the nextSegmentID
|
|
s.nextSegmentID, err = s.maxSegmentIDOnDisk()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
s.nextSegmentID++
|
|
s.nextSnapshotEpoch = snapshotEpoch + 1
|
|
s.rootLock.Lock()
|
|
if s.root != nil {
|
|
_ = s.root.DecRef()
|
|
}
|
|
s.root = indexSnapshot
|
|
s.rootLock.Unlock()
|
|
foundRoot = true
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// LoadSnapshot loads the segment with the specified epoch
|
|
// NOTE: this is currently ONLY intended to be used by the command-line tool
|
|
func (s *Scorch) LoadSnapshot(epoch uint64) (rv *IndexSnapshot, err error) {
|
|
err = s.rootBolt.View(func(tx *bolt.Tx) error {
|
|
snapshots := tx.Bucket(boltSnapshotsBucket)
|
|
if snapshots == nil {
|
|
return nil
|
|
}
|
|
snapshotKey := segment.EncodeUvarintAscending(nil, epoch)
|
|
snapshot := snapshots.Bucket(snapshotKey)
|
|
if snapshot == nil {
|
|
return nil
|
|
}
|
|
rv, err = s.loadSnapshot(snapshot)
|
|
return err
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return rv, nil
|
|
}
|
|
|
|
func (s *Scorch) loadSnapshot(snapshot *bolt.Bucket) (*IndexSnapshot, error) {
|
|
|
|
rv := &IndexSnapshot{
|
|
parent: s,
|
|
internal: make(map[string][]byte),
|
|
refs: 1,
|
|
}
|
|
var running uint64
|
|
c := snapshot.Cursor()
|
|
for k, _ := c.First(); k != nil; k, _ = c.Next() {
|
|
if k[0] == boltInternalKey[0] {
|
|
internalBucket := snapshot.Bucket(k)
|
|
err := internalBucket.ForEach(func(key []byte, val []byte) error {
|
|
copiedVal := append([]byte(nil), val...)
|
|
rv.internal[string(key)] = copiedVal
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
_ = rv.DecRef()
|
|
return nil, err
|
|
}
|
|
} else {
|
|
segmentBucket := snapshot.Bucket(k)
|
|
if segmentBucket == nil {
|
|
_ = rv.DecRef()
|
|
return nil, fmt.Errorf("segment key, but bucket missing % x", k)
|
|
}
|
|
segmentSnapshot, err := s.loadSegment(segmentBucket)
|
|
if err != nil {
|
|
_ = rv.DecRef()
|
|
return nil, fmt.Errorf("failed to load segment: %v", err)
|
|
}
|
|
_, segmentSnapshot.id, err = segment.DecodeUvarintAscending(k)
|
|
if err != nil {
|
|
_ = rv.DecRef()
|
|
return nil, fmt.Errorf("failed to decode segment id: %v", err)
|
|
}
|
|
rv.segment = append(rv.segment, segmentSnapshot)
|
|
rv.offsets = append(rv.offsets, running)
|
|
running += segmentSnapshot.segment.Count()
|
|
}
|
|
}
|
|
return rv, nil
|
|
}
|
|
|
|
func (s *Scorch) loadSegment(segmentBucket *bolt.Bucket) (*SegmentSnapshot, error) {
|
|
pathBytes := segmentBucket.Get(boltPathKey)
|
|
if pathBytes == nil {
|
|
return nil, fmt.Errorf("segment path missing")
|
|
}
|
|
segmentPath := s.path + string(os.PathSeparator) + string(pathBytes)
|
|
segment, err := zap.Open(segmentPath)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error opening bolt segment: %v", err)
|
|
}
|
|
|
|
rv := &SegmentSnapshot{
|
|
segment: segment,
|
|
cachedDocs: &cachedDocs{cache: nil},
|
|
}
|
|
deletedBytes := segmentBucket.Get(boltDeletedKey)
|
|
if deletedBytes != nil {
|
|
deletedBitmap := roaring.NewBitmap()
|
|
r := bytes.NewReader(deletedBytes)
|
|
_, err := deletedBitmap.ReadFrom(r)
|
|
if err != nil {
|
|
_ = segment.Close()
|
|
return nil, fmt.Errorf("error reading deleted bytes: %v", err)
|
|
}
|
|
rv.deleted = deletedBitmap
|
|
}
|
|
|
|
return rv, nil
|
|
}
|
|
|
|
type uint64Descending []uint64
|
|
|
|
func (p uint64Descending) Len() int { return len(p) }
|
|
func (p uint64Descending) Less(i, j int) bool { return p[i] > p[j] }
|
|
func (p uint64Descending) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
|
|
|
|
func (s *Scorch) removeOldData() {
|
|
removed, err := s.removeOldBoltSnapshots()
|
|
if err != nil {
|
|
s.fireAsyncError(fmt.Errorf("got err removing old bolt snapshots: %v", err))
|
|
}
|
|
|
|
if removed > 0 {
|
|
err = s.removeOldZapFiles()
|
|
if err != nil {
|
|
s.fireAsyncError(fmt.Errorf("got err removing old zap files: %v", err))
|
|
}
|
|
}
|
|
}
|
|
|
|
// NumSnapshotsToKeep represents how many recent, old snapshots to
|
|
// keep around per Scorch instance. Useful for apps that require
|
|
// rollback'ability.
|
|
var NumSnapshotsToKeep = 1
|
|
|
|
// Removes enough snapshots from the rootBolt so that the
|
|
// s.eligibleForRemoval stays under the NumSnapshotsToKeep policy.
|
|
func (s *Scorch) removeOldBoltSnapshots() (numRemoved int, err error) {
|
|
persistedEpochs, err := s.RootBoltSnapshotEpochs()
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
if len(persistedEpochs) <= NumSnapshotsToKeep {
|
|
// we need to keep everything
|
|
return 0, nil
|
|
}
|
|
|
|
// make a map of epochs to protect from deletion
|
|
protectedEpochs := make(map[uint64]struct{}, NumSnapshotsToKeep)
|
|
for _, epoch := range persistedEpochs[0:NumSnapshotsToKeep] {
|
|
protectedEpochs[epoch] = struct{}{}
|
|
}
|
|
|
|
var epochsToRemove []uint64
|
|
var newEligible []uint64
|
|
s.rootLock.Lock()
|
|
for _, epoch := range s.eligibleForRemoval {
|
|
if _, ok := protectedEpochs[epoch]; ok {
|
|
// protected
|
|
newEligible = append(newEligible, epoch)
|
|
} else {
|
|
epochsToRemove = append(epochsToRemove, epoch)
|
|
}
|
|
}
|
|
s.eligibleForRemoval = newEligible
|
|
s.rootLock.Unlock()
|
|
|
|
if len(epochsToRemove) <= 0 {
|
|
return 0, nil
|
|
}
|
|
|
|
tx, err := s.rootBolt.Begin(true)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
defer func() {
|
|
if err == nil {
|
|
err = tx.Commit()
|
|
} else {
|
|
_ = tx.Rollback()
|
|
}
|
|
if err == nil {
|
|
err = s.rootBolt.Sync()
|
|
}
|
|
}()
|
|
|
|
snapshots := tx.Bucket(boltSnapshotsBucket)
|
|
if snapshots == nil {
|
|
return 0, nil
|
|
}
|
|
|
|
for _, epochToRemove := range epochsToRemove {
|
|
k := segment.EncodeUvarintAscending(nil, epochToRemove)
|
|
err = snapshots.DeleteBucket(k)
|
|
if err == bolt.ErrBucketNotFound {
|
|
err = nil
|
|
}
|
|
if err == nil {
|
|
numRemoved++
|
|
}
|
|
}
|
|
|
|
return numRemoved, err
|
|
}
|
|
|
|
func (s *Scorch) maxSegmentIDOnDisk() (uint64, error) {
|
|
currFileInfos, err := ioutil.ReadDir(s.path)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
var rv uint64
|
|
for _, finfo := range currFileInfos {
|
|
fname := finfo.Name()
|
|
if filepath.Ext(fname) == ".zap" {
|
|
prefix := strings.TrimSuffix(fname, ".zap")
|
|
id, err2 := strconv.ParseUint(prefix, 16, 64)
|
|
if err2 != nil {
|
|
return 0, err2
|
|
}
|
|
if id > rv {
|
|
rv = id
|
|
}
|
|
}
|
|
}
|
|
return rv, err
|
|
}
|
|
|
|
// Removes any *.zap files which aren't listed in the rootBolt.
|
|
func (s *Scorch) removeOldZapFiles() error {
|
|
liveFileNames, err := s.loadZapFileNames()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
currFileInfos, err := ioutil.ReadDir(s.path)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
s.rootLock.RLock()
|
|
|
|
for _, finfo := range currFileInfos {
|
|
fname := finfo.Name()
|
|
if filepath.Ext(fname) == ".zap" {
|
|
if _, exists := liveFileNames[fname]; !exists && !s.ineligibleForRemoval[fname] {
|
|
err := os.Remove(s.path + string(os.PathSeparator) + fname)
|
|
if err != nil {
|
|
log.Printf("got err removing file: %s, err: %v", fname, err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
s.rootLock.RUnlock()
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *Scorch) RootBoltSnapshotEpochs() ([]uint64, error) {
|
|
var rv []uint64
|
|
err := s.rootBolt.View(func(tx *bolt.Tx) error {
|
|
snapshots := tx.Bucket(boltSnapshotsBucket)
|
|
if snapshots == nil {
|
|
return nil
|
|
}
|
|
sc := snapshots.Cursor()
|
|
for sk, _ := sc.Last(); sk != nil; sk, _ = sc.Prev() {
|
|
_, snapshotEpoch, err := segment.DecodeUvarintAscending(sk)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
rv = append(rv, snapshotEpoch)
|
|
}
|
|
return nil
|
|
})
|
|
return rv, err
|
|
}
|
|
|
|
// Returns the *.zap file names that are listed in the rootBolt.
|
|
func (s *Scorch) loadZapFileNames() (map[string]struct{}, error) {
|
|
rv := map[string]struct{}{}
|
|
err := s.rootBolt.View(func(tx *bolt.Tx) error {
|
|
snapshots := tx.Bucket(boltSnapshotsBucket)
|
|
if snapshots == nil {
|
|
return nil
|
|
}
|
|
sc := snapshots.Cursor()
|
|
for sk, _ := sc.First(); sk != nil; sk, _ = sc.Next() {
|
|
snapshot := snapshots.Bucket(sk)
|
|
if snapshot == nil {
|
|
continue
|
|
}
|
|
segc := snapshot.Cursor()
|
|
for segk, _ := segc.First(); segk != nil; segk, _ = segc.Next() {
|
|
if segk[0] == boltInternalKey[0] {
|
|
continue
|
|
}
|
|
segmentBucket := snapshot.Bucket(segk)
|
|
if segmentBucket == nil {
|
|
continue
|
|
}
|
|
pathBytes := segmentBucket.Get(boltPathKey)
|
|
if pathBytes == nil {
|
|
continue
|
|
}
|
|
pathString := string(pathBytes)
|
|
rv[string(pathString)] = struct{}{}
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
|
|
return rv, err
|
|
}
|