2017-12-01 21:42:50 +01:00
|
|
|
// Copyright (c) 2017 Couchbase, Inc.
|
|
|
|
//
|
|
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
|
// you may not use this file except in compliance with the License.
|
|
|
|
// You may obtain a copy of the License at
|
|
|
|
//
|
|
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
//
|
|
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
// See the License for the specific language governing permissions and
|
|
|
|
// limitations under the License.
|
|
|
|
|
2017-09-29 18:42:37 +02:00
|
|
|
package scorch
|
|
|
|
|
|
|
|
import (
|
2017-12-07 00:33:47 +01:00
|
|
|
"fmt"
|
2017-12-28 01:06:31 +01:00
|
|
|
"sync/atomic"
|
2017-12-07 00:33:47 +01:00
|
|
|
|
2017-09-29 18:42:37 +02:00
|
|
|
"github.com/RoaringBitmap/roaring"
|
|
|
|
"github.com/blevesearch/bleve/index/scorch/segment"
|
|
|
|
)
|
|
|
|
|
|
|
|
// segmentIntroduction describes a new segment (and/or a batch of deletions
// and internal k/v updates) to be applied to the current root snapshot by
// the introducer goroutine.
type segmentIntroduction struct {
	id        uint64                     // id assigned to the incoming segment
	data      segment.Segment            // the new segment; may be nil (delete-only / internal-only batch)
	obsoletes map[uint64]*roaring.Bitmap // precomputed obsoleted doc numbers, keyed by existing segment id
	ids       []string                   // external doc ids in this batch, used to compute obsoletions when not precomputed
	internal  map[string][]byte          // internal k/v updates; a nil value deletes the key

	applied   chan error // closed once the introduction is applied to the root (after any error is sent)
	persisted chan error // if non-nil, appended to rootPersisted so the requester is notified on persistence
}
|
|
|
|
|
2018-03-02 23:30:17 +01:00
|
|
|
// persistIntroduction carries segments that have been persisted to disk,
// so the introducer can swap the in-memory versions for their persisted
// equivalents in a new root snapshot.
type persistIntroduction struct {
	persisted map[uint64]segment.Segment // persisted replacement segments, keyed by segment id
	applied   notificationChan           // closed once the replacements are swapped into the root
}
|
|
|
|
|
2017-12-17 17:23:00 +01:00
|
|
|
// epochWatcher lets a caller wait (via a closed channel) until the root
// snapshot's epoch has advanced past the given epoch.
type epochWatcher struct {
	epoch    uint64           // the epoch being waited on
	notifyCh notificationChan // closed by the introducer once root epoch > epoch
}
|
|
|
|
|
2017-12-15 23:49:33 +01:00
|
|
|
// snapshotReversion requests that the index root be reverted to a
// previously obtained snapshot.
type snapshotReversion struct {
	snapshot  *IndexSnapshot // the snapshot to revert to; must be non-nil
	applied   chan error     // carries the outcome of the reversion request
	persisted chan error     // if non-nil, appended to rootPersisted for persistence notification
}
|
|
|
|
|
2017-09-29 18:42:37 +02:00
|
|
|
// mainLoop is the introducer goroutine. It serializes every mutation of
// the root IndexSnapshot — segment introductions, merges, persist swaps
// and snapshot reversions — through one select loop, so root updates
// never race each other. It exits when s.closeCh is closed, then marks
// the async task done.
func (s *Scorch) mainLoop() {
	var epochWatchers []*epochWatcher
OUTER:
	for {
		atomic.AddUint64(&s.stats.TotIntroduceLoop, 1)

		select {
		case <-s.closeCh:
			break OUTER

		case epochWatcher := <-s.introducerNotifier:
			// register a watcher; it is checked after every introduction below
			epochWatchers = append(epochWatchers, epochWatcher)

		case nextMerge := <-s.merges:
			s.introduceMerge(nextMerge)

		case next := <-s.introductions:
			err := s.introduceSegment(next)
			if err != nil {
				// error already reported to the requester via next.applied;
				// skip the epoch-watcher sweep since the root did not advance
				continue OUTER
			}

		case persist := <-s.persists:
			s.introducePersist(persist)

		case revertTo := <-s.revertToSnapshots:
			err := s.revertToSnapshot(revertTo)
			if err != nil {
				continue OUTER
			}
		}

		// after each event, notify any watchers whose target epoch has
		// been surpassed by the current root epoch
		var epochCurr uint64
		s.rootLock.RLock()
		if s.root != nil {
			epochCurr = s.root.epoch
		}
		s.rootLock.RUnlock()
		var epochWatchersNext []*epochWatcher
		for _, w := range epochWatchers {
			if w.epoch < epochCurr {
				close(w.notifyCh)
			} else {
				// not yet reached; keep watching
				epochWatchersNext = append(epochWatchersNext, w)
			}
		}
		epochWatchers = epochWatchersNext
	}

	s.asyncTasks.Done()
}
|
2017-12-13 19:41:03 +01:00
|
|
|
|
2017-12-13 22:30:39 +01:00
|
|
|
// introduceSegment applies a segmentIntroduction to the current root: it
// builds a new IndexSnapshot containing the still-live existing segments
// (with any new obsoletions OR'd into their deleted bitmaps), appends the
// incoming segment (if any), applies internal k/v updates, then atomically
// swaps the new snapshot in as the root. On success it closes next.applied;
// on failure it sends the error on next.applied, closes it, and returns
// the error.
func (s *Scorch) introduceSegment(next *segmentIntroduction) error {
	atomic.AddUint64(&s.stats.TotIntroduceSegmentBeg, 1)
	defer atomic.AddUint64(&s.stats.TotIntroduceSegmentEnd, 1)

	// grab a stable view of the current root
	s.rootLock.RLock()
	root := s.root
	s.rootLock.RUnlock()

	nsegs := len(root.segment)

	// prepare new index snapshot
	newSnapshot := &IndexSnapshot{
		parent:   s,
		segment:  make([]*SegmentSnapshot, 0, nsegs+1), // +1 for the incoming segment
		offsets:  make([]uint64, 0, nsegs+1),
		internal: make(map[string][]byte, len(root.internal)),
		refs:     1,
	}

	// iterate through current segments
	var running uint64 // running doc-count offset for global doc numbering
	for i := range root.segment {
		// see if optimistic work included this segment
		delta, ok := next.obsoletes[root.segment[i].id]
		if !ok {
			// not precomputed: resolve the batch's external ids to doc
			// numbers within this segment
			var err error
			delta, err = root.segment[i].segment.DocNumbers(next.ids)
			if err != nil {
				next.applied <- fmt.Errorf("error computing doc numbers: %v", err)
				close(next.applied)
				_ = newSnapshot.DecRef()
				return err
			}
		}

		newss := &SegmentSnapshot{
			id:         root.segment[i].id,
			segment:    root.segment[i].segment,
			cachedDocs: root.segment[i].cachedDocs,
		}

		// apply new obsoletions
		if root.segment[i].deleted == nil {
			newss.deleted = delta
		} else {
			newss.deleted = roaring.Or(root.segment[i].deleted, delta)
		}

		// check for live size before copying; fully-deleted segments are
		// dropped from the new snapshot entirely
		if newss.LiveSize() > 0 {
			newSnapshot.segment = append(newSnapshot.segment, newss)
			root.segment[i].segment.AddRef() // new snapshot holds its own ref
			newSnapshot.offsets = append(newSnapshot.offsets, running)
			running += newss.segment.Count()
		}
	}

	// append new segment, if any, to end of the new index snapshot
	if next.data != nil {
		newSegmentSnapshot := &SegmentSnapshot{
			id:         next.id,
			segment:    next.data, // take ownership of next.data's ref-count
			cachedDocs: &cachedDocs{cache: nil},
		}
		newSnapshot.segment = append(newSnapshot.segment, newSegmentSnapshot)
		newSnapshot.offsets = append(newSnapshot.offsets, running)

		// increment numItemsIntroduced which tracks the number of items
		// queued for persistence.
		atomic.AddUint64(&s.stats.TotIntroducedItems, newSegmentSnapshot.Count())
		atomic.AddUint64(&s.stats.TotIntroducedSegmentsBatch, 1)
	}

	// copy old values
	for key, oldVal := range root.internal {
		newSnapshot.internal[key] = oldVal
	}
	// set new values and apply deletes
	for key, newVal := range next.internal {
		if newVal != nil {
			newSnapshot.internal[key] = newVal
		} else {
			delete(newSnapshot.internal, key)
		}
	}

	newSnapshot.updateSize()
	s.rootLock.Lock()
	if next.persisted != nil {
		s.rootPersisted = append(s.rootPersisted, next.persisted)
	}
	// swap in new index snapshot
	newSnapshot.epoch = s.nextSnapshotEpoch
	s.nextSnapshotEpoch++
	rootPrev := s.root
	s.root = newSnapshot
	// release lock
	s.rootLock.Unlock()

	// release our reference on the displaced root outside the lock
	if rootPrev != nil {
		_ = rootPrev.DecRef()
	}

	close(next.applied)

	return nil
}
|
|
|
|
|
2018-03-02 23:30:17 +01:00
|
|
|
// introducePersist swaps in-memory segments for their newly persisted
// equivalents: it builds a new root snapshot in which every segment found
// in persist.persisted is replaced by its on-disk version (keeping the
// existing deleted bitmap and cached docs), while all other segments carry
// over unchanged with an added ref. Closes persist.applied when done.
func (s *Scorch) introducePersist(persist *persistIntroduction) {
	atomic.AddUint64(&s.stats.TotIntroducePersistBeg, 1)
	defer atomic.AddUint64(&s.stats.TotIntroducePersistEnd, 1)

	s.rootLock.RLock()
	root := s.root
	s.rootLock.RUnlock()

	// NOTE(review): epoch is read/incremented here without holding rootLock,
	// unlike introduceSegment/introduceMerge — presumably safe because only
	// the introducer goroutine mutates nextSnapshotEpoch; verify.
	newIndexSnapshot := &IndexSnapshot{
		parent:   s,
		epoch:    s.nextSnapshotEpoch,
		segment:  make([]*SegmentSnapshot, len(root.segment)),
		offsets:  make([]uint64, len(root.offsets)),
		internal: make(map[string][]byte, len(root.internal)),
		refs:     1,
	}
	s.nextSnapshotEpoch++

	for i, segmentSnapshot := range root.segment {
		// see if this segment has been replaced
		if replacement, ok := persist.persisted[segmentSnapshot.id]; ok {
			// replacement carries its own ref; deleted bitmap and cached
			// docs are preserved from the in-memory snapshot
			newSegmentSnapshot := &SegmentSnapshot{
				id:         segmentSnapshot.id,
				segment:    replacement,
				deleted:    segmentSnapshot.deleted,
				cachedDocs: segmentSnapshot.cachedDocs,
			}
			newIndexSnapshot.segment[i] = newSegmentSnapshot
			delete(persist.persisted, segmentSnapshot.id)

			// update items persisted incase of a new segment snapshot
			atomic.AddUint64(&s.stats.TotPersistedItems, newSegmentSnapshot.Count())
			atomic.AddUint64(&s.stats.TotPersistedSegments, 1)
		} else {
			// segment not persisted in this round; carry it over with a new ref
			newIndexSnapshot.segment[i] = root.segment[i]
			newIndexSnapshot.segment[i].segment.AddRef()
		}
		newIndexSnapshot.offsets[i] = root.offsets[i]
	}

	// internal k/v data is unchanged by persistence; copy it over
	for k, v := range root.internal {
		newIndexSnapshot.internal[k] = v
	}

	newIndexSnapshot.updateSize()
	s.rootLock.Lock()
	rootPrev := s.root
	s.root = newIndexSnapshot
	s.rootLock.Unlock()

	// release our reference on the displaced root outside the lock
	if rootPrev != nil {
		_ = rootPrev.DecRef()
	}

	close(persist.applied)
}
|
|
|
|
|
2017-12-13 22:30:39 +01:00
|
|
|
// introduceMerge applies a completed merge to the root: the merged-away
// source segments are dropped, any deletions that landed on them after the
// merge started are remapped (via oldNewDocNums) onto the merged segment,
// and the merged segment — if it still has live docs — is appended to the
// new root snapshot. The requester is handed the new snapshot (with one
// added ref) on nextMerge.notify.
func (s *Scorch) introduceMerge(nextMerge *segmentMerge) {
	atomic.AddUint64(&s.stats.TotIntroduceMergeBeg, 1)
	defer atomic.AddUint64(&s.stats.TotIntroduceMergeEnd, 1)

	s.rootLock.RLock()
	root := s.root
	s.rootLock.RUnlock()

	newSnapshot := &IndexSnapshot{
		parent:   s,
		internal: root.internal, // merges don't touch internal k/v data; share the map
		refs:     1,
	}

	// iterate through current segments
	newSegmentDeleted := roaring.NewBitmap()
	var running uint64 // running doc-count offset for global doc numbering
	for i := range root.segment {
		segmentID := root.segment[i].id
		if segSnapAtMerge, ok := nextMerge.old[segmentID]; ok {
			// this segment is going away, see if anything else was deleted since we started the merge
			if segSnapAtMerge != nil && root.segment[i].deleted != nil {
				// assume all these deletes are new
				deletedSince := root.segment[i].deleted
				// if we already knew about some of them, remove
				if segSnapAtMerge.deleted != nil {
					deletedSince = roaring.AndNot(root.segment[i].deleted, segSnapAtMerge.deleted)
				}
				// remap the new deletions into the merged segment's doc numbers
				deletedSinceItr := deletedSince.Iterator()
				for deletedSinceItr.HasNext() {
					oldDocNum := deletedSinceItr.Next()
					newDocNum := nextMerge.oldNewDocNums[segmentID][oldDocNum]
					newSegmentDeleted.Add(uint32(newDocNum))
				}
			}
			// clean up the old segment map to figure out the
			// obsolete segments wrt root in meantime, whatever
			// segments left behind in old map after processing
			// the root segments would be the obsolete segment set
			delete(nextMerge.old, segmentID)
		} else if root.segment[i].LiveSize() > 0 {
			// this segment is staying
			newSnapshot.segment = append(newSnapshot.segment, &SegmentSnapshot{
				id:         root.segment[i].id,
				segment:    root.segment[i].segment,
				deleted:    root.segment[i].deleted,
				cachedDocs: root.segment[i].cachedDocs,
			})
			root.segment[i].segment.AddRef() // new snapshot holds its own ref
			newSnapshot.offsets = append(newSnapshot.offsets, running)
			running += root.segment[i].segment.Count()
		}
	}

	// before the newMerge introduction, need to clean the newly
	// merged segment wrt the current root segments, hence
	// applying the obsolete segment contents to newly merged segment
	for segID, ss := range nextMerge.old {
		obsoleted := ss.DocNumbersLive()
		if obsoleted != nil {
			obsoletedIter := obsoleted.Iterator()
			for obsoletedIter.HasNext() {
				oldDocNum := obsoletedIter.Next()
				newDocNum := nextMerge.oldNewDocNums[segID][oldDocNum]
				newSegmentDeleted.Add(uint32(newDocNum))
			}
		}
	}
	// In case where all the docs in the newly merged segment getting
	// deleted by the time we reach here, can skip the introduction.
	if nextMerge.new != nil &&
		nextMerge.new.Count() > newSegmentDeleted.GetCardinality() {
		// put new segment at end
		newSnapshot.segment = append(newSnapshot.segment, &SegmentSnapshot{
			id:         nextMerge.id,
			segment:    nextMerge.new, // take ownership for nextMerge.new's ref-count
			deleted:    newSegmentDeleted,
			cachedDocs: &cachedDocs{cache: nil},
		})
		newSnapshot.offsets = append(newSnapshot.offsets, running)
		atomic.AddUint64(&s.stats.TotIntroducedSegmentsMerge, 1)
	}
	// NOTE(review): when the introduction above is skipped (all merged docs
	// deleted), nextMerge.new's ref is not released here — presumably the
	// requester handles it via the notify response; verify against the
	// merger code.

	newSnapshot.AddRef() // 1 ref for the nextMerge.notify response

	newSnapshot.updateSize()
	s.rootLock.Lock()
	// swap in new index snapshot
	newSnapshot.epoch = s.nextSnapshotEpoch
	s.nextSnapshotEpoch++
	rootPrev := s.root
	s.root = newSnapshot
	// release lock
	s.rootLock.Unlock()

	// release our reference on the displaced root outside the lock
	if rootPrev != nil {
		_ = rootPrev.DecRef()
	}

	// notify requester that we incorporated this
	nextMerge.notify <- newSnapshot
	close(nextMerge.notify)
}
|
2017-12-15 23:49:33 +01:00
|
|
|
|
|
|
|
func (s *Scorch) revertToSnapshot(revertTo *snapshotReversion) error {
|
2018-02-28 20:36:35 +01:00
|
|
|
atomic.AddUint64(&s.stats.TotIntroduceRevertBeg, 1)
|
|
|
|
defer atomic.AddUint64(&s.stats.TotIntroduceRevertEnd, 1)
|
|
|
|
|
2017-12-15 23:49:33 +01:00
|
|
|
if revertTo.snapshot == nil {
|
|
|
|
err := fmt.Errorf("Cannot revert to a nil snapshot")
|
|
|
|
revertTo.applied <- err
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
// acquire lock
|
|
|
|
s.rootLock.Lock()
|
|
|
|
|
|
|
|
// prepare a new index snapshot, based on next snapshot
|
|
|
|
newSnapshot := &IndexSnapshot{
|
|
|
|
parent: s,
|
|
|
|
segment: make([]*SegmentSnapshot, len(revertTo.snapshot.segment)),
|
|
|
|
offsets: revertTo.snapshot.offsets,
|
|
|
|
internal: revertTo.snapshot.internal,
|
|
|
|
epoch: s.nextSnapshotEpoch,
|
|
|
|
refs: 1,
|
|
|
|
}
|
|
|
|
s.nextSnapshotEpoch++
|
|
|
|
|
|
|
|
// iterate through segments
|
|
|
|
for i, segmentSnapshot := range revertTo.snapshot.segment {
|
|
|
|
newSnapshot.segment[i] = &SegmentSnapshot{
|
|
|
|
id: segmentSnapshot.id,
|
|
|
|
segment: segmentSnapshot.segment,
|
|
|
|
deleted: segmentSnapshot.deleted,
|
|
|
|
cachedDocs: segmentSnapshot.cachedDocs,
|
|
|
|
}
|
2018-01-04 00:26:21 +01:00
|
|
|
newSnapshot.segment[i].segment.AddRef()
|
|
|
|
|
|
|
|
// remove segment from ineligibleForRemoval map
|
|
|
|
filename := zapFileName(segmentSnapshot.id)
|
|
|
|
delete(s.ineligibleForRemoval, filename)
|
2017-12-15 23:49:33 +01:00
|
|
|
}
|
|
|
|
|
2017-12-26 18:37:42 +01:00
|
|
|
if revertTo.persisted != nil {
|
|
|
|
s.rootPersisted = append(s.rootPersisted, revertTo.persisted)
|
|
|
|
}
|
|
|
|
|
2018-03-15 21:31:04 +01:00
|
|
|
newSnapshot.updateSize()
|
2017-12-15 23:49:33 +01:00
|
|
|
// swap in new snapshot
|
|
|
|
rootPrev := s.root
|
|
|
|
s.root = newSnapshot
|
|
|
|
// release lock
|
|
|
|
s.rootLock.Unlock()
|
|
|
|
|
|
|
|
if rootPrev != nil {
|
|
|
|
_ = rootPrev.DecRef()
|
|
|
|
}
|
|
|
|
|
|
|
|
close(revertTo.applied)
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|