0
0
Fork 0

scorch persister goes through introducer to affect root

This change allows the introducer to become the only goroutine to
modify the root, which in turn allows the introducer to greatly reduce
its root lock holding surface area.
This commit is contained in:
Steve Yen 2018-03-02 14:30:17 -08:00
parent 33641ef9d3
commit a5253bfe2b
4 changed files with 127 additions and 77 deletions

View File

@ -33,6 +33,11 @@ type segmentIntroduction struct {
persisted chan error
}
type persistIntroduction struct {
persisted map[uint64]segment.Segment
applied notificationChan
}
type epochWatcher struct {
epoch uint64
notifyCh notificationChan
@ -66,6 +71,9 @@ OUTER:
continue OUTER
}
case persist := <-s.persists:
s.introducePersist(persist)
case revertTo := <-s.revertToSnapshots:
err := s.revertToSnapshot(revertTo)
if err != nil {
@ -97,32 +105,30 @@ func (s *Scorch) introduceSegment(next *segmentIntroduction) error {
atomic.AddUint64(&s.stats.TotIntroduceSegmentBeg, 1)
defer atomic.AddUint64(&s.stats.TotIntroduceSegmentEnd, 1)
// acquire lock
s.rootLock.Lock()
s.rootLock.RLock()
root := s.root
s.rootLock.RUnlock()
nsegs := len(s.root.segment)
nsegs := len(root.segment)
// prepare new index snapshot
newSnapshot := &IndexSnapshot{
parent: s,
segment: make([]*SegmentSnapshot, 0, nsegs+1),
offsets: make([]uint64, 0, nsegs+1),
internal: make(map[string][]byte, len(s.root.internal)),
epoch: s.nextSnapshotEpoch,
internal: make(map[string][]byte, len(root.internal)),
refs: 1,
}
s.nextSnapshotEpoch++
// iterate through current segments
var running uint64
for i := range s.root.segment {
for i := range root.segment {
// see if optimistic work included this segment
delta, ok := next.obsoletes[s.root.segment[i].id]
delta, ok := next.obsoletes[root.segment[i].id]
if !ok {
var err error
delta, err = s.root.segment[i].segment.DocNumbers(next.ids)
delta, err = root.segment[i].segment.DocNumbers(next.ids)
if err != nil {
s.rootLock.Unlock()
next.applied <- fmt.Errorf("error computing doc numbers: %v", err)
close(next.applied)
_ = newSnapshot.DecRef()
@ -131,24 +137,24 @@ func (s *Scorch) introduceSegment(next *segmentIntroduction) error {
}
newss := &SegmentSnapshot{
id: s.root.segment[i].id,
segment: s.root.segment[i].segment,
cachedDocs: s.root.segment[i].cachedDocs,
id: root.segment[i].id,
segment: root.segment[i].segment,
cachedDocs: root.segment[i].cachedDocs,
}
// apply new obsoletions
if s.root.segment[i].deleted == nil {
if root.segment[i].deleted == nil {
newss.deleted = delta
} else {
newss.deleted = roaring.Or(s.root.segment[i].deleted, delta)
newss.deleted = roaring.Or(root.segment[i].deleted, delta)
}
// check for live size before copying
if newss.LiveSize() > 0 {
newSnapshot.segment = append(newSnapshot.segment, newss)
s.root.segment[i].segment.AddRef()
root.segment[i].segment.AddRef()
newSnapshot.offsets = append(newSnapshot.offsets, running)
running += s.root.segment[i].Count()
running += root.segment[i].Count()
}
}
@ -168,7 +174,7 @@ func (s *Scorch) introduceSegment(next *segmentIntroduction) error {
atomic.AddUint64(&s.stats.TotIntroducedSegmentsBatch, 1)
}
// copy old values
for key, oldVal := range s.root.internal {
for key, oldVal := range root.internal {
newSnapshot.internal[key] = oldVal
}
// set new values and apply deletes
@ -179,10 +185,14 @@ func (s *Scorch) introduceSegment(next *segmentIntroduction) error {
delete(newSnapshot.internal, key)
}
}
s.rootLock.Lock()
if next.persisted != nil {
s.rootPersisted = append(s.rootPersisted, next.persisted)
}
// swap in new index snapshot
newSnapshot.epoch = s.nextSnapshotEpoch
s.nextSnapshotEpoch++
rootPrev := s.root
s.root = newSnapshot
// release lock
@ -197,34 +207,89 @@ func (s *Scorch) introduceSegment(next *segmentIntroduction) error {
return nil
}
func (s *Scorch) introduceMerge(nextMerge *segmentMerge) {
atomic.AddUint64(&s.stats.TotIntroduceMergeBeg, 1)
defer atomic.AddUint64(&s.stats.TotIntroduceMergeEnd, 1)
func (s *Scorch) introducePersist(persist *persistIntroduction) {
atomic.AddUint64(&s.stats.TotIntroducePersistBeg, 1)
defer atomic.AddUint64(&s.stats.TotIntroducePersistEnd, 1)
// acquire lock
s.rootLock.Lock()
s.rootLock.RLock()
root := s.root
s.rootLock.RUnlock()
newSnapshot := &IndexSnapshot{
newIndexSnapshot := &IndexSnapshot{
parent: s,
internal: s.root.internal,
epoch: s.nextSnapshotEpoch,
segment: make([]*SegmentSnapshot, len(root.segment)),
offsets: make([]uint64, len(root.offsets)),
internal: make(map[string][]byte, len(root.internal)),
refs: 1,
}
s.nextSnapshotEpoch++
for i, segmentSnapshot := range root.segment {
// see if this segment has been replaced
if replacement, ok := persist.persisted[segmentSnapshot.id]; ok {
newSegmentSnapshot := &SegmentSnapshot{
id: segmentSnapshot.id,
segment: replacement,
deleted: segmentSnapshot.deleted,
cachedDocs: segmentSnapshot.cachedDocs,
}
newIndexSnapshot.segment[i] = newSegmentSnapshot
delete(persist.persisted, segmentSnapshot.id)
// update items persisted incase of a new segment snapshot
atomic.AddUint64(&s.stats.TotPersistedItems, newSegmentSnapshot.Count())
atomic.AddUint64(&s.stats.TotPersistedSegments, 1)
} else {
newIndexSnapshot.segment[i] = root.segment[i]
newIndexSnapshot.segment[i].segment.AddRef()
}
newIndexSnapshot.offsets[i] = root.offsets[i]
}
for k, v := range root.internal {
newIndexSnapshot.internal[k] = v
}
s.rootLock.Lock()
rootPrev := s.root
s.root = newIndexSnapshot
s.rootLock.Unlock()
if rootPrev != nil {
_ = rootPrev.DecRef()
}
close(persist.applied)
}
func (s *Scorch) introduceMerge(nextMerge *segmentMerge) {
atomic.AddUint64(&s.stats.TotIntroduceMergeBeg, 1)
defer atomic.AddUint64(&s.stats.TotIntroduceMergeEnd, 1)
s.rootLock.RLock()
root := s.root
s.rootLock.RUnlock()
newSnapshot := &IndexSnapshot{
parent: s,
internal: root.internal,
refs: 1,
}
// iterate through current segments
newSegmentDeleted := roaring.NewBitmap()
var running uint64
for i := range s.root.segment {
segmentID := s.root.segment[i].id
for i := range root.segment {
segmentID := root.segment[i].id
if segSnapAtMerge, ok := nextMerge.old[segmentID]; ok {
// this segment is going away, see if anything else was deleted since we started the merge
if segSnapAtMerge != nil && s.root.segment[i].deleted != nil {
if segSnapAtMerge != nil && root.segment[i].deleted != nil {
// assume all these deletes are new
deletedSince := s.root.segment[i].deleted
deletedSince := root.segment[i].deleted
// if we already knew about some of them, remove
if segSnapAtMerge.deleted != nil {
deletedSince = roaring.AndNot(s.root.segment[i].deleted, segSnapAtMerge.deleted)
deletedSince = roaring.AndNot(root.segment[i].deleted, segSnapAtMerge.deleted)
}
deletedSinceItr := deletedSince.Iterator()
for deletedSinceItr.HasNext() {
@ -238,18 +303,17 @@ func (s *Scorch) introduceMerge(nextMerge *segmentMerge) {
// segments left behind in old map after processing
// the root segments would be the obsolete segment set
delete(nextMerge.old, segmentID)
} else if s.root.segment[i].LiveSize() > 0 {
} else if root.segment[i].LiveSize() > 0 {
// this segment is staying
newSnapshot.segment = append(newSnapshot.segment, &SegmentSnapshot{
id: s.root.segment[i].id,
segment: s.root.segment[i].segment,
deleted: s.root.segment[i].deleted,
cachedDocs: s.root.segment[i].cachedDocs,
id: root.segment[i].id,
segment: root.segment[i].segment,
deleted: root.segment[i].deleted,
cachedDocs: root.segment[i].cachedDocs,
})
s.root.segment[i].segment.AddRef()
root.segment[i].segment.AddRef()
newSnapshot.offsets = append(newSnapshot.offsets, running)
running += s.root.segment[i].Count()
running += root.segment[i].Count()
}
}
@ -284,7 +348,10 @@ func (s *Scorch) introduceMerge(nextMerge *segmentMerge) {
newSnapshot.AddRef() // 1 ref for the nextMerge.notify response
// swap in new segment
s.rootLock.Lock()
// swap in new index snapshot
newSnapshot.epoch = s.nextSnapshotEpoch
s.nextSnapshotEpoch++
rootPrev := s.root
s.root = newSnapshot
// release lock

View File

@ -401,46 +401,23 @@ func (s *Scorch) persistSnapshotDirect(snapshot *IndexSnapshot) (err error) {
}
}
s.rootLock.Lock()
newIndexSnapshot := &IndexSnapshot{
parent: s,
epoch: s.nextSnapshotEpoch,
segment: make([]*SegmentSnapshot, len(s.root.segment)),
offsets: make([]uint64, len(s.root.offsets)),
internal: make(map[string][]byte, len(s.root.internal)),
refs: 1,
}
s.nextSnapshotEpoch++
for i, segmentSnapshot := range s.root.segment {
// see if this segment has been replaced
if replacement, ok := newSegments[segmentSnapshot.id]; ok {
newSegmentSnapshot := &SegmentSnapshot{
id: segmentSnapshot.id,
segment: replacement,
deleted: segmentSnapshot.deleted,
cachedDocs: segmentSnapshot.cachedDocs,
}
newIndexSnapshot.segment[i] = newSegmentSnapshot
delete(newSegments, segmentSnapshot.id)
// update items persisted incase of a new segment snapshot
atomic.AddUint64(&s.stats.TotPersistedItems, newSegmentSnapshot.Count())
atomic.AddUint64(&s.stats.TotPersistedSegments, 1)
} else {
newIndexSnapshot.segment[i] = s.root.segment[i]
newIndexSnapshot.segment[i].segment.AddRef()
}
newIndexSnapshot.offsets[i] = s.root.offsets[i]
}
for k, v := range s.root.internal {
newIndexSnapshot.internal[k] = v
persist := &persistIntroduction{
persisted: newSegments,
applied: make(notificationChan),
}
rootPrev := s.root
s.root = newIndexSnapshot
s.rootLock.Unlock()
if rootPrev != nil {
_ = rootPrev.DecRef()
select {
case <-s.closeCh:
err = ErrClosed
return err
case s.persists <- persist:
}
select {
case <-s.closeCh:
err = ErrClosed
return err
case <-persist.applied:
}
}

View File

@ -39,6 +39,8 @@ const Name = "scorch"
const Version uint8 = 1
var ErrClosed = fmt.Errorf("scorch closed")
type Scorch struct {
readOnly bool
version uint8
@ -59,6 +61,7 @@ type Scorch struct {
closeCh chan struct{}
introductions chan *segmentIntroduction
persists chan *persistIntroduction
merges chan *segmentMerge
introducerNotifier chan *epochWatcher
revertToSnapshots chan *snapshotReversion
@ -174,6 +177,7 @@ func (s *Scorch) openBolt() error {
}
s.introductions = make(chan *segmentIntroduction)
s.persists = make(chan *persistIntroduction)
s.merges = make(chan *segmentMerge)
s.introducerNotifier = make(chan *epochWatcher, 1)
s.revertToSnapshots = make(chan *snapshotReversion)

View File

@ -46,6 +46,8 @@ type Stats struct {
TotIntroduceLoop uint64
TotIntroduceSegmentBeg uint64
TotIntroduceSegmentEnd uint64
TotIntroducePersistBeg uint64
TotIntroducePersistEnd uint64
TotIntroduceMergeBeg uint64
TotIntroduceMergeEnd uint64
TotIntroduceRevertBeg uint64