Merge pull request #794 from steveyen/persister-uses-introducer
scorch persister goes through introducer to affect root
This commit is contained in:
commit
4ebf3f1d44
|
@ -33,6 +33,11 @@ type segmentIntroduction struct {
|
|||
persisted chan error
|
||||
}
|
||||
|
||||
type persistIntroduction struct {
|
||||
persisted map[uint64]segment.Segment
|
||||
applied notificationChan
|
||||
}
|
||||
|
||||
type epochWatcher struct {
|
||||
epoch uint64
|
||||
notifyCh notificationChan
|
||||
|
@ -66,6 +71,9 @@ OUTER:
|
|||
continue OUTER
|
||||
}
|
||||
|
||||
case persist := <-s.persists:
|
||||
s.introducePersist(persist)
|
||||
|
||||
case revertTo := <-s.revertToSnapshots:
|
||||
err := s.revertToSnapshot(revertTo)
|
||||
if err != nil {
|
||||
|
@ -97,32 +105,30 @@ func (s *Scorch) introduceSegment(next *segmentIntroduction) error {
|
|||
atomic.AddUint64(&s.stats.TotIntroduceSegmentBeg, 1)
|
||||
defer atomic.AddUint64(&s.stats.TotIntroduceSegmentEnd, 1)
|
||||
|
||||
// acquire lock
|
||||
s.rootLock.Lock()
|
||||
s.rootLock.RLock()
|
||||
root := s.root
|
||||
s.rootLock.RUnlock()
|
||||
|
||||
nsegs := len(s.root.segment)
|
||||
nsegs := len(root.segment)
|
||||
|
||||
// prepare new index snapshot
|
||||
newSnapshot := &IndexSnapshot{
|
||||
parent: s,
|
||||
segment: make([]*SegmentSnapshot, 0, nsegs+1),
|
||||
offsets: make([]uint64, 0, nsegs+1),
|
||||
internal: make(map[string][]byte, len(s.root.internal)),
|
||||
epoch: s.nextSnapshotEpoch,
|
||||
internal: make(map[string][]byte, len(root.internal)),
|
||||
refs: 1,
|
||||
}
|
||||
s.nextSnapshotEpoch++
|
||||
|
||||
// iterate through current segments
|
||||
var running uint64
|
||||
for i := range s.root.segment {
|
||||
for i := range root.segment {
|
||||
// see if optimistic work included this segment
|
||||
delta, ok := next.obsoletes[s.root.segment[i].id]
|
||||
delta, ok := next.obsoletes[root.segment[i].id]
|
||||
if !ok {
|
||||
var err error
|
||||
delta, err = s.root.segment[i].segment.DocNumbers(next.ids)
|
||||
delta, err = root.segment[i].segment.DocNumbers(next.ids)
|
||||
if err != nil {
|
||||
s.rootLock.Unlock()
|
||||
next.applied <- fmt.Errorf("error computing doc numbers: %v", err)
|
||||
close(next.applied)
|
||||
_ = newSnapshot.DecRef()
|
||||
|
@ -131,24 +137,24 @@ func (s *Scorch) introduceSegment(next *segmentIntroduction) error {
|
|||
}
|
||||
|
||||
newss := &SegmentSnapshot{
|
||||
id: s.root.segment[i].id,
|
||||
segment: s.root.segment[i].segment,
|
||||
cachedDocs: s.root.segment[i].cachedDocs,
|
||||
id: root.segment[i].id,
|
||||
segment: root.segment[i].segment,
|
||||
cachedDocs: root.segment[i].cachedDocs,
|
||||
}
|
||||
|
||||
// apply new obsoletions
|
||||
if s.root.segment[i].deleted == nil {
|
||||
if root.segment[i].deleted == nil {
|
||||
newss.deleted = delta
|
||||
} else {
|
||||
newss.deleted = roaring.Or(s.root.segment[i].deleted, delta)
|
||||
newss.deleted = roaring.Or(root.segment[i].deleted, delta)
|
||||
}
|
||||
|
||||
// check for live size before copying
|
||||
if newss.LiveSize() > 0 {
|
||||
newSnapshot.segment = append(newSnapshot.segment, newss)
|
||||
s.root.segment[i].segment.AddRef()
|
||||
root.segment[i].segment.AddRef()
|
||||
newSnapshot.offsets = append(newSnapshot.offsets, running)
|
||||
running += s.root.segment[i].Count()
|
||||
running += root.segment[i].Count()
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -168,7 +174,7 @@ func (s *Scorch) introduceSegment(next *segmentIntroduction) error {
|
|||
atomic.AddUint64(&s.stats.TotIntroducedSegmentsBatch, 1)
|
||||
}
|
||||
// copy old values
|
||||
for key, oldVal := range s.root.internal {
|
||||
for key, oldVal := range root.internal {
|
||||
newSnapshot.internal[key] = oldVal
|
||||
}
|
||||
// set new values and apply deletes
|
||||
|
@ -179,10 +185,14 @@ func (s *Scorch) introduceSegment(next *segmentIntroduction) error {
|
|||
delete(newSnapshot.internal, key)
|
||||
}
|
||||
}
|
||||
|
||||
s.rootLock.Lock()
|
||||
if next.persisted != nil {
|
||||
s.rootPersisted = append(s.rootPersisted, next.persisted)
|
||||
}
|
||||
// swap in new index snapshot
|
||||
newSnapshot.epoch = s.nextSnapshotEpoch
|
||||
s.nextSnapshotEpoch++
|
||||
rootPrev := s.root
|
||||
s.root = newSnapshot
|
||||
// release lock
|
||||
|
@ -197,34 +207,89 @@ func (s *Scorch) introduceSegment(next *segmentIntroduction) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (s *Scorch) introduceMerge(nextMerge *segmentMerge) {
|
||||
atomic.AddUint64(&s.stats.TotIntroduceMergeBeg, 1)
|
||||
defer atomic.AddUint64(&s.stats.TotIntroduceMergeEnd, 1)
|
||||
func (s *Scorch) introducePersist(persist *persistIntroduction) {
|
||||
atomic.AddUint64(&s.stats.TotIntroducePersistBeg, 1)
|
||||
defer atomic.AddUint64(&s.stats.TotIntroducePersistEnd, 1)
|
||||
|
||||
// acquire lock
|
||||
s.rootLock.Lock()
|
||||
s.rootLock.RLock()
|
||||
root := s.root
|
||||
s.rootLock.RUnlock()
|
||||
|
||||
newSnapshot := &IndexSnapshot{
|
||||
newIndexSnapshot := &IndexSnapshot{
|
||||
parent: s,
|
||||
internal: s.root.internal,
|
||||
epoch: s.nextSnapshotEpoch,
|
||||
segment: make([]*SegmentSnapshot, len(root.segment)),
|
||||
offsets: make([]uint64, len(root.offsets)),
|
||||
internal: make(map[string][]byte, len(root.internal)),
|
||||
refs: 1,
|
||||
}
|
||||
s.nextSnapshotEpoch++
|
||||
|
||||
for i, segmentSnapshot := range root.segment {
|
||||
// see if this segment has been replaced
|
||||
if replacement, ok := persist.persisted[segmentSnapshot.id]; ok {
|
||||
newSegmentSnapshot := &SegmentSnapshot{
|
||||
id: segmentSnapshot.id,
|
||||
segment: replacement,
|
||||
deleted: segmentSnapshot.deleted,
|
||||
cachedDocs: segmentSnapshot.cachedDocs,
|
||||
}
|
||||
newIndexSnapshot.segment[i] = newSegmentSnapshot
|
||||
delete(persist.persisted, segmentSnapshot.id)
|
||||
|
||||
// update items persisted incase of a new segment snapshot
|
||||
atomic.AddUint64(&s.stats.TotPersistedItems, newSegmentSnapshot.Count())
|
||||
atomic.AddUint64(&s.stats.TotPersistedSegments, 1)
|
||||
} else {
|
||||
newIndexSnapshot.segment[i] = root.segment[i]
|
||||
newIndexSnapshot.segment[i].segment.AddRef()
|
||||
}
|
||||
newIndexSnapshot.offsets[i] = root.offsets[i]
|
||||
}
|
||||
|
||||
for k, v := range root.internal {
|
||||
newIndexSnapshot.internal[k] = v
|
||||
}
|
||||
|
||||
s.rootLock.Lock()
|
||||
rootPrev := s.root
|
||||
s.root = newIndexSnapshot
|
||||
s.rootLock.Unlock()
|
||||
|
||||
if rootPrev != nil {
|
||||
_ = rootPrev.DecRef()
|
||||
}
|
||||
|
||||
close(persist.applied)
|
||||
}
|
||||
|
||||
func (s *Scorch) introduceMerge(nextMerge *segmentMerge) {
|
||||
atomic.AddUint64(&s.stats.TotIntroduceMergeBeg, 1)
|
||||
defer atomic.AddUint64(&s.stats.TotIntroduceMergeEnd, 1)
|
||||
|
||||
s.rootLock.RLock()
|
||||
root := s.root
|
||||
s.rootLock.RUnlock()
|
||||
|
||||
newSnapshot := &IndexSnapshot{
|
||||
parent: s,
|
||||
internal: root.internal,
|
||||
refs: 1,
|
||||
}
|
||||
|
||||
// iterate through current segments
|
||||
newSegmentDeleted := roaring.NewBitmap()
|
||||
var running uint64
|
||||
for i := range s.root.segment {
|
||||
segmentID := s.root.segment[i].id
|
||||
for i := range root.segment {
|
||||
segmentID := root.segment[i].id
|
||||
if segSnapAtMerge, ok := nextMerge.old[segmentID]; ok {
|
||||
// this segment is going away, see if anything else was deleted since we started the merge
|
||||
if segSnapAtMerge != nil && s.root.segment[i].deleted != nil {
|
||||
if segSnapAtMerge != nil && root.segment[i].deleted != nil {
|
||||
// assume all these deletes are new
|
||||
deletedSince := s.root.segment[i].deleted
|
||||
deletedSince := root.segment[i].deleted
|
||||
// if we already knew about some of them, remove
|
||||
if segSnapAtMerge.deleted != nil {
|
||||
deletedSince = roaring.AndNot(s.root.segment[i].deleted, segSnapAtMerge.deleted)
|
||||
deletedSince = roaring.AndNot(root.segment[i].deleted, segSnapAtMerge.deleted)
|
||||
}
|
||||
deletedSinceItr := deletedSince.Iterator()
|
||||
for deletedSinceItr.HasNext() {
|
||||
|
@ -238,18 +303,17 @@ func (s *Scorch) introduceMerge(nextMerge *segmentMerge) {
|
|||
// segments left behind in old map after processing
|
||||
// the root segments would be the obsolete segment set
|
||||
delete(nextMerge.old, segmentID)
|
||||
|
||||
} else if s.root.segment[i].LiveSize() > 0 {
|
||||
} else if root.segment[i].LiveSize() > 0 {
|
||||
// this segment is staying
|
||||
newSnapshot.segment = append(newSnapshot.segment, &SegmentSnapshot{
|
||||
id: s.root.segment[i].id,
|
||||
segment: s.root.segment[i].segment,
|
||||
deleted: s.root.segment[i].deleted,
|
||||
cachedDocs: s.root.segment[i].cachedDocs,
|
||||
id: root.segment[i].id,
|
||||
segment: root.segment[i].segment,
|
||||
deleted: root.segment[i].deleted,
|
||||
cachedDocs: root.segment[i].cachedDocs,
|
||||
})
|
||||
s.root.segment[i].segment.AddRef()
|
||||
root.segment[i].segment.AddRef()
|
||||
newSnapshot.offsets = append(newSnapshot.offsets, running)
|
||||
running += s.root.segment[i].Count()
|
||||
running += root.segment[i].Count()
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -284,7 +348,10 @@ func (s *Scorch) introduceMerge(nextMerge *segmentMerge) {
|
|||
|
||||
newSnapshot.AddRef() // 1 ref for the nextMerge.notify response
|
||||
|
||||
// swap in new segment
|
||||
s.rootLock.Lock()
|
||||
// swap in new index snapshot
|
||||
newSnapshot.epoch = s.nextSnapshotEpoch
|
||||
s.nextSnapshotEpoch++
|
||||
rootPrev := s.root
|
||||
s.root = newSnapshot
|
||||
// release lock
|
||||
|
|
|
@ -401,46 +401,23 @@ func (s *Scorch) persistSnapshotDirect(snapshot *IndexSnapshot) (err error) {
|
|||
}
|
||||
}
|
||||
|
||||
s.rootLock.Lock()
|
||||
newIndexSnapshot := &IndexSnapshot{
|
||||
parent: s,
|
||||
epoch: s.nextSnapshotEpoch,
|
||||
segment: make([]*SegmentSnapshot, len(s.root.segment)),
|
||||
offsets: make([]uint64, len(s.root.offsets)),
|
||||
internal: make(map[string][]byte, len(s.root.internal)),
|
||||
refs: 1,
|
||||
}
|
||||
s.nextSnapshotEpoch++
|
||||
for i, segmentSnapshot := range s.root.segment {
|
||||
// see if this segment has been replaced
|
||||
if replacement, ok := newSegments[segmentSnapshot.id]; ok {
|
||||
newSegmentSnapshot := &SegmentSnapshot{
|
||||
id: segmentSnapshot.id,
|
||||
segment: replacement,
|
||||
deleted: segmentSnapshot.deleted,
|
||||
cachedDocs: segmentSnapshot.cachedDocs,
|
||||
}
|
||||
newIndexSnapshot.segment[i] = newSegmentSnapshot
|
||||
delete(newSegments, segmentSnapshot.id)
|
||||
|
||||
// update items persisted incase of a new segment snapshot
|
||||
atomic.AddUint64(&s.stats.TotPersistedItems, newSegmentSnapshot.Count())
|
||||
atomic.AddUint64(&s.stats.TotPersistedSegments, 1)
|
||||
} else {
|
||||
newIndexSnapshot.segment[i] = s.root.segment[i]
|
||||
newIndexSnapshot.segment[i].segment.AddRef()
|
||||
}
|
||||
newIndexSnapshot.offsets[i] = s.root.offsets[i]
|
||||
}
|
||||
for k, v := range s.root.internal {
|
||||
newIndexSnapshot.internal[k] = v
|
||||
persist := &persistIntroduction{
|
||||
persisted: newSegments,
|
||||
applied: make(notificationChan),
|
||||
}
|
||||
|
||||
rootPrev := s.root
|
||||
s.root = newIndexSnapshot
|
||||
s.rootLock.Unlock()
|
||||
if rootPrev != nil {
|
||||
_ = rootPrev.DecRef()
|
||||
select {
|
||||
case <-s.closeCh:
|
||||
err = ErrClosed
|
||||
return err
|
||||
case s.persists <- persist:
|
||||
}
|
||||
|
||||
select {
|
||||
case <-s.closeCh:
|
||||
err = ErrClosed
|
||||
return err
|
||||
case <-persist.applied:
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -39,6 +39,8 @@ const Name = "scorch"
|
|||
|
||||
const Version uint8 = 1
|
||||
|
||||
var ErrClosed = fmt.Errorf("scorch closed")
|
||||
|
||||
type Scorch struct {
|
||||
readOnly bool
|
||||
version uint8
|
||||
|
@ -59,6 +61,7 @@ type Scorch struct {
|
|||
|
||||
closeCh chan struct{}
|
||||
introductions chan *segmentIntroduction
|
||||
persists chan *persistIntroduction
|
||||
merges chan *segmentMerge
|
||||
introducerNotifier chan *epochWatcher
|
||||
revertToSnapshots chan *snapshotReversion
|
||||
|
@ -174,6 +177,7 @@ func (s *Scorch) openBolt() error {
|
|||
}
|
||||
|
||||
s.introductions = make(chan *segmentIntroduction)
|
||||
s.persists = make(chan *persistIntroduction)
|
||||
s.merges = make(chan *segmentMerge)
|
||||
s.introducerNotifier = make(chan *epochWatcher, 1)
|
||||
s.revertToSnapshots = make(chan *snapshotReversion)
|
||||
|
|
|
@ -46,6 +46,8 @@ type Stats struct {
|
|||
TotIntroduceLoop uint64
|
||||
TotIntroduceSegmentBeg uint64
|
||||
TotIntroduceSegmentEnd uint64
|
||||
TotIntroducePersistBeg uint64
|
||||
TotIntroducePersistEnd uint64
|
||||
TotIntroduceMergeBeg uint64
|
||||
TotIntroduceMergeEnd uint64
|
||||
TotIntroduceRevertBeg uint64
|
||||
|
|
Loading…
Reference in New Issue