2017-12-07 00:33:47 +01:00
|
|
|
// Copyright (c) 2017 Couchbase, Inc.
|
|
|
|
//
|
|
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
|
// you may not use this file except in compliance with the License.
|
|
|
|
// You may obtain a copy of the License at
|
|
|
|
//
|
|
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
//
|
|
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
// See the License for the specific language governing permissions and
|
|
|
|
// limitations under the License.
|
|
|
|
|
|
|
|
package scorch
|
|
|
|
|
|
|
|
import (
|
|
|
|
"bytes"
|
|
|
|
"fmt"
|
2017-12-14 01:58:36 +01:00
|
|
|
"io/ioutil"
|
2017-12-07 00:33:47 +01:00
|
|
|
"log"
|
|
|
|
"os"
|
2017-12-14 01:58:36 +01:00
|
|
|
"path/filepath"
|
2017-12-13 23:54:58 +01:00
|
|
|
"sort"
|
2017-12-16 01:26:23 +01:00
|
|
|
"strconv"
|
2017-12-07 00:33:47 +01:00
|
|
|
"strings"
|
|
|
|
|
|
|
|
"github.com/RoaringBitmap/roaring"
|
|
|
|
"github.com/blevesearch/bleve/index/scorch/segment"
|
|
|
|
"github.com/blevesearch/bleve/index/scorch/segment/mem"
|
2017-12-09 20:28:50 +01:00
|
|
|
"github.com/blevesearch/bleve/index/scorch/segment/zap"
|
2017-12-07 00:33:47 +01:00
|
|
|
"github.com/boltdb/bolt"
|
|
|
|
)
|
|
|
|
|
|
|
|
// notificationChan is a signalling channel that carries no data. In this
// file the persister collects such channels from s.persisterNotifier and
// closes them after a snapshot has been persisted, and it also waits on one
// (via epochWatcher.notifyCh) to be woken up by the introducer.
type notificationChan chan struct{}
|
|
|
|
|
|
|
|
func (s *Scorch) persisterLoop() {
|
2017-12-17 17:23:00 +01:00
|
|
|
defer s.asyncTasks.Done()
|
|
|
|
|
|
|
|
var notifyChs []notificationChan
|
2017-12-07 00:33:47 +01:00
|
|
|
var lastPersistedEpoch uint64
|
|
|
|
OUTER:
|
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case <-s.closeCh:
|
|
|
|
break OUTER
|
2017-12-17 17:23:00 +01:00
|
|
|
case notifyCh := <-s.persisterNotifier:
|
|
|
|
notifyChs = append(notifyChs, notifyCh)
|
2017-12-07 00:33:47 +01:00
|
|
|
default:
|
2017-12-17 17:23:00 +01:00
|
|
|
}
|
2017-12-07 00:33:47 +01:00
|
|
|
|
2017-12-17 17:23:00 +01:00
|
|
|
var ourSnapshot *IndexSnapshot
|
|
|
|
var ourPersisted []chan error
|
2017-12-07 00:33:47 +01:00
|
|
|
|
2017-12-17 17:23:00 +01:00
|
|
|
// check to see if there is a new snapshot to persist
|
|
|
|
s.rootLock.Lock()
|
|
|
|
if s.root != nil && s.root.epoch > lastPersistedEpoch {
|
2017-12-07 00:33:47 +01:00
|
|
|
ourSnapshot = s.root
|
2017-12-13 22:10:44 +01:00
|
|
|
ourSnapshot.AddRef()
|
2017-12-17 17:23:00 +01:00
|
|
|
ourPersisted = s.rootPersisted
|
|
|
|
s.rootPersisted = nil
|
|
|
|
}
|
|
|
|
s.rootLock.Unlock()
|
2017-12-07 00:33:47 +01:00
|
|
|
|
2017-12-17 17:23:00 +01:00
|
|
|
if ourSnapshot != nil {
|
|
|
|
err := s.persistSnapshot(ourSnapshot)
|
|
|
|
for _, ch := range ourPersisted {
|
2017-12-07 00:33:47 +01:00
|
|
|
if err != nil {
|
2017-12-17 17:23:00 +01:00
|
|
|
ch <- err
|
2017-12-13 19:41:03 +01:00
|
|
|
}
|
2017-12-17 17:23:00 +01:00
|
|
|
close(ch)
|
|
|
|
}
|
|
|
|
if err != nil {
|
|
|
|
log.Printf("got err persisting snapshot: %v", err)
|
|
|
|
_ = ourSnapshot.DecRef()
|
|
|
|
continue OUTER
|
2017-12-07 00:33:47 +01:00
|
|
|
}
|
2017-12-17 17:23:00 +01:00
|
|
|
lastPersistedEpoch = ourSnapshot.epoch
|
|
|
|
for _, notifyCh := range notifyChs {
|
|
|
|
close(notifyCh)
|
|
|
|
}
|
|
|
|
notifyChs = nil
|
2017-12-13 22:10:44 +01:00
|
|
|
_ = ourSnapshot.DecRef()
|
2017-12-07 00:33:47 +01:00
|
|
|
|
2017-12-17 17:23:00 +01:00
|
|
|
changed := false
|
|
|
|
s.rootLock.RLock()
|
|
|
|
if s.root != nil && s.root.epoch != lastPersistedEpoch {
|
|
|
|
changed = true
|
2017-12-07 00:33:47 +01:00
|
|
|
}
|
2017-12-17 17:23:00 +01:00
|
|
|
s.rootLock.RUnlock()
|
|
|
|
if changed {
|
|
|
|
continue OUTER
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// tell the introducer we're waiting for changes
|
|
|
|
w := &epochWatcher{
|
|
|
|
epoch: lastPersistedEpoch,
|
|
|
|
notifyCh: make(notificationChan, 1),
|
|
|
|
}
|
|
|
|
|
|
|
|
select {
|
|
|
|
case <-s.closeCh:
|
|
|
|
break OUTER
|
|
|
|
case s.introducerNotifier <- w:
|
|
|
|
}
|
|
|
|
|
|
|
|
s.removeOldData() // might as well cleanup while waiting
|
|
|
|
|
|
|
|
select {
|
|
|
|
case <-s.closeCh:
|
|
|
|
break OUTER
|
|
|
|
case <-w.notifyCh:
|
|
|
|
// woken up, next loop should pick up work
|
2017-12-07 00:33:47 +01:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot) error {
|
|
|
|
// start a write transaction
|
|
|
|
tx, err := s.rootBolt.Begin(true)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2017-12-13 19:55:06 +01:00
|
|
|
// defer fsync of the rootbolt
|
|
|
|
defer func() {
|
|
|
|
if err == nil {
|
|
|
|
err = s.rootBolt.Sync()
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
// defer commit/rollback transaction
|
2017-12-07 00:33:47 +01:00
|
|
|
defer func() {
|
|
|
|
if err == nil {
|
|
|
|
err = tx.Commit()
|
|
|
|
} else {
|
|
|
|
_ = tx.Rollback()
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
|
|
|
|
snapshotsBucket, err := tx.CreateBucketIfNotExists(boltSnapshotsBucket)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
newSnapshotKey := segment.EncodeUvarintAscending(nil, snapshot.epoch)
|
|
|
|
snapshotBucket, err := snapshotsBucket.CreateBucketIfNotExists(newSnapshotKey)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
// persist internal values
|
|
|
|
internalBucket, err := snapshotBucket.CreateBucketIfNotExists(boltInternalKey)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
// TODO optimize writing these in order?
|
|
|
|
for k, v := range snapshot.internal {
|
2017-12-07 00:36:14 +01:00
|
|
|
err = internalBucket.Put([]byte(k), v)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2017-12-07 00:33:47 +01:00
|
|
|
}
|
|
|
|
|
2017-12-14 19:49:33 +01:00
|
|
|
var filenames []string
|
2017-12-07 00:33:47 +01:00
|
|
|
newSegmentPaths := make(map[uint64]string)
|
|
|
|
|
|
|
|
// first ensure that each segment in this snapshot has been persisted
|
|
|
|
for i, segmentSnapshot := range snapshot.segment {
|
|
|
|
snapshotSegmentKey := segment.EncodeUvarintAscending(nil, uint64(i))
|
|
|
|
snapshotSegmentBucket, err2 := snapshotBucket.CreateBucketIfNotExists(snapshotSegmentKey)
|
|
|
|
if err2 != nil {
|
|
|
|
return err2
|
|
|
|
}
|
|
|
|
switch seg := segmentSnapshot.segment.(type) {
|
|
|
|
case *mem.Segment:
|
|
|
|
// need to persist this to disk
|
2017-12-14 19:49:33 +01:00
|
|
|
filename := zapFileName(segmentSnapshot.id)
|
2017-12-07 00:33:47 +01:00
|
|
|
path := s.path + string(os.PathSeparator) + filename
|
2017-12-09 20:28:50 +01:00
|
|
|
err2 := zap.PersistSegment(seg, path, 1024)
|
2017-12-07 00:33:47 +01:00
|
|
|
if err2 != nil {
|
|
|
|
return fmt.Errorf("error persisting segment: %v", err2)
|
|
|
|
}
|
|
|
|
newSegmentPaths[segmentSnapshot.id] = path
|
2017-12-07 00:36:14 +01:00
|
|
|
err = snapshotSegmentBucket.Put(boltPathKey, []byte(filename))
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2017-12-14 19:49:33 +01:00
|
|
|
filenames = append(filenames, filename)
|
2017-12-09 20:28:50 +01:00
|
|
|
case *zap.Segment:
|
2017-12-07 00:33:47 +01:00
|
|
|
path := seg.Path()
|
|
|
|
filename := strings.TrimPrefix(path, s.path+string(os.PathSeparator))
|
2017-12-07 00:36:14 +01:00
|
|
|
err = snapshotSegmentBucket.Put(boltPathKey, []byte(filename))
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2017-12-14 19:49:33 +01:00
|
|
|
filenames = append(filenames, filename)
|
2017-12-07 00:33:47 +01:00
|
|
|
default:
|
|
|
|
return fmt.Errorf("unknown segment type: %T", seg)
|
|
|
|
}
|
|
|
|
// store current deleted bits
|
|
|
|
var roaringBuf bytes.Buffer
|
|
|
|
if segmentSnapshot.deleted != nil {
|
|
|
|
_, err = segmentSnapshot.deleted.WriteTo(&roaringBuf)
|
|
|
|
if err != nil {
|
|
|
|
return fmt.Errorf("error persisting roaring bytes: %v", err)
|
|
|
|
}
|
2017-12-07 00:36:14 +01:00
|
|
|
err = snapshotSegmentBucket.Put(boltDeletedKey, roaringBuf.Bytes())
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2017-12-07 00:33:47 +01:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// now try to open all the new snapshots
|
|
|
|
newSegments := make(map[uint64]segment.Segment)
|
|
|
|
for segmentID, path := range newSegmentPaths {
|
2017-12-09 20:28:50 +01:00
|
|
|
newSegments[segmentID], err = zap.Open(path)
|
2017-12-07 00:33:47 +01:00
|
|
|
if err != nil {
|
2017-12-13 22:10:44 +01:00
|
|
|
for _, s := range newSegments {
|
2017-12-14 16:29:19 +01:00
|
|
|
if s != nil {
|
|
|
|
_ = s.Close() // cleanup segments that were successfully opened
|
|
|
|
}
|
2017-12-13 22:10:44 +01:00
|
|
|
}
|
2017-12-07 00:33:47 +01:00
|
|
|
return fmt.Errorf("error opening new segment at %s, %v", path, err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
s.rootLock.Lock()
|
|
|
|
newIndexSnapshot := &IndexSnapshot{
|
2017-12-13 23:54:58 +01:00
|
|
|
parent: s,
|
2017-12-07 00:33:47 +01:00
|
|
|
epoch: s.root.epoch,
|
|
|
|
segment: make([]*SegmentSnapshot, len(s.root.segment)),
|
|
|
|
offsets: make([]uint64, len(s.root.offsets)),
|
|
|
|
internal: make(map[string][]byte, len(s.root.internal)),
|
2017-12-13 22:10:44 +01:00
|
|
|
refs: 1,
|
2017-12-07 00:33:47 +01:00
|
|
|
}
|
|
|
|
for i, segmentSnapshot := range s.root.segment {
|
|
|
|
// see if this segment has been replaced
|
|
|
|
if replacement, ok := newSegments[segmentSnapshot.id]; ok {
|
|
|
|
newSegmentSnapshot := &SegmentSnapshot{
|
2017-12-17 17:23:00 +01:00
|
|
|
id: segmentSnapshot.id,
|
2017-12-14 16:27:39 +01:00
|
|
|
segment: replacement,
|
|
|
|
deleted: segmentSnapshot.deleted,
|
|
|
|
cachedDocs: segmentSnapshot.cachedDocs,
|
2017-12-07 00:33:47 +01:00
|
|
|
}
|
|
|
|
newIndexSnapshot.segment[i] = newSegmentSnapshot
|
|
|
|
} else {
|
|
|
|
newIndexSnapshot.segment[i] = s.root.segment[i]
|
2017-12-13 22:10:44 +01:00
|
|
|
newIndexSnapshot.segment[i].segment.AddRef()
|
2017-12-07 00:33:47 +01:00
|
|
|
}
|
|
|
|
newIndexSnapshot.offsets[i] = s.root.offsets[i]
|
|
|
|
}
|
|
|
|
for k, v := range s.root.internal {
|
|
|
|
newIndexSnapshot.internal[k] = v
|
|
|
|
}
|
2017-12-14 19:49:33 +01:00
|
|
|
for _, filename := range filenames {
|
|
|
|
delete(s.ineligibleForRemoval, filename)
|
|
|
|
}
|
2017-12-13 22:10:44 +01:00
|
|
|
rootPrev := s.root
|
2017-12-07 00:33:47 +01:00
|
|
|
s.root = newIndexSnapshot
|
|
|
|
s.rootLock.Unlock()
|
|
|
|
|
2017-12-13 22:10:44 +01:00
|
|
|
if rootPrev != nil {
|
|
|
|
_ = rootPrev.DecRef()
|
|
|
|
}
|
|
|
|
|
2017-12-07 00:33:47 +01:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2017-12-14 19:49:33 +01:00
|
|
|
// zapFileName builds the on-disk file name for the given number: the value
// rendered as lower-case hex, zero-padded to at least 12 digits, followed by
// the ".zap" extension.
func zapFileName(epoch uint64) string {
	hexID := fmt.Sprintf("%012x", epoch)
	return hexID + ".zap"
}
|
|
|
|
|
2017-12-07 00:33:47 +01:00
|
|
|
// bolt snapshot code

// Single-byte bucket names and keys used inside the root bolt database.
var (
	// boltSnapshotsBucket names the top-level bucket holding one
	// sub-bucket per persisted snapshot.
	boltSnapshotsBucket = []byte("s")
	// boltPathKey is the key under which a segment's file name is stored.
	boltPathKey = []byte("p")
	// boltDeletedKey is the key under which a segment's serialized
	// deleted bitmap is stored.
	boltDeletedKey = []byte("d")
	// boltInternalKey names the per-snapshot bucket of internal values.
	boltInternalKey = []byte("i")
)
|
|
|
|
|
|
|
|
func (s *Scorch) loadFromBolt() error {
|
|
|
|
return s.rootBolt.View(func(tx *bolt.Tx) error {
|
|
|
|
snapshots := tx.Bucket(boltSnapshotsBucket)
|
|
|
|
if snapshots == nil {
|
|
|
|
return nil
|
|
|
|
}
|
2017-12-13 23:54:58 +01:00
|
|
|
foundRoot := false
|
2017-12-07 00:33:47 +01:00
|
|
|
c := snapshots.Cursor()
|
|
|
|
for k, _ := c.Last(); k != nil; k, _ = c.Prev() {
|
|
|
|
_, snapshotEpoch, err := segment.DecodeUvarintAscending(k)
|
|
|
|
if err != nil {
|
2017-12-13 22:10:44 +01:00
|
|
|
log.Printf("unable to parse segment epoch %x, continuing", k)
|
2017-12-07 00:33:47 +01:00
|
|
|
continue
|
|
|
|
}
|
2017-12-13 23:54:58 +01:00
|
|
|
if foundRoot {
|
|
|
|
s.eligibleForRemoval = append(s.eligibleForRemoval, snapshotEpoch)
|
|
|
|
continue
|
|
|
|
}
|
2017-12-07 00:33:47 +01:00
|
|
|
snapshot := snapshots.Bucket(k)
|
|
|
|
if snapshot == nil {
|
2017-12-13 22:10:44 +01:00
|
|
|
log.Printf("snapshot key, but bucket missing %x, continuing", k)
|
2017-12-13 23:54:58 +01:00
|
|
|
s.eligibleForRemoval = append(s.eligibleForRemoval, snapshotEpoch)
|
2017-12-07 00:33:47 +01:00
|
|
|
continue
|
|
|
|
}
|
|
|
|
indexSnapshot, err := s.loadSnapshot(snapshot)
|
|
|
|
if err != nil {
|
2017-12-13 22:10:44 +01:00
|
|
|
log.Printf("unable to load snapshot, %v, continuing", err)
|
2017-12-13 23:54:58 +01:00
|
|
|
s.eligibleForRemoval = append(s.eligibleForRemoval, snapshotEpoch)
|
2017-12-07 00:33:47 +01:00
|
|
|
continue
|
|
|
|
}
|
|
|
|
indexSnapshot.epoch = snapshotEpoch
|
|
|
|
// set the nextSegmentID
|
2017-12-16 01:26:23 +01:00
|
|
|
s.nextSegmentID, err = s.maxSegmentIDOnDisk()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
2017-12-07 00:33:47 +01:00
|
|
|
}
|
|
|
|
s.nextSegmentID++
|
|
|
|
s.nextSnapshotEpoch = snapshotEpoch + 1
|
2017-12-14 23:40:33 +01:00
|
|
|
s.rootLock.Lock()
|
2017-12-13 23:54:58 +01:00
|
|
|
if s.root != nil {
|
|
|
|
_ = s.root.DecRef()
|
|
|
|
}
|
2017-12-07 00:33:47 +01:00
|
|
|
s.root = indexSnapshot
|
2017-12-14 23:40:33 +01:00
|
|
|
s.rootLock.Unlock()
|
2017-12-13 23:54:58 +01:00
|
|
|
foundRoot = true
|
2017-12-07 00:33:47 +01:00
|
|
|
}
|
|
|
|
return nil
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *Scorch) loadSnapshot(snapshot *bolt.Bucket) (*IndexSnapshot, error) {
|
|
|
|
|
|
|
|
rv := &IndexSnapshot{
|
2017-12-13 23:54:58 +01:00
|
|
|
parent: s,
|
2017-12-07 00:33:47 +01:00
|
|
|
internal: make(map[string][]byte),
|
2017-12-13 22:10:44 +01:00
|
|
|
refs: 1,
|
2017-12-07 00:33:47 +01:00
|
|
|
}
|
|
|
|
var running uint64
|
|
|
|
c := snapshot.Cursor()
|
|
|
|
for k, _ := c.First(); k != nil; k, _ = c.Next() {
|
|
|
|
if k[0] == boltInternalKey[0] {
|
|
|
|
internalBucket := snapshot.Bucket(k)
|
2017-12-07 00:36:14 +01:00
|
|
|
err := internalBucket.ForEach(func(key []byte, val []byte) error {
|
2017-12-07 00:33:47 +01:00
|
|
|
copiedVal := append([]byte(nil), val...)
|
|
|
|
rv.internal[string(key)] = copiedVal
|
|
|
|
return nil
|
|
|
|
})
|
2017-12-07 00:36:14 +01:00
|
|
|
if err != nil {
|
2017-12-13 22:10:44 +01:00
|
|
|
_ = rv.DecRef()
|
2017-12-07 00:36:14 +01:00
|
|
|
return nil, err
|
|
|
|
}
|
2017-12-07 00:33:47 +01:00
|
|
|
} else {
|
|
|
|
segmentBucket := snapshot.Bucket(k)
|
|
|
|
if segmentBucket == nil {
|
2017-12-13 22:10:44 +01:00
|
|
|
_ = rv.DecRef()
|
2017-12-07 00:33:47 +01:00
|
|
|
return nil, fmt.Errorf("segment key, but bucket missing % x", k)
|
|
|
|
}
|
|
|
|
segmentSnapshot, err := s.loadSegment(segmentBucket)
|
|
|
|
if err != nil {
|
2017-12-13 22:10:44 +01:00
|
|
|
_ = rv.DecRef()
|
2017-12-07 00:33:47 +01:00
|
|
|
return nil, fmt.Errorf("failed to load segment: %v", err)
|
|
|
|
}
|
|
|
|
_, segmentSnapshot.id, err = segment.DecodeUvarintAscending(k)
|
|
|
|
if err != nil {
|
2017-12-13 22:10:44 +01:00
|
|
|
_ = rv.DecRef()
|
2017-12-07 00:33:47 +01:00
|
|
|
return nil, fmt.Errorf("failed to decode segment id: %v", err)
|
|
|
|
}
|
|
|
|
rv.segment = append(rv.segment, segmentSnapshot)
|
|
|
|
rv.offsets = append(rv.offsets, running)
|
|
|
|
running += segmentSnapshot.segment.Count()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return rv, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *Scorch) loadSegment(segmentBucket *bolt.Bucket) (*SegmentSnapshot, error) {
|
|
|
|
pathBytes := segmentBucket.Get(boltPathKey)
|
|
|
|
if pathBytes == nil {
|
|
|
|
return nil, fmt.Errorf("segment path missing")
|
|
|
|
}
|
|
|
|
segmentPath := s.path + string(os.PathSeparator) + string(pathBytes)
|
2017-12-09 20:28:50 +01:00
|
|
|
segment, err := zap.Open(segmentPath)
|
2017-12-07 00:33:47 +01:00
|
|
|
if err != nil {
|
|
|
|
return nil, fmt.Errorf("error opening bolt segment: %v", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
rv := &SegmentSnapshot{
|
2017-12-14 16:27:39 +01:00
|
|
|
segment: segment,
|
|
|
|
cachedDocs: &cachedDocs{cache: nil},
|
2017-12-07 00:33:47 +01:00
|
|
|
}
|
|
|
|
deletedBytes := segmentBucket.Get(boltDeletedKey)
|
|
|
|
if deletedBytes != nil {
|
|
|
|
deletedBitmap := roaring.NewBitmap()
|
|
|
|
r := bytes.NewReader(deletedBytes)
|
|
|
|
_, err := deletedBitmap.ReadFrom(r)
|
|
|
|
if err != nil {
|
2017-12-13 22:10:44 +01:00
|
|
|
_ = segment.Close()
|
2017-12-07 00:33:47 +01:00
|
|
|
return nil, fmt.Errorf("error reading deleted bytes: %v", err)
|
|
|
|
}
|
|
|
|
rv.deleted = deletedBitmap
|
|
|
|
}
|
|
|
|
|
|
|
|
return rv, nil
|
|
|
|
}
|
2017-12-13 23:54:58 +01:00
|
|
|
|
|
|
|
// uint64Descending provides Len/Less/Swap over a []uint64 such that sorting
// with it orders the values from largest to smallest.
type uint64Descending []uint64

// Len reports the number of elements.
func (p uint64Descending) Len() int {
	return len(p)
}

// Less reports whether element i should sort before element j, which for a
// descending order means element i is the larger one.
func (p uint64Descending) Less(i, j int) bool {
	return p[j] < p[i]
}

// Swap exchanges the elements at positions i and j.
func (p uint64Descending) Swap(i, j int) {
	tmp := p[i]
	p[i] = p[j]
	p[j] = tmp
}
|
|
|
|
|
2017-12-14 19:49:33 +01:00
|
|
|
func (s *Scorch) removeOldData() {
|
2017-12-14 01:58:36 +01:00
|
|
|
removed, err := s.removeOldBoltSnapshots()
|
|
|
|
if err != nil {
|
|
|
|
log.Printf("got err removing old bolt snapshots: %v", err)
|
|
|
|
}
|
|
|
|
|
2017-12-14 19:49:33 +01:00
|
|
|
if removed > 0 {
|
2017-12-14 01:58:36 +01:00
|
|
|
err = s.removeOldZapFiles()
|
|
|
|
if err != nil {
|
2017-12-14 19:49:33 +01:00
|
|
|
log.Printf("got err removing old zap files: %v", err)
|
2017-12-14 01:58:36 +01:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2017-12-13 23:54:58 +01:00
|
|
|
// NumSnapshotsToKeep represents how many recent, old snapshots to
// keep around per Scorch instance. Useful for apps that require
// rollback'ability. It is declared without an initializer, so it is 0
// unless an application assigns a different value.
var NumSnapshotsToKeep int
|
|
|
|
|
|
|
|
// Removes enough snapshots from the rootBolt so that the
|
|
|
|
// s.eligibleForRemoval stays under the NumSnapshotsToKeep policy.
|
2017-12-14 01:58:36 +01:00
|
|
|
func (s *Scorch) removeOldBoltSnapshots() (numRemoved int, err error) {
|
2017-12-13 23:54:58 +01:00
|
|
|
var epochsToRemove []uint64
|
|
|
|
|
|
|
|
s.rootLock.Lock()
|
|
|
|
if len(s.eligibleForRemoval) > NumSnapshotsToKeep {
|
|
|
|
sort.Sort(uint64Descending(s.eligibleForRemoval))
|
|
|
|
epochsToRemove = append([]uint64(nil), s.eligibleForRemoval[NumSnapshotsToKeep:]...) // Copy.
|
|
|
|
s.eligibleForRemoval = s.eligibleForRemoval[0:NumSnapshotsToKeep]
|
|
|
|
}
|
|
|
|
s.rootLock.Unlock()
|
|
|
|
|
|
|
|
if len(epochsToRemove) <= 0 {
|
2017-12-14 01:58:36 +01:00
|
|
|
return 0, nil
|
2017-12-13 23:54:58 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
tx, err := s.rootBolt.Begin(true)
|
|
|
|
if err != nil {
|
2017-12-14 01:58:36 +01:00
|
|
|
return 0, err
|
2017-12-13 23:54:58 +01:00
|
|
|
}
|
|
|
|
defer func() {
|
|
|
|
if err == nil {
|
|
|
|
err = s.rootBolt.Sync()
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
defer func() {
|
|
|
|
if err == nil {
|
|
|
|
err = tx.Commit()
|
|
|
|
} else {
|
|
|
|
_ = tx.Rollback()
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
|
|
|
|
for _, epochToRemove := range epochsToRemove {
|
|
|
|
k := segment.EncodeUvarintAscending(nil, epochToRemove)
|
|
|
|
err = tx.DeleteBucket(k)
|
|
|
|
if err == bolt.ErrBucketNotFound {
|
|
|
|
err = nil
|
|
|
|
}
|
2017-12-14 01:58:36 +01:00
|
|
|
if err == nil {
|
|
|
|
numRemoved++
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return numRemoved, err
|
|
|
|
}
|
|
|
|
|
2017-12-16 01:26:23 +01:00
|
|
|
func (s *Scorch) maxSegmentIDOnDisk() (uint64, error) {
|
|
|
|
currFileInfos, err := ioutil.ReadDir(s.path)
|
|
|
|
if err != nil {
|
|
|
|
return 0, err
|
|
|
|
}
|
|
|
|
|
|
|
|
var rv uint64
|
|
|
|
for _, finfo := range currFileInfos {
|
|
|
|
fname := finfo.Name()
|
|
|
|
if filepath.Ext(fname) == ".zap" {
|
|
|
|
prefix := strings.TrimSuffix(fname, ".zap")
|
|
|
|
id, err2 := strconv.ParseUint(prefix, 16, 64)
|
|
|
|
if err2 != nil {
|
|
|
|
return 0, err2
|
|
|
|
}
|
|
|
|
if id > rv {
|
|
|
|
rv = id
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return rv, err
|
|
|
|
}
|
|
|
|
|
2017-12-14 01:58:36 +01:00
|
|
|
// Removes any *.zap files which aren't listed in the rootBolt.
|
|
|
|
func (s *Scorch) removeOldZapFiles() error {
|
2017-12-14 19:49:33 +01:00
|
|
|
liveFileNames, err := s.loadZapFileNames()
|
2017-12-14 01:58:36 +01:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
currFileInfos, err := ioutil.ReadDir(s.path)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2017-12-14 19:49:33 +01:00
|
|
|
s.rootLock.RLock()
|
|
|
|
|
2017-12-14 01:58:36 +01:00
|
|
|
for _, finfo := range currFileInfos {
|
|
|
|
fname := finfo.Name()
|
|
|
|
if filepath.Ext(fname) == ".zap" {
|
2017-12-14 19:49:33 +01:00
|
|
|
if _, exists := liveFileNames[fname]; !exists && !s.ineligibleForRemoval[fname] {
|
2017-12-14 01:58:36 +01:00
|
|
|
err := os.Remove(s.path + string(os.PathSeparator) + fname)
|
|
|
|
if err != nil {
|
|
|
|
log.Printf("got err removing file: %s, err: %v", fname, err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2017-12-13 23:54:58 +01:00
|
|
|
}
|
|
|
|
|
2017-12-14 19:49:33 +01:00
|
|
|
s.rootLock.RUnlock()
|
|
|
|
|
2017-12-14 01:58:36 +01:00
|
|
|
return nil
|
2017-12-13 23:54:58 +01:00
|
|
|
}
|
|
|
|
|
2017-12-14 01:58:36 +01:00
|
|
|
// Returns the *.zap file names that are listed in the rootBolt.
|
2017-12-14 19:49:33 +01:00
|
|
|
func (s *Scorch) loadZapFileNames() (map[string]struct{}, error) {
|
2017-12-14 01:58:36 +01:00
|
|
|
rv := map[string]struct{}{}
|
|
|
|
err := s.rootBolt.View(func(tx *bolt.Tx) error {
|
|
|
|
snapshots := tx.Bucket(boltSnapshotsBucket)
|
|
|
|
if snapshots == nil {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
sc := snapshots.Cursor()
|
|
|
|
for sk, _ := sc.First(); sk != nil; sk, _ = sc.Next() {
|
|
|
|
snapshot := snapshots.Bucket(sk)
|
|
|
|
if snapshot == nil {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
segc := snapshot.Cursor()
|
|
|
|
for segk, _ := segc.First(); segk != nil; segk, _ = segc.Next() {
|
|
|
|
if segk[0] == boltInternalKey[0] {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
segmentBucket := snapshot.Bucket(segk)
|
|
|
|
if segmentBucket == nil {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
pathBytes := segmentBucket.Get(boltPathKey)
|
|
|
|
if pathBytes == nil {
|
|
|
|
continue
|
|
|
|
}
|
2017-12-14 16:27:39 +01:00
|
|
|
pathString := string(pathBytes)
|
|
|
|
rv[string(pathString)] = struct{}{}
|
2017-12-14 01:58:36 +01:00
|
|
|
}
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
})
|
|
|
|
|
2017-12-14 19:49:33 +01:00
|
|
|
return rv, err
|
2017-12-14 01:58:36 +01:00
|
|
|
}
|