scorch conditional merging during persistSnapshot()
As part of this change, there are nw helper methods -- persistSnapshotMaybeMerge() and persistSnapshotDirect().
This commit is contained in:
parent
a0b7508da7
commit
c50d9b4023
|
@ -247,6 +247,8 @@ func (s *Scorch) introduceMerge(nextMerge *segmentMerge) {
|
|||
})
|
||||
newSnapshot.offsets = append(newSnapshot.offsets, running)
|
||||
|
||||
newSnapshot.AddRef() // 1 ref for the nextMerge.notify response
|
||||
|
||||
// swap in new segment
|
||||
rootPrev := s.root
|
||||
s.root = newSnapshot
|
||||
|
@ -257,7 +259,8 @@ func (s *Scorch) introduceMerge(nextMerge *segmentMerge) {
|
|||
_ = rootPrev.DecRef()
|
||||
}
|
||||
|
||||
// notify merger we incorporated this
|
||||
// notify requester that we incorporated this
|
||||
nextMerge.notify <- newSnapshot
|
||||
close(nextMerge.notify)
|
||||
}
|
||||
|
||||
|
|
|
@ -103,7 +103,7 @@ func (s *Scorch) planMergeAtSnapshot(ourSnapshot *IndexSnapshot) error {
|
|||
}
|
||||
|
||||
// process tasks in serial for now
|
||||
var notifications []notificationChan
|
||||
var notifications []chan *IndexSnapshot
|
||||
for _, task := range resultMergePlan.Tasks {
|
||||
oldMap := make(map[uint64]*SegmentSnapshot)
|
||||
newSegmentID := atomic.AddUint64(&s.nextSegmentID, 1)
|
||||
|
@ -137,7 +137,7 @@ func (s *Scorch) planMergeAtSnapshot(ourSnapshot *IndexSnapshot) error {
|
|||
old: oldMap,
|
||||
oldNewDocNums: make(map[uint64][]uint64),
|
||||
new: segment,
|
||||
notify: make(notificationChan),
|
||||
notify: make(chan *IndexSnapshot, 1),
|
||||
}
|
||||
notifications = append(notifications, sm.notify)
|
||||
for i, segNewDocNums := range newDocNums {
|
||||
|
@ -156,7 +156,10 @@ func (s *Scorch) planMergeAtSnapshot(ourSnapshot *IndexSnapshot) error {
|
|||
select {
|
||||
case <-s.closeCh:
|
||||
return nil
|
||||
case <-notification:
|
||||
case newSnapshot := <-notification:
|
||||
if newSnapshot != nil {
|
||||
_ = newSnapshot.DecRef()
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
@ -167,14 +170,15 @@ type segmentMerge struct {
|
|||
old map[uint64]*SegmentSnapshot
|
||||
oldNewDocNums map[uint64][]uint64
|
||||
new segment.Segment
|
||||
notify notificationChan
|
||||
notify chan *IndexSnapshot
|
||||
}
|
||||
|
||||
// perform in-memory merging of the given SegmentBase instances, and
|
||||
// synchronously introduce the merged segment into the root
|
||||
// perform a merging of the given SegmentBase instances into a new,
|
||||
// persisted segment, and synchronously introduce that new segment
|
||||
// into the root
|
||||
func (s *Scorch) mergeSegmentBases(snapshot *IndexSnapshot,
|
||||
sbs []*zap.SegmentBase, sbsDrops []*roaring.Bitmap, sbsIndexes []int,
|
||||
chunkFactor uint32) (uint64, error) {
|
||||
chunkFactor uint32) (uint64, *IndexSnapshot, uint64, error) {
|
||||
var br bytes.Buffer
|
||||
|
||||
cr := zap.NewCountHashWriter(&br)
|
||||
|
@ -183,24 +187,36 @@ func (s *Scorch) mergeSegmentBases(snapshot *IndexSnapshot,
|
|||
docValueOffset, dictLocs, fieldsInv, fieldsMap, err :=
|
||||
zap.MergeToWriter(sbs, sbsDrops, chunkFactor, cr)
|
||||
if err != nil {
|
||||
return 0, nil
|
||||
return 0, nil, 0, err
|
||||
}
|
||||
|
||||
segment, err := zap.InitSegmentBase(br.Bytes(), cr.Sum32(), chunkFactor,
|
||||
sb, err := zap.InitSegmentBase(br.Bytes(), cr.Sum32(), chunkFactor,
|
||||
fieldsMap, fieldsInv, numDocs, storedIndexOffset, fieldsIndexOffset,
|
||||
docValueOffset, dictLocs)
|
||||
if err != nil {
|
||||
return 0, nil
|
||||
return 0, nil, 0, err
|
||||
}
|
||||
|
||||
newSegmentID := atomic.AddUint64(&s.nextSegmentID, 1)
|
||||
|
||||
filename := zapFileName(newSegmentID)
|
||||
path := s.path + string(os.PathSeparator) + filename
|
||||
err = zap.PersistSegmentBase(sb, path)
|
||||
if err != nil {
|
||||
return 0, nil, 0, err
|
||||
}
|
||||
|
||||
segment, err := zap.Open(path)
|
||||
if err != nil {
|
||||
return 0, nil, 0, err
|
||||
}
|
||||
|
||||
sm := &segmentMerge{
|
||||
id: newSegmentID,
|
||||
old: make(map[uint64]*SegmentSnapshot),
|
||||
oldNewDocNums: make(map[uint64][]uint64),
|
||||
new: segment,
|
||||
notify: make(notificationChan),
|
||||
notify: make(chan *IndexSnapshot, 1),
|
||||
}
|
||||
|
||||
for i, idx := range sbsIndexes {
|
||||
|
@ -211,15 +227,15 @@ func (s *Scorch) mergeSegmentBases(snapshot *IndexSnapshot,
|
|||
|
||||
select { // send to introducer
|
||||
case <-s.closeCh:
|
||||
return 0, nil // TODO: instead return some ErrInterruptedClosed?
|
||||
_ = segment.DecRef()
|
||||
return 0, nil, 0, nil // TODO: return ErrInterruptedClosed?
|
||||
case s.merges <- sm:
|
||||
}
|
||||
|
||||
select { // wait for introduction to complete
|
||||
case <-s.closeCh:
|
||||
return 0, nil // TODO: instead return some ErrInterruptedClosed?
|
||||
case <-sm.notify:
|
||||
return 0, nil, 0, nil // TODO: return ErrInterruptedClosed?
|
||||
case newSnapshot := <-sm.notify:
|
||||
return numDocs, newSnapshot, newSegmentID, nil
|
||||
}
|
||||
|
||||
return numDocs, nil
|
||||
}
|
||||
|
|
|
@ -145,7 +145,100 @@ OUTER:
|
|||
}
|
||||
}
|
||||
|
||||
func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot) (err error) {
|
||||
func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot) error {
|
||||
persisted, err := s.persistSnapshotMaybeMerge(snapshot)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if persisted {
|
||||
return nil
|
||||
}
|
||||
|
||||
return s.persistSnapshotDirect(snapshot)
|
||||
}
|
||||
|
||||
// DefaultMinSegmentsForInMemoryMerge represents the default number of
|
||||
// in-memory zap segments that persistSnapshotMaybeMerge() needs to
|
||||
// see in an IndexSnapshot before it decides to merge and persist
|
||||
// those segments
|
||||
var DefaultMinSegmentsForInMemoryMerge = 2
|
||||
|
||||
// persistSnapshotMaybeMerge examines the snapshot and might merge and
|
||||
// persist the in-memory zap segments if there are enough of them
|
||||
func (s *Scorch) persistSnapshotMaybeMerge(snapshot *IndexSnapshot) (
|
||||
bool, error) {
|
||||
// collect the in-memory zap segments (SegmentBase instances)
|
||||
var sbs []*zap.SegmentBase
|
||||
var sbsDrops []*roaring.Bitmap
|
||||
var sbsIndexes []int
|
||||
|
||||
for i, segmentSnapshot := range snapshot.segment {
|
||||
if sb, ok := segmentSnapshot.segment.(*zap.SegmentBase); ok {
|
||||
sbs = append(sbs, sb)
|
||||
sbsDrops = append(sbsDrops, segmentSnapshot.deleted)
|
||||
sbsIndexes = append(sbsIndexes, i)
|
||||
}
|
||||
}
|
||||
|
||||
if len(sbs) < DefaultMinSegmentsForInMemoryMerge {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
_, newSnapshot, newSegmentID, err := s.mergeSegmentBases(
|
||||
snapshot, sbs, sbsDrops, sbsIndexes, DefaultChunkFactor)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if newSnapshot == nil {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
defer func() {
|
||||
_ = newSnapshot.DecRef()
|
||||
}()
|
||||
|
||||
mergedSegmentIDs := map[uint64]struct{}{}
|
||||
for _, idx := range sbsIndexes {
|
||||
mergedSegmentIDs[snapshot.segment[idx].id] = struct{}{}
|
||||
}
|
||||
|
||||
// construct a snapshot that's logically equivalent to the input
|
||||
// snapshot, but with merged segments replaced by the new segment
|
||||
equiv := &IndexSnapshot{
|
||||
parent: snapshot.parent,
|
||||
segment: make([]*SegmentSnapshot, 0, len(snapshot.segment)),
|
||||
internal: snapshot.internal,
|
||||
epoch: snapshot.epoch,
|
||||
}
|
||||
|
||||
// copy to the equiv the segments that weren't replaced
|
||||
for _, segment := range snapshot.segment {
|
||||
if _, wasMerged := mergedSegmentIDs[segment.id]; !wasMerged {
|
||||
equiv.segment = append(equiv.segment, segment)
|
||||
}
|
||||
}
|
||||
|
||||
// append to the equiv the new segment
|
||||
for _, segment := range newSnapshot.segment {
|
||||
if segment.id == newSegmentID {
|
||||
equiv.segment = append(equiv.segment, &SegmentSnapshot{
|
||||
id: newSegmentID,
|
||||
segment: segment.segment,
|
||||
deleted: nil, // nil since merging handled deletions
|
||||
})
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
err = s.persistSnapshotDirect(equiv)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func (s *Scorch) persistSnapshotDirect(snapshot *IndexSnapshot) (err error) {
|
||||
// start a write transaction
|
||||
tx, err := s.rootBolt.Begin(true)
|
||||
if err != nil {
|
||||
|
|
Loading…
Reference in New Issue