diff --git a/index/scorch/persister.go b/index/scorch/persister.go index b54b2013..acf241eb 100644 --- a/index/scorch/persister.go +++ b/index/scorch/persister.go @@ -21,7 +21,6 @@ import ( "log" "os" "path/filepath" - "sort" "strconv" "strings" "sync/atomic" @@ -218,59 +217,63 @@ func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot) error { } } - // now try to open all the new snapshots - newSegments := make(map[uint64]segment.Segment) - for segmentID, path := range newSegmentPaths { - newSegments[segmentID], err = zap.Open(path) - if err != nil { - for _, s := range newSegments { - if s != nil { - _ = s.Close() // cleanup segments that were successfully opened + // only alter the root if we actually persisted a segment + // (sometimes it's just a new snapshot, possibly with new internal values) + if len(newSegmentPaths) > 0 { + // now try to open all the new snapshots + newSegments := make(map[uint64]segment.Segment) + for segmentID, path := range newSegmentPaths { + newSegments[segmentID], err = zap.Open(path) + if err != nil { + for _, s := range newSegments { + if s != nil { + _ = s.Close() // cleanup segments that were successfully opened + } } + return fmt.Errorf("error opening new segment at %s, %v", path, err) } - return fmt.Errorf("error opening new segment at %s, %v", path, err) } - } - s.rootLock.Lock() - newIndexSnapshot := &IndexSnapshot{ - parent: s, - epoch: s.root.epoch, - segment: make([]*SegmentSnapshot, len(s.root.segment)), - offsets: make([]uint64, len(s.root.offsets)), - internal: make(map[string][]byte, len(s.root.internal)), - refs: 1, - } - for i, segmentSnapshot := range s.root.segment { - // see if this segment has been replaced - if replacement, ok := newSegments[segmentSnapshot.id]; ok { - newSegmentSnapshot := &SegmentSnapshot{ - id: segmentSnapshot.id, - segment: replacement, - deleted: segmentSnapshot.deleted, - cachedDocs: segmentSnapshot.cachedDocs, + s.rootLock.Lock() + newIndexSnapshot := &IndexSnapshot{ + parent: s, + epoch: 
s.nextSnapshotEpoch, + segment: make([]*SegmentSnapshot, len(s.root.segment)), + offsets: make([]uint64, len(s.root.offsets)), + internal: make(map[string][]byte, len(s.root.internal)), + refs: 1, + } + s.nextSnapshotEpoch++ + for i, segmentSnapshot := range s.root.segment { + // see if this segment has been replaced + if replacement, ok := newSegments[segmentSnapshot.id]; ok { + newSegmentSnapshot := &SegmentSnapshot{ + id: segmentSnapshot.id, + segment: replacement, + deleted: segmentSnapshot.deleted, + cachedDocs: segmentSnapshot.cachedDocs, + } + newIndexSnapshot.segment[i] = newSegmentSnapshot + // update items persisted in case of a new segment snapshot + atomic.AddUint64(&s.stats.numItemsPersisted, newSegmentSnapshot.Count()) + } else { + newIndexSnapshot.segment[i] = s.root.segment[i] + newIndexSnapshot.segment[i].segment.AddRef() } - newIndexSnapshot.segment[i] = newSegmentSnapshot - // update items persisted incase of a new segment snapshot - atomic.AddUint64(&s.stats.numItemsPersisted, newSegmentSnapshot.Count()) - } else { - newIndexSnapshot.segment[i] = s.root.segment[i] - newIndexSnapshot.segment[i].segment.AddRef() + newIndexSnapshot.offsets[i] = s.root.offsets[i] + } + for k, v := range s.root.internal { + newIndexSnapshot.internal[k] = v + } + for _, filename := range filenames { + delete(s.ineligibleForRemoval, filename) + } + rootPrev := s.root + s.root = newIndexSnapshot + s.rootLock.Unlock() + if rootPrev != nil { + _ = rootPrev.DecRef() } - newIndexSnapshot.offsets[i] = s.root.offsets[i] - } - for k, v := range s.root.internal { - newIndexSnapshot.internal[k] = v - } - for _, filename := range filenames { - delete(s.ineligibleForRemoval, filename) - } - rootPrev := s.root - s.root = newIndexSnapshot - s.rootLock.Unlock() - - if rootPrev != nil { - _ = rootPrev.DecRef() } return nil @@ -435,19 +438,39 @@ func (s *Scorch) removeOldData() { // NumSnapshotsToKeep represents how many recent, old snapshots to // keep around per Scorch instance. 
Useful for apps that require // rollback'ability. -var NumSnapshotsToKeep int +var NumSnapshotsToKeep = 1 // Removes enough snapshots from the rootBolt so that the // s.eligibleForRemoval stays under the NumSnapshotsToKeep policy. func (s *Scorch) removeOldBoltSnapshots() (numRemoved int, err error) { - var epochsToRemove []uint64 - - s.rootLock.Lock() - if len(s.eligibleForRemoval) > NumSnapshotsToKeep { - sort.Sort(uint64Descending(s.eligibleForRemoval)) - epochsToRemove = append([]uint64(nil), s.eligibleForRemoval[NumSnapshotsToKeep:]...) // Copy. - s.eligibleForRemoval = s.eligibleForRemoval[0:NumSnapshotsToKeep] + persistedEpochs, err := s.rootBoltSnapshotEpochs() + if err != nil { + return 0, err } + + if len(persistedEpochs) <= NumSnapshotsToKeep { + // we need to keep everything + return 0, nil + } + + // make a map of epochs to protect from deletion + protectedEpochs := make(map[uint64]struct{}, NumSnapshotsToKeep) + for _, epoch := range persistedEpochs[0:NumSnapshotsToKeep] { + protectedEpochs[epoch] = struct{}{} + } + + var epochsToRemove []uint64 + var newEligible []uint64 + s.rootLock.Lock() + for _, epoch := range s.eligibleForRemoval { + if _, ok := protectedEpochs[epoch]; ok { + // protected + newEligible = append(newEligible, epoch) + } else { + epochsToRemove = append(epochsToRemove, epoch) + } + } + s.eligibleForRemoval = newEligible s.rootLock.Unlock() if len(epochsToRemove) <= 0 { @@ -542,6 +565,26 @@ func (s *Scorch) removeOldZapFiles() error { return nil } +func (s *Scorch) rootBoltSnapshotEpochs() ([]uint64, error) { + var rv []uint64 + err := s.rootBolt.View(func(tx *bolt.Tx) error { + snapshots := tx.Bucket(boltSnapshotsBucket) + if snapshots == nil { + return nil + } + sc := snapshots.Cursor() + for sk, _ := sc.Last(); sk != nil; sk, _ = sc.Prev() { + _, snapshotEpoch, err := segment.DecodeUvarintAscending(sk) + if err != nil { + continue + } + rv = append(rv, snapshotEpoch) + } + return nil + }) + return rv, err +} + // Returns the 
*.zap file names that are listed in the rootBolt. func (s *Scorch) loadZapFileNames() (map[string]struct{}, error) { rv := map[string]struct{}{} diff --git a/index/scorch/scorch.go b/index/scorch/scorch.go index 94cbb3e6..e03cdeee 100644 --- a/index/scorch/scorch.go +++ b/index/scorch/scorch.go @@ -38,20 +38,22 @@ const Name = "scorch" const Version uint8 = 1 type Scorch struct { - readOnly bool - version uint8 - config map[string]interface{} - analysisQueue *index.AnalysisQueue - stats *Stats - nextSegmentID uint64 - nextSnapshotEpoch uint64 - path string + readOnly bool + version uint8 + config map[string]interface{} + analysisQueue *index.AnalysisQueue + stats *Stats + nextSegmentID uint64 + path string unsafeBatch bool - rootLock sync.RWMutex - root *IndexSnapshot // holds 1 ref-count on the root - rootPersisted []chan error // closed when root is persisted + rootLock sync.RWMutex + root *IndexSnapshot // holds 1 ref-count on the root + rootPersisted []chan error // closed when root is persisted + nextSnapshotEpoch uint64 + eligibleForRemoval []uint64 // Index snapshot epochs that are safe to GC. + ineligibleForRemoval map[string]bool // Filenames that should not be GC'ed yet. closeCh chan struct{} introductions chan *segmentIntroduction @@ -62,9 +64,6 @@ type Scorch struct { rootBolt *bolt.DB asyncTasks sync.WaitGroup - eligibleForRemoval []uint64 // Index snapshot epochs that are safe to GC. - ineligibleForRemoval map[string]bool // Filenames that should not be GC'ed yet. - onEvent func(event Event) } diff --git a/index_test.go b/index_test.go index 03ef7192..762e3838 100644 --- a/index_test.go +++ b/index_test.go @@ -1565,6 +1565,12 @@ func TestBatchRaceBug260(t *testing.T) { if err != nil { t.Fatal(err) } + defer func() { + err := i.Close() + if err != nil { + t.Fatal(err) + } + }() b := i.NewBatch() err = b.Index("1", 1) if err != nil {