Steve Yen 2be5eb4427 scorch tracks zap files that can't be removed yet
A race & solution found by Marty Schoch... consider a case when the
merger might grab a nextSegmentID, like 4, but takes awhile to
complete.  Meanwhile, the persister grabs the nextSegmentID of 5, but
finishes its persistence work fast, and then loops to cleanup any old
files.  The simple approach of checking a "highest segment ID" of 5 is
wrong now, because the deleter now thinks that segment 4's zap file is
(incorrectly) ok to delete.

The solution in this commit is to track an ephemeral map of filenames
which are ineligibleForRemoval, because they're still being written
(by the merger) and haven't been fully incorporated into the rootBolt

The merger adds to that ineligibleForRemoval map as it starts a merged
zap file, the persister cleans up entries from that map when it
persists zap filenames into the rootBolt, and the deleter (part of the
persister's loop) consults the map before performing any actual zap
file deletions.
2017-12-14 10:49:33 -08:00

553 lines
14 KiB

// Copyright (c) 2017 Couchbase, Inc.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package scorch
import (
type notificationChan chan struct{}
func (s *Scorch) persisterLoop() {
var notify notificationChan
var lastPersistedEpoch uint64
for {
select {
case <-s.closeCh:
break OUTER
case notify = <-s.persisterNotifier:
// check to see if there is a new snapshot to persist
ourSnapshot := s.root
if ourSnapshot.epoch != lastPersistedEpoch {
// lets get started
err := s.persistSnapshot(ourSnapshot)
if err != nil {
log.Printf("got err persisting snapshot: %v", err)
_ = ourSnapshot.DecRef()
continue OUTER
lastPersistedEpoch = ourSnapshot.epoch
if notify != nil {
notify = nil
_ = ourSnapshot.DecRef()
// tell the introducer we're waiting for changes
// first make a notification chan
notifyUs := make(notificationChan)
// give it to the introducer
select {
case <-s.closeCh:
break OUTER
case s.introducerNotifier <- notifyUs:
// check again
ourSnapshot = s.root
if ourSnapshot.epoch != lastPersistedEpoch {
// lets get started
err := s.persistSnapshot(ourSnapshot)
if err != nil {
log.Printf("got err persisting snapshot: %v", err)
_ = ourSnapshot.DecRef()
continue OUTER
lastPersistedEpoch = ourSnapshot.epoch
if notify != nil {
notify = nil
_ = ourSnapshot.DecRef()
// now wait for it (but also detect close)
select {
case <-s.closeCh:
break OUTER
case <-notifyUs:
// woken up, next loop should pick up work
func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot) error {
// start a write transaction
tx, err := s.rootBolt.Begin(true)
if err != nil {
return err
// defer fsync of the rootbolt
defer func() {
if err == nil {
err = s.rootBolt.Sync()
// defer commit/rollback transaction
defer func() {
if err == nil {
err = tx.Commit()
} else {
_ = tx.Rollback()
snapshotsBucket, err := tx.CreateBucketIfNotExists(boltSnapshotsBucket)
if err != nil {
return err
newSnapshotKey := segment.EncodeUvarintAscending(nil, snapshot.epoch)
snapshotBucket, err := snapshotsBucket.CreateBucketIfNotExists(newSnapshotKey)
if err != nil {
return err
// persist internal values
internalBucket, err := snapshotBucket.CreateBucketIfNotExists(boltInternalKey)
if err != nil {
return err
// TODO optimize writing these in order?
for k, v := range snapshot.internal {
err = internalBucket.Put([]byte(k), v)
if err != nil {
return err
var filenames []string
newSegmentPaths := make(map[uint64]string)
// first ensure that each segment in this snapshot has been persisted
for i, segmentSnapshot := range snapshot.segment {
snapshotSegmentKey := segment.EncodeUvarintAscending(nil, uint64(i))
snapshotSegmentBucket, err2 := snapshotBucket.CreateBucketIfNotExists(snapshotSegmentKey)
if err2 != nil {
return err2
switch seg := segmentSnapshot.segment.(type) {
case *mem.Segment:
// need to persist this to disk
filename := zapFileName(segmentSnapshot.id)
path := s.path + string(os.PathSeparator) + filename
err2 := zap.PersistSegment(seg, path, 1024)
if err2 != nil {
return fmt.Errorf("error persisting segment: %v", err2)
newSegmentPaths[segmentSnapshot.id] = path
err = snapshotSegmentBucket.Put(boltPathKey, []byte(filename))
if err != nil {
return err
filenames = append(filenames, filename)
case *zap.Segment:
path := seg.Path()
filename := strings.TrimPrefix(path, s.path+string(os.PathSeparator))
err = snapshotSegmentBucket.Put(boltPathKey, []byte(filename))
if err != nil {
return err
filenames = append(filenames, filename)
return fmt.Errorf("unknown segment type: %T", seg)
// store current deleted bits
var roaringBuf bytes.Buffer
if segmentSnapshot.deleted != nil {
_, err = segmentSnapshot.deleted.WriteTo(&roaringBuf)
if err != nil {
return fmt.Errorf("error persisting roaring bytes: %v", err)
err = snapshotSegmentBucket.Put(boltDeletedKey, roaringBuf.Bytes())
if err != nil {
return err
// now try to open all the new snapshots
newSegments := make(map[uint64]segment.Segment)
for segmentID, path := range newSegmentPaths {
newSegments[segmentID], err = zap.Open(path)
if err != nil {
for _, s := range newSegments {
if s != nil {
_ = s.Close() // cleanup segments that were successfully opened
return fmt.Errorf("error opening new segment at %s, %v", path, err)
// get write lock and update the current snapshot with disk-based versions
var notifications []chan error
newIndexSnapshot := &IndexSnapshot{
parent: s,
epoch: s.root.epoch,
segment: make([]*SegmentSnapshot, len(s.root.segment)),
offsets: make([]uint64, len(s.root.offsets)),
internal: make(map[string][]byte, len(s.root.internal)),
refs: 1,
for i, segmentSnapshot := range s.root.segment {
// see if this segment has been replaced
if replacement, ok := newSegments[segmentSnapshot.id]; ok {
newSegmentSnapshot := &SegmentSnapshot{
segment: replacement,
deleted: segmentSnapshot.deleted,
id: segmentSnapshot.id,
cachedDocs: segmentSnapshot.cachedDocs,
newIndexSnapshot.segment[i] = newSegmentSnapshot
// add the old segment snapshots notifications to the list
for _, notification := range segmentSnapshot.notify {
notifications = append(notifications, notification)
} else {
newIndexSnapshot.segment[i] = s.root.segment[i]
newIndexSnapshot.offsets[i] = s.root.offsets[i]
for k, v := range s.root.internal {
newIndexSnapshot.internal[k] = v
for _, filename := range filenames {
delete(s.ineligibleForRemoval, filename)
rootPrev := s.root
s.root = newIndexSnapshot
if rootPrev != nil {
_ = rootPrev.DecRef()
// now that we've given up the lock, notify everyone that we've safely
// persisted their data
for _, notification := range notifications {
return nil
func zapFileName(epoch uint64) string {
return fmt.Sprintf("%012x.zap", epoch)
// bolt snapshot code
var boltSnapshotsBucket = []byte{'s'}
var boltPathKey = []byte{'p'}
var boltDeletedKey = []byte{'d'}
var boltInternalKey = []byte{'i'}
func (s *Scorch) loadFromBolt() error {
return s.rootBolt.View(func(tx *bolt.Tx) error {
snapshots := tx.Bucket(boltSnapshotsBucket)
if snapshots == nil {
return nil
foundRoot := false
c := snapshots.Cursor()
for k, _ := c.Last(); k != nil; k, _ = c.Prev() {
_, snapshotEpoch, err := segment.DecodeUvarintAscending(k)
if err != nil {
log.Printf("unable to parse segment epoch %x, continuing", k)
if foundRoot {
s.eligibleForRemoval = append(s.eligibleForRemoval, snapshotEpoch)
snapshot := snapshots.Bucket(k)
if snapshot == nil {
log.Printf("snapshot key, but bucket missing %x, continuing", k)
s.eligibleForRemoval = append(s.eligibleForRemoval, snapshotEpoch)
indexSnapshot, err := s.loadSnapshot(snapshot)
if err != nil {
log.Printf("unable to load snapshot, %v, continuing", err)
s.eligibleForRemoval = append(s.eligibleForRemoval, snapshotEpoch)
indexSnapshot.epoch = snapshotEpoch
// set the nextSegmentID
for _, segment := range indexSnapshot.segment {
if segment.id > s.nextSegmentID {
s.nextSegmentID = segment.id
s.nextSnapshotEpoch = snapshotEpoch + 1
if s.root != nil {
_ = s.root.DecRef()
s.root = indexSnapshot
foundRoot = true
return nil
func (s *Scorch) loadSnapshot(snapshot *bolt.Bucket) (*IndexSnapshot, error) {
rv := &IndexSnapshot{
parent: s,
internal: make(map[string][]byte),
refs: 1,
var running uint64
c := snapshot.Cursor()
for k, _ := c.First(); k != nil; k, _ = c.Next() {
if k[0] == boltInternalKey[0] {
internalBucket := snapshot.Bucket(k)
err := internalBucket.ForEach(func(key []byte, val []byte) error {
copiedVal := append([]byte(nil), val...)
rv.internal[string(key)] = copiedVal
return nil
if err != nil {
_ = rv.DecRef()
return nil, err
} else {
segmentBucket := snapshot.Bucket(k)
if segmentBucket == nil {
_ = rv.DecRef()
return nil, fmt.Errorf("segment key, but bucket missing % x", k)
segmentSnapshot, err := s.loadSegment(segmentBucket)
if err != nil {
_ = rv.DecRef()
return nil, fmt.Errorf("failed to load segment: %v", err)
_, segmentSnapshot.id, err = segment.DecodeUvarintAscending(k)
if err != nil {
_ = rv.DecRef()
return nil, fmt.Errorf("failed to decode segment id: %v", err)
rv.segment = append(rv.segment, segmentSnapshot)
rv.offsets = append(rv.offsets, running)
running += segmentSnapshot.segment.Count()
return rv, nil
func (s *Scorch) loadSegment(segmentBucket *bolt.Bucket) (*SegmentSnapshot, error) {
pathBytes := segmentBucket.Get(boltPathKey)
if pathBytes == nil {
return nil, fmt.Errorf("segment path missing")
segmentPath := s.path + string(os.PathSeparator) + string(pathBytes)
segment, err := zap.Open(segmentPath)
if err != nil {
return nil, fmt.Errorf("error opening bolt segment: %v", err)
rv := &SegmentSnapshot{
segment: segment,
cachedDocs: &cachedDocs{cache: nil},
deletedBytes := segmentBucket.Get(boltDeletedKey)
if deletedBytes != nil {
deletedBitmap := roaring.NewBitmap()
r := bytes.NewReader(deletedBytes)
_, err := deletedBitmap.ReadFrom(r)
if err != nil {
_ = segment.Close()
return nil, fmt.Errorf("error reading deleted bytes: %v", err)
rv.deleted = deletedBitmap
return rv, nil
type uint64Descending []uint64
func (p uint64Descending) Len() int { return len(p) }
func (p uint64Descending) Less(i, j int) bool { return p[i] > p[j] }
func (p uint64Descending) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
func (s *Scorch) removeOldData() {
removed, err := s.removeOldBoltSnapshots()
if err != nil {
log.Printf("got err removing old bolt snapshots: %v", err)
if removed > 0 {
err = s.removeOldZapFiles()
if err != nil {
log.Printf("got err removing old zap files: %v", err)
// NumSnapshotsToKeep represents how many recent, old snapshots to
// keep around per Scorch instance. Useful for apps that require
// rollback'ability.
var NumSnapshotsToKeep int
// Removes enough snapshots from the rootBolt so that the
// s.eligibleForRemoval stays under the NumSnapshotsToKeep policy.
func (s *Scorch) removeOldBoltSnapshots() (numRemoved int, err error) {
var epochsToRemove []uint64
if len(s.eligibleForRemoval) > NumSnapshotsToKeep {
epochsToRemove = append([]uint64(nil), s.eligibleForRemoval[NumSnapshotsToKeep:]...) // Copy.
s.eligibleForRemoval = s.eligibleForRemoval[0:NumSnapshotsToKeep]
if len(epochsToRemove) <= 0 {
return 0, nil
tx, err := s.rootBolt.Begin(true)
if err != nil {
return 0, err
defer func() {
if err == nil {
err = s.rootBolt.Sync()
defer func() {
if err == nil {
err = tx.Commit()
} else {
_ = tx.Rollback()
for _, epochToRemove := range epochsToRemove {
k := segment.EncodeUvarintAscending(nil, epochToRemove)
err = tx.DeleteBucket(k)
if err == bolt.ErrBucketNotFound {
err = nil
if err == nil {
return numRemoved, err
// Removes any *.zap files which aren't listed in the rootBolt.
func (s *Scorch) removeOldZapFiles() error {
liveFileNames, err := s.loadZapFileNames()
if err != nil {
return err
currFileInfos, err := ioutil.ReadDir(s.path)
if err != nil {
return err
for _, finfo := range currFileInfos {
fname := finfo.Name()
if filepath.Ext(fname) == ".zap" {
if _, exists := liveFileNames[fname]; !exists && !s.ineligibleForRemoval[fname] {
err := os.Remove(s.path + string(os.PathSeparator) + fname)
if err != nil {
log.Printf("got err removing file: %s, err: %v", fname, err)
return nil
// Returns the *.zap file names that are listed in the rootBolt.
func (s *Scorch) loadZapFileNames() (map[string]struct{}, error) {
rv := map[string]struct{}{}
err := s.rootBolt.View(func(tx *bolt.Tx) error {
snapshots := tx.Bucket(boltSnapshotsBucket)
if snapshots == nil {
return nil
sc := snapshots.Cursor()
for sk, _ := sc.First(); sk != nil; sk, _ = sc.Next() {
snapshot := snapshots.Bucket(sk)
if snapshot == nil {
segc := snapshot.Cursor()
for segk, _ := segc.First(); segk != nil; segk, _ = segc.Next() {
if segk[0] == boltInternalKey[0] {
segmentBucket := snapshot.Bucket(segk)
if segmentBucket == nil {
pathBytes := segmentBucket.Get(boltPathKey)
if pathBytes == nil {
pathString := string(pathBytes)
rv[string(pathString)] = struct{}{}
return nil
return rv, err