// Copyright (c) 2017 Couchbase, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package scorch

import (
	"bytes"
	"fmt"
	"io/ioutil"
	"log"
	"os"
	"path/filepath"
	"strconv"
	"strings"
	"sync/atomic"
	"time"

	"github.com/RoaringBitmap/roaring"
	"github.com/blevesearch/bleve/index/scorch/segment"
	"github.com/blevesearch/bleve/index/scorch/segment/mem"
	"github.com/blevesearch/bleve/index/scorch/segment/zap"
	"github.com/boltdb/bolt"
)

type notificationChan chan struct{}
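
// persisterLoop runs in its own goroutine: whenever the root epoch moves
// past the last persisted epoch, it persists the current root snapshot,
// then parks on an epochWatcher until the introducer signals that the
// root has advanced again.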
func (s *Scorch) persisterLoop() {
	defer s.asyncTasks.Done()

	var notifyChs []notificationChan
	var lastPersistedEpoch uint64
OUTER:
	for {
		select {
		case <-s.closeCh:
			break OUTER
		case notifyCh := <-s.persisterNotifier:
			notifyChs = append(notifyChs, notifyCh)
		default:
		}

		var ourSnapshot *IndexSnapshot
		var ourPersisted []chan error

		// check to see if there is a new snapshot to persist
		s.rootLock.Lock()
		if s.root != nil && s.root.epoch > lastPersistedEpoch {
			ourSnapshot = s.root
			ourSnapshot.AddRef()
			ourPersisted = s.rootPersisted
			s.rootPersisted = nil
		}
		s.rootLock.Unlock()

		if ourSnapshot != nil {
			startTime := time.Now()

			err := s.persistSnapshot(ourSnapshot)
			for _, ch := range ourPersisted {
				if err != nil {
					ch <- err
				}
				close(ch)
			}
			if err != nil {
				s.fireAsyncError(fmt.Errorf("got err persisting snapshot: %v", err))
				_ = ourSnapshot.DecRef()
				continue OUTER
			}

			lastPersistedEpoch = ourSnapshot.epoch
			for _, notifyCh := range notifyChs {
				close(notifyCh)
			}
			notifyChs = nil
			_ = ourSnapshot.DecRef()

			changed := false
			s.rootLock.RLock()
			if s.root != nil && s.root.epoch != lastPersistedEpoch {
				changed = true
			}
			s.rootLock.RUnlock()

			s.fireEvent(EventKindPersisterProgress, time.Since(startTime))

			if changed {
				continue OUTER
			}
		}

		// tell the introducer we're waiting for changes
		w := &epochWatcher{
			epoch:    lastPersistedEpoch,
			notifyCh: make(notificationChan, 1),
		}

		select {
		case <-s.closeCh:
			break OUTER
		case s.introducerNotifier <- w:
		}

		s.removeOldData() // might as well cleanup while waiting

		select {
		case <-s.closeCh:
			break OUTER
		case <-w.notifyCh:
			// woken up, next loop should pick up work
		}
	}
}
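
// persistSnapshot writes the given snapshot into the root bolt inside a
// single write transaction: internal key/values first, then one sub-bucket
// per segment, persisting any in-memory segments to new zap files along
// the way; if new files were actually created, a brand-new root snapshot
// (with its own epoch) referencing the on-disk segments is swapped in.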
func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot) error {
	// start a write transaction
	tx, err := s.rootBolt.Begin(true)
	if err != nil {
		return err
	}
	// defer fsync of the rootbolt
	defer func() {
		if err == nil {
			err = s.rootBolt.Sync()
		}
	}()
	// defer commit/rollback transaction
	defer func() {
		if err == nil {
			err = tx.Commit()
		} else {
			_ = tx.Rollback()
		}
	}()

	snapshotsBucket, err := tx.CreateBucketIfNotExists(boltSnapshotsBucket)
	if err != nil {
		return err
	}
	newSnapshotKey := segment.EncodeUvarintAscending(nil, snapshot.epoch)
	snapshotBucket, err := snapshotsBucket.CreateBucketIfNotExists(newSnapshotKey)
	if err != nil {
		return err
	}

	// persist internal values
	internalBucket, err := snapshotBucket.CreateBucketIfNotExists(boltInternalKey)
	if err != nil {
		return err
	}
	// TODO optimize writing these in order?
	for k, v := range snapshot.internal {
		err = internalBucket.Put([]byte(k), v)
		if err != nil {
			return err
		}
	}

	var filenames []string
	newSegmentPaths := make(map[uint64]string)

	// first ensure that each segment in this snapshot has been persisted
	for i, segmentSnapshot := range snapshot.segment {
		snapshotSegmentKey := segment.EncodeUvarintAscending(nil, uint64(i))
		snapshotSegmentBucket, err2 := snapshotBucket.CreateBucketIfNotExists(snapshotSegmentKey)
		if err2 != nil {
			return err2
		}
		switch seg := segmentSnapshot.segment.(type) {
		case *mem.Segment:
			// need to persist this to disk
			filename := zapFileName(segmentSnapshot.id)
			path := s.path + string(os.PathSeparator) + filename
			err2 := zap.PersistSegment(seg, path, 1024)
			if err2 != nil {
				return fmt.Errorf("error persisting segment: %v", err2)
			}
			newSegmentPaths[segmentSnapshot.id] = path
			err = snapshotSegmentBucket.Put(boltPathKey, []byte(filename))
			if err != nil {
				return err
			}
			filenames = append(filenames, filename)
		case *zap.Segment:
			path := seg.Path()
			filename := strings.TrimPrefix(path, s.path+string(os.PathSeparator))
			err = snapshotSegmentBucket.Put(boltPathKey, []byte(filename))
			if err != nil {
				return err
			}
			filenames = append(filenames, filename)
		default:
			return fmt.Errorf("unknown segment type: %T", seg)
		}
		// store current deleted bits
		var roaringBuf bytes.Buffer
		if segmentSnapshot.deleted != nil {
			_, err = segmentSnapshot.deleted.WriteTo(&roaringBuf)
			if err != nil {
				return fmt.Errorf("error persisting roaring bytes: %v", err)
			}
			err = snapshotSegmentBucket.Put(boltDeletedKey, roaringBuf.Bytes())
			if err != nil {
				return err
			}
		}
	}

	// only alter the root if we actually persisted a segment
	// (sometimes it's just a new snapshot, possibly with new internal values)
	if len(newSegmentPaths) > 0 {
		// now try to open all the new snapshots
		newSegments := make(map[uint64]segment.Segment)
		for segmentID, path := range newSegmentPaths {
			newSegments[segmentID], err = zap.Open(path)
			if err != nil {
				for _, s := range newSegments {
					if s != nil {
						_ = s.Close() // cleanup segments that were successfully opened
					}
				}
return fmt.Errorf("error opening new segment at %s, %v", path, err)
|
2017-12-13 22:10:44 +01:00
|
|
|
}
|
2017-12-07 00:33:47 +01:00
|
|
|
}
|
|
|
|
|

		s.rootLock.Lock()
		newIndexSnapshot := &IndexSnapshot{
			parent:   s,
			epoch:    s.nextSnapshotEpoch,
			segment:  make([]*SegmentSnapshot, len(s.root.segment)),
			offsets:  make([]uint64, len(s.root.offsets)),
			internal: make(map[string][]byte, len(s.root.internal)),
			refs:     1,
		}
		s.nextSnapshotEpoch++
		for i, segmentSnapshot := range s.root.segment {
			// see if this segment has been replaced
			if replacement, ok := newSegments[segmentSnapshot.id]; ok {
				newSegmentSnapshot := &SegmentSnapshot{
					id:         segmentSnapshot.id,
					segment:    replacement,
					deleted:    segmentSnapshot.deleted,
					cachedDocs: segmentSnapshot.cachedDocs,
				}
				newIndexSnapshot.segment[i] = newSegmentSnapshot
				// update items persisted in case of a new segment snapshot
				atomic.AddUint64(&s.stats.numItemsPersisted, newSegmentSnapshot.Count())
			} else {
				newIndexSnapshot.segment[i] = s.root.segment[i]
				newIndexSnapshot.segment[i].segment.AddRef()
			}
			newIndexSnapshot.offsets[i] = s.root.offsets[i]
		}
		for k, v := range s.root.internal {
			newIndexSnapshot.internal[k] = v
		}
		for _, filename := range filenames {
			delete(s.ineligibleForRemoval, filename)
		}
		rootPrev := s.root
		s.root = newIndexSnapshot
		s.rootLock.Unlock()
		if rootPrev != nil {
			_ = rootPrev.DecRef()
		}
	}

	return nil
}

func zapFileName(epoch uint64) string {
	return fmt.Sprintf("%012x.zap", epoch)
}
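
// Illustrative note (not in the original source): zapFileName zero-pads the
// value to 12 lowercase hex digits, e.g. id 26 -> "00000000001a.zap";
// maxSegmentIDOnDisk reverses this via strconv.ParseUint(prefix, 16, 64).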

// bolt snapshot code

var boltSnapshotsBucket = []byte{'s'}
var boltPathKey = []byte{'p'}
var boltDeletedKey = []byte{'d'}
var boltInternalKey = []byte{'i'}
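
// Bolt layout produced by persistSnapshot, sketched here for reference
// (derived from the code in this file):
//
//	"s" (boltSnapshotsBucket)
//	    <epoch, EncodeUvarintAscending>   one bucket per snapshot
//	        "i" (boltInternalKey)         bucket of internal key/value pairs
//	        <uvarint segment key>         one bucket per segment
//	            "p" (boltPathKey)    -> zap file name
//	            "d" (boltDeletedKey) -> serialized roaring bitmap of deleted docs

// loadFromBolt walks the snapshots bucket newest-first, installs the first
// snapshot that loads cleanly as the root, and marks every older or
// unloadable epoch as eligible for removal.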
func (s *Scorch) loadFromBolt() error {
	return s.rootBolt.View(func(tx *bolt.Tx) error {
		snapshots := tx.Bucket(boltSnapshotsBucket)
		if snapshots == nil {
			return nil
		}
		foundRoot := false
		c := snapshots.Cursor()
		for k, _ := c.Last(); k != nil; k, _ = c.Prev() {
			_, snapshotEpoch, err := segment.DecodeUvarintAscending(k)
			if err != nil {
				log.Printf("unable to parse segment epoch %x, continuing", k)
				continue
			}
			if foundRoot {
				s.eligibleForRemoval = append(s.eligibleForRemoval, snapshotEpoch)
				continue
			}
			snapshot := snapshots.Bucket(k)
			if snapshot == nil {
				log.Printf("snapshot key, but bucket missing %x, continuing", k)
				s.eligibleForRemoval = append(s.eligibleForRemoval, snapshotEpoch)
				continue
			}
			indexSnapshot, err := s.loadSnapshot(snapshot)
			if err != nil {
				log.Printf("unable to load snapshot, %v, continuing", err)
				s.eligibleForRemoval = append(s.eligibleForRemoval, snapshotEpoch)
				continue
			}
			indexSnapshot.epoch = snapshotEpoch
			// set the nextSegmentID
			s.nextSegmentID, err = s.maxSegmentIDOnDisk()
			if err != nil {
				return err
			}
			s.nextSegmentID++
			s.nextSnapshotEpoch = snapshotEpoch + 1
			s.rootLock.Lock()
			if s.root != nil {
				_ = s.root.DecRef()
			}
			s.root = indexSnapshot
			s.rootLock.Unlock()
			foundRoot = true
		}
		return nil
	})
}

// LoadSnapshot loads the snapshot with the specified epoch
// NOTE: this is currently ONLY intended to be used by the command-line tool
func (s *Scorch) LoadSnapshot(epoch uint64) (rv *IndexSnapshot, err error) {
	err = s.rootBolt.View(func(tx *bolt.Tx) error {
		snapshots := tx.Bucket(boltSnapshotsBucket)
		if snapshots == nil {
			return nil
		}
		snapshotKey := segment.EncodeUvarintAscending(nil, epoch)
		snapshot := snapshots.Bucket(snapshotKey)
		if snapshot == nil {
			return nil
		}
		rv, err = s.loadSnapshot(snapshot)
		return err
	})
	if err != nil {
		return nil, err
	}
	return rv, nil
}
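
// loadSnapshot rebuilds an IndexSnapshot from one snapshot bucket: internal
// key/values are copied out, and each segment sub-bucket is opened via
// loadSegment, with offsets accumulated from the running document count.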
func (s *Scorch) loadSnapshot(snapshot *bolt.Bucket) (*IndexSnapshot, error) {
	rv := &IndexSnapshot{
		parent:   s,
		internal: make(map[string][]byte),
		refs:     1,
	}
	var running uint64
	c := snapshot.Cursor()
	for k, _ := c.First(); k != nil; k, _ = c.Next() {
		if k[0] == boltInternalKey[0] {
			internalBucket := snapshot.Bucket(k)
			err := internalBucket.ForEach(func(key []byte, val []byte) error {
				copiedVal := append([]byte(nil), val...)
				rv.internal[string(key)] = copiedVal
				return nil
			})
			if err != nil {
				_ = rv.DecRef()
				return nil, err
			}
		} else {
			segmentBucket := snapshot.Bucket(k)
			if segmentBucket == nil {
				_ = rv.DecRef()
				return nil, fmt.Errorf("segment key, but bucket missing % x", k)
			}
			segmentSnapshot, err := s.loadSegment(segmentBucket)
			if err != nil {
				_ = rv.DecRef()
				return nil, fmt.Errorf("failed to load segment: %v", err)
			}
			_, segmentSnapshot.id, err = segment.DecodeUvarintAscending(k)
			if err != nil {
				_ = rv.DecRef()
				return nil, fmt.Errorf("failed to decode segment id: %v", err)
			}
			rv.segment = append(rv.segment, segmentSnapshot)
			rv.offsets = append(rv.offsets, running)
			running += segmentSnapshot.segment.Count()
		}
	}
	return rv, nil
}
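
// loadSegment opens the zap file recorded under boltPathKey and restores
// the segment's deleted-docs bitmap, if one was stored.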
func (s *Scorch) loadSegment(segmentBucket *bolt.Bucket) (*SegmentSnapshot, error) {
	pathBytes := segmentBucket.Get(boltPathKey)
	if pathBytes == nil {
		return nil, fmt.Errorf("segment path missing")
	}
	segmentPath := s.path + string(os.PathSeparator) + string(pathBytes)
	segment, err := zap.Open(segmentPath)
	if err != nil {
		return nil, fmt.Errorf("error opening bolt segment: %v", err)
	}

	rv := &SegmentSnapshot{
		segment:    segment,
		cachedDocs: &cachedDocs{cache: nil},
	}
	deletedBytes := segmentBucket.Get(boltDeletedKey)
	if deletedBytes != nil {
		deletedBitmap := roaring.NewBitmap()
		r := bytes.NewReader(deletedBytes)
		_, err := deletedBitmap.ReadFrom(r)
		if err != nil {
			_ = segment.Close()
			return nil, fmt.Errorf("error reading deleted bytes: %v", err)
		}
		rv.deleted = deletedBitmap
	}

	return rv, nil
}
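
// uint64Descending implements sort.Interface in descending order, so a
// caller can order snapshot epochs newest-first via
// sort.Sort(uint64Descending(epochs)).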
type uint64Descending []uint64

func (p uint64Descending) Len() int           { return len(p) }
func (p uint64Descending) Less(i, j int) bool { return p[i] > p[j] }
func (p uint64Descending) Swap(i, j int)      { p[i], p[j] = p[j], p[i] }
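
// removeOldData performs cleanup in two phases: prune surplus snapshots
// from the root bolt first, and only if any were removed, sweep the index
// directory for zap files that no persisted snapshot references anymore.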
func (s *Scorch) removeOldData() {
	removed, err := s.removeOldBoltSnapshots()
	if err != nil {
		s.fireAsyncError(fmt.Errorf("got err removing old bolt snapshots: %v", err))
	}

	if removed > 0 {
		err = s.removeOldZapFiles()
		if err != nil {
			s.fireAsyncError(fmt.Errorf("got err removing old zap files: %v", err))
		}
	}
}

// NumSnapshotsToKeep is the number of the most recent persisted snapshots
// to keep around per Scorch instance. Useful for apps that require
// rollback'ability.
var NumSnapshotsToKeep = 1
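
// For example, an application that wants to be able to roll back across the
// three most recently persisted snapshots could set, before indexing:
//
//	scorch.NumSnapshotsToKeep = 3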

// Removes enough snapshots from the rootBolt so that the number of
// snapshots retained stays within the NumSnapshotsToKeep policy.
func (s *Scorch) removeOldBoltSnapshots() (numRemoved int, err error) {
	persistedEpochs, err := s.RootBoltSnapshotEpochs()
	if err != nil {
		return 0, err
	}

if len(persistedEpochs) <= NumSnapshotsToKeep {
|
|
|
|
// we need to keep everything
|
|
|
|
return 0, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// make a map of epochs to protect from deletion
|
|
|
|
protectedEpochs := make(map[uint64]struct{}, NumSnapshotsToKeep)
|
|
|
|
for _, epoch := range persistedEpochs[0:NumSnapshotsToKeep] {
|
|
|
|
protectedEpochs[epoch] = struct{}{}
|
|
|
|
}
|
|
|
|
|
|
|
|
var epochsToRemove []uint64
|
|
|
|
var newEligible []uint64
|
2017-12-13 23:54:58 +01:00
|
|
|
s.rootLock.Lock()
	for _, epoch := range s.eligibleForRemoval {
		if _, ok := protectedEpochs[epoch]; ok {
			// protected
			newEligible = append(newEligible, epoch)
		} else {
			epochsToRemove = append(epochsToRemove, epoch)
		}
	}
	s.eligibleForRemoval = newEligible
	s.rootLock.Unlock()

	if len(epochsToRemove) <= 0 {
		return 0, nil
	}

	tx, err := s.rootBolt.Begin(true)
	if err != nil {
		return 0, err
	}
	defer func() {
		if err == nil {
			err = tx.Commit()
		} else {
			_ = tx.Rollback()
		}
		if err == nil {
			err = s.rootBolt.Sync()
		}
	}()

	snapshots := tx.Bucket(boltSnapshotsBucket)
	if snapshots == nil {
		return 0, nil
	}

	for _, epochToRemove := range epochsToRemove {
		k := segment.EncodeUvarintAscending(nil, epochToRemove)
		err = snapshots.DeleteBucket(k)
		if err == bolt.ErrBucketNotFound {
			err = nil
		}
		if err == nil {
			numRemoved++
		}
	}

	return numRemoved, err
}
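
// maxSegmentIDOnDisk scans the index directory for *.zap files and returns
// the largest segment id encoded (in hex) in their file names.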
func (s *Scorch) maxSegmentIDOnDisk() (uint64, error) {
	currFileInfos, err := ioutil.ReadDir(s.path)
	if err != nil {
		return 0, err
	}

	var rv uint64
	for _, finfo := range currFileInfos {
		fname := finfo.Name()
		if filepath.Ext(fname) == ".zap" {
			prefix := strings.TrimSuffix(fname, ".zap")
			id, err2 := strconv.ParseUint(prefix, 16, 64)
			if err2 != nil {
				return 0, err2
			}
			if id > rv {
				rv = id
			}
		}
	}
	return rv, err
}

// Removes any *.zap files which aren't listed in the rootBolt.
func (s *Scorch) removeOldZapFiles() error {
	liveFileNames, err := s.loadZapFileNames()
	if err != nil {
		return err
	}

	currFileInfos, err := ioutil.ReadDir(s.path)
	if err != nil {
		return err
	}

	s.rootLock.RLock()

	for _, finfo := range currFileInfos {
		fname := finfo.Name()
		if filepath.Ext(fname) == ".zap" {
			if _, exists := liveFileNames[fname]; !exists && !s.ineligibleForRemoval[fname] {
				err := os.Remove(s.path + string(os.PathSeparator) + fname)
				if err != nil {
					log.Printf("got err removing file: %s, err: %v", fname, err)
				}
			}
		}
	}

	s.rootLock.RUnlock()

	return nil
}
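
// RootBoltSnapshotEpochs returns the epochs of every snapshot persisted in
// the root bolt, ordered newest to oldest.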
func (s *Scorch) RootBoltSnapshotEpochs() ([]uint64, error) {
	var rv []uint64
	err := s.rootBolt.View(func(tx *bolt.Tx) error {
		snapshots := tx.Bucket(boltSnapshotsBucket)
		if snapshots == nil {
			return nil
		}
		sc := snapshots.Cursor()
		for sk, _ := sc.Last(); sk != nil; sk, _ = sc.Prev() {
			_, snapshotEpoch, err := segment.DecodeUvarintAscending(sk)
			if err != nil {
				continue
			}
			rv = append(rv, snapshotEpoch)
		}
		return nil
	})
	return rv, err
}

// Returns the *.zap file names that are listed in the rootBolt.
func (s *Scorch) loadZapFileNames() (map[string]struct{}, error) {
	rv := map[string]struct{}{}
	err := s.rootBolt.View(func(tx *bolt.Tx) error {
		snapshots := tx.Bucket(boltSnapshotsBucket)
		if snapshots == nil {
			return nil
		}
		sc := snapshots.Cursor()
		for sk, _ := sc.First(); sk != nil; sk, _ = sc.Next() {
			snapshot := snapshots.Bucket(sk)
			if snapshot == nil {
				continue
			}
			segc := snapshot.Cursor()
			for segk, _ := segc.First(); segk != nil; segk, _ = segc.Next() {
				if segk[0] == boltInternalKey[0] {
					continue
				}
				segmentBucket := snapshot.Bucket(segk)
				if segmentBucket == nil {
					continue
				}
				pathBytes := segmentBucket.Get(boltPathKey)
				if pathBytes == nil {
					continue
				}
				rv[string(pathBytes)] = struct{}{}
			}
		}
		return nil
	})

	return rv, err
}