Merge pull request #717 from mschoch/scorch-fix-refcounting
attempt to fix core reference counting issues
This commit is contained in:
commit
6ad679bbb5
|
@ -21,7 +21,6 @@ import (
|
|||
"log"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
|
@ -218,59 +217,63 @@ func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot) error {
|
|||
}
|
||||
}
|
||||
|
||||
// now try to open all the new snapshots
|
||||
newSegments := make(map[uint64]segment.Segment)
|
||||
for segmentID, path := range newSegmentPaths {
|
||||
newSegments[segmentID], err = zap.Open(path)
|
||||
if err != nil {
|
||||
for _, s := range newSegments {
|
||||
if s != nil {
|
||||
_ = s.Close() // cleanup segments that were successfully opened
|
||||
// only alter the root if we actually persisted a segment
|
||||
// (sometimes its just a new snapshot, possibly with new internal values)
|
||||
if len(newSegmentPaths) > 0 {
|
||||
// now try to open all the new snapshots
|
||||
newSegments := make(map[uint64]segment.Segment)
|
||||
for segmentID, path := range newSegmentPaths {
|
||||
newSegments[segmentID], err = zap.Open(path)
|
||||
if err != nil {
|
||||
for _, s := range newSegments {
|
||||
if s != nil {
|
||||
_ = s.Close() // cleanup segments that were successfully opened
|
||||
}
|
||||
}
|
||||
return fmt.Errorf("error opening new segment at %s, %v", path, err)
|
||||
}
|
||||
return fmt.Errorf("error opening new segment at %s, %v", path, err)
|
||||
}
|
||||
}
|
||||
|
||||
s.rootLock.Lock()
|
||||
newIndexSnapshot := &IndexSnapshot{
|
||||
parent: s,
|
||||
epoch: s.root.epoch,
|
||||
segment: make([]*SegmentSnapshot, len(s.root.segment)),
|
||||
offsets: make([]uint64, len(s.root.offsets)),
|
||||
internal: make(map[string][]byte, len(s.root.internal)),
|
||||
refs: 1,
|
||||
}
|
||||
for i, segmentSnapshot := range s.root.segment {
|
||||
// see if this segment has been replaced
|
||||
if replacement, ok := newSegments[segmentSnapshot.id]; ok {
|
||||
newSegmentSnapshot := &SegmentSnapshot{
|
||||
id: segmentSnapshot.id,
|
||||
segment: replacement,
|
||||
deleted: segmentSnapshot.deleted,
|
||||
cachedDocs: segmentSnapshot.cachedDocs,
|
||||
s.rootLock.Lock()
|
||||
newIndexSnapshot := &IndexSnapshot{
|
||||
parent: s,
|
||||
epoch: s.nextSnapshotEpoch,
|
||||
segment: make([]*SegmentSnapshot, len(s.root.segment)),
|
||||
offsets: make([]uint64, len(s.root.offsets)),
|
||||
internal: make(map[string][]byte, len(s.root.internal)),
|
||||
refs: 1,
|
||||
}
|
||||
s.nextSnapshotEpoch++
|
||||
for i, segmentSnapshot := range s.root.segment {
|
||||
// see if this segment has been replaced
|
||||
if replacement, ok := newSegments[segmentSnapshot.id]; ok {
|
||||
newSegmentSnapshot := &SegmentSnapshot{
|
||||
id: segmentSnapshot.id,
|
||||
segment: replacement,
|
||||
deleted: segmentSnapshot.deleted,
|
||||
cachedDocs: segmentSnapshot.cachedDocs,
|
||||
}
|
||||
newIndexSnapshot.segment[i] = newSegmentSnapshot
|
||||
// update items persisted incase of a new segment snapshot
|
||||
atomic.AddUint64(&s.stats.numItemsPersisted, newSegmentSnapshot.Count())
|
||||
} else {
|
||||
newIndexSnapshot.segment[i] = s.root.segment[i]
|
||||
newIndexSnapshot.segment[i].segment.AddRef()
|
||||
}
|
||||
newIndexSnapshot.segment[i] = newSegmentSnapshot
|
||||
// update items persisted incase of a new segment snapshot
|
||||
atomic.AddUint64(&s.stats.numItemsPersisted, newSegmentSnapshot.Count())
|
||||
} else {
|
||||
newIndexSnapshot.segment[i] = s.root.segment[i]
|
||||
newIndexSnapshot.segment[i].segment.AddRef()
|
||||
newIndexSnapshot.offsets[i] = s.root.offsets[i]
|
||||
}
|
||||
for k, v := range s.root.internal {
|
||||
newIndexSnapshot.internal[k] = v
|
||||
}
|
||||
for _, filename := range filenames {
|
||||
delete(s.ineligibleForRemoval, filename)
|
||||
}
|
||||
rootPrev := s.root
|
||||
s.root = newIndexSnapshot
|
||||
s.rootLock.Unlock()
|
||||
if rootPrev != nil {
|
||||
_ = rootPrev.DecRef()
|
||||
}
|
||||
newIndexSnapshot.offsets[i] = s.root.offsets[i]
|
||||
}
|
||||
for k, v := range s.root.internal {
|
||||
newIndexSnapshot.internal[k] = v
|
||||
}
|
||||
for _, filename := range filenames {
|
||||
delete(s.ineligibleForRemoval, filename)
|
||||
}
|
||||
rootPrev := s.root
|
||||
s.root = newIndexSnapshot
|
||||
s.rootLock.Unlock()
|
||||
|
||||
if rootPrev != nil {
|
||||
_ = rootPrev.DecRef()
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -435,19 +438,39 @@ func (s *Scorch) removeOldData() {
|
|||
// NumSnapshotsToKeep represents how many recent, old snapshots to
|
||||
// keep around per Scorch instance. Useful for apps that require
|
||||
// rollback'ability.
|
||||
var NumSnapshotsToKeep int
|
||||
var NumSnapshotsToKeep = 1
|
||||
|
||||
// Removes enough snapshots from the rootBolt so that the
|
||||
// s.eligibleForRemoval stays under the NumSnapshotsToKeep policy.
|
||||
func (s *Scorch) removeOldBoltSnapshots() (numRemoved int, err error) {
|
||||
var epochsToRemove []uint64
|
||||
|
||||
s.rootLock.Lock()
|
||||
if len(s.eligibleForRemoval) > NumSnapshotsToKeep {
|
||||
sort.Sort(uint64Descending(s.eligibleForRemoval))
|
||||
epochsToRemove = append([]uint64(nil), s.eligibleForRemoval[NumSnapshotsToKeep:]...) // Copy.
|
||||
s.eligibleForRemoval = s.eligibleForRemoval[0:NumSnapshotsToKeep]
|
||||
persistedEpochs, err := s.rootBoltSnapshotEpochs()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
if len(persistedEpochs) <= NumSnapshotsToKeep {
|
||||
// we need to keep everything
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
// make a map of epochs to protect from deletion
|
||||
protectedEpochs := make(map[uint64]struct{}, NumSnapshotsToKeep)
|
||||
for _, epoch := range persistedEpochs[0:NumSnapshotsToKeep] {
|
||||
protectedEpochs[epoch] = struct{}{}
|
||||
}
|
||||
|
||||
var epochsToRemove []uint64
|
||||
var newEligible []uint64
|
||||
s.rootLock.Lock()
|
||||
for _, epoch := range s.eligibleForRemoval {
|
||||
if _, ok := protectedEpochs[epoch]; ok {
|
||||
// protected
|
||||
newEligible = append(newEligible, epoch)
|
||||
} else {
|
||||
epochsToRemove = append(epochsToRemove, epoch)
|
||||
}
|
||||
}
|
||||
s.eligibleForRemoval = newEligible
|
||||
s.rootLock.Unlock()
|
||||
|
||||
if len(epochsToRemove) <= 0 {
|
||||
|
@ -542,6 +565,26 @@ func (s *Scorch) removeOldZapFiles() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (s *Scorch) rootBoltSnapshotEpochs() ([]uint64, error) {
|
||||
var rv []uint64
|
||||
err := s.rootBolt.View(func(tx *bolt.Tx) error {
|
||||
snapshots := tx.Bucket(boltSnapshotsBucket)
|
||||
if snapshots == nil {
|
||||
return nil
|
||||
}
|
||||
sc := snapshots.Cursor()
|
||||
for sk, _ := sc.Last(); sk != nil; sk, _ = sc.Prev() {
|
||||
_, snapshotEpoch, err := segment.DecodeUvarintAscending(sk)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
rv = append(rv, snapshotEpoch)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
return rv, err
|
||||
}
|
||||
|
||||
// Returns the *.zap file names that are listed in the rootBolt.
|
||||
func (s *Scorch) loadZapFileNames() (map[string]struct{}, error) {
|
||||
rv := map[string]struct{}{}
|
||||
|
|
|
@ -38,20 +38,22 @@ const Name = "scorch"
|
|||
const Version uint8 = 1
|
||||
|
||||
type Scorch struct {
|
||||
readOnly bool
|
||||
version uint8
|
||||
config map[string]interface{}
|
||||
analysisQueue *index.AnalysisQueue
|
||||
stats *Stats
|
||||
nextSegmentID uint64
|
||||
nextSnapshotEpoch uint64
|
||||
path string
|
||||
readOnly bool
|
||||
version uint8
|
||||
config map[string]interface{}
|
||||
analysisQueue *index.AnalysisQueue
|
||||
stats *Stats
|
||||
nextSegmentID uint64
|
||||
path string
|
||||
|
||||
unsafeBatch bool
|
||||
|
||||
rootLock sync.RWMutex
|
||||
root *IndexSnapshot // holds 1 ref-count on the root
|
||||
rootPersisted []chan error // closed when root is persisted
|
||||
rootLock sync.RWMutex
|
||||
root *IndexSnapshot // holds 1 ref-count on the root
|
||||
rootPersisted []chan error // closed when root is persisted
|
||||
nextSnapshotEpoch uint64
|
||||
eligibleForRemoval []uint64 // Index snapshot epochs that are safe to GC.
|
||||
ineligibleForRemoval map[string]bool // Filenames that should not be GC'ed yet.
|
||||
|
||||
closeCh chan struct{}
|
||||
introductions chan *segmentIntroduction
|
||||
|
@ -62,9 +64,6 @@ type Scorch struct {
|
|||
rootBolt *bolt.DB
|
||||
asyncTasks sync.WaitGroup
|
||||
|
||||
eligibleForRemoval []uint64 // Index snapshot epochs that are safe to GC.
|
||||
ineligibleForRemoval map[string]bool // Filenames that should not be GC'ed yet.
|
||||
|
||||
onEvent func(event Event)
|
||||
}
|
||||
|
||||
|
|
|
@ -1565,6 +1565,12 @@ func TestBatchRaceBug260(t *testing.T) {
|
|||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer func() {
|
||||
err := i.Close()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}()
|
||||
b := i.NewBatch()
|
||||
err = b.Index("1", 1)
|
||||
if err != nil {
|
||||
|
|
Loading…
Reference in New Issue