2be5eb4427
A race & solution found by Marty Schoch... consider a case where the merger grabs a nextSegmentID, say 4, but takes a while to complete. Meanwhile, the persister grabs nextSegmentID 5, finishes its persistence work quickly, and then loops to clean up any old files. The simple approach of checking a "highest segment ID" of 5 is now wrong, because the deleter incorrectly concludes that segment 4's zap file is ok to delete. The solution in this commit is to track an ephemeral map of filenames that are ineligibleForRemoval because they are still being written (by the merger) and have not yet been fully incorporated into the rootBolt. The merger adds to that ineligibleForRemoval map as it starts a merged zap file, the persister removes entries from that map when it persists zap filenames into the rootBolt, and the deleter (part of the persister's loop) consults the map before performing any actual zap file deletions.
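For orientation, here is a minimal sketch of the ineligibleForRemoval guard described above: the merger marks a filename before it starts writing a merged zap file, the persister unmarks it once that filename has been persisted into the rootBolt, and the deleter consults the map before removing anything on disk. Only markIneligibleForRemoval and unmarkIneligibleForRemoval are actually called from the file shown below; the struct layout, the locking, and the canRemoveFile helper are illustrative assumptions rather than the real implementation.

package scorch

import "sync"

// Sketch only (assumed layout): a Scorch-like struct carrying the
// ephemeral map of zap filenames that must not be deleted yet.
type scorchSketch struct {
	mu                   sync.Mutex
	ineligibleForRemoval map[string]bool // filenames not yet recorded in rootBolt
}

// merger side: called before it starts writing a merged zap file
func (s *scorchSketch) markIneligibleForRemoval(filename string) {
	s.mu.Lock()
	s.ineligibleForRemoval[filename] = true
	s.mu.Unlock()
}

// persister side: called once the filename has been recorded in rootBolt
func (s *scorchSketch) unmarkIneligibleForRemoval(filename string) {
	s.mu.Lock()
	delete(s.ineligibleForRemoval, filename)
	s.mu.Unlock()
}

// deleter side (hypothetical helper): consult the map before removing an
// on-disk zap file, even if its segment ID is below the highest persisted ID
func (s *scorchSketch) canRemoveFile(filename string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	return !s.ineligibleForRemoval[filename]
}

In the actual merge.go below, the mark is taken just before zap.Merge writes the new file, and it is removed again on both error paths so a failed merge does not leave a stale entry in the map.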
177 lines
4.6 KiB
Go
// Copyright (c) 2017 Couchbase, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package scorch

import (
	"fmt"
	"log"
	"os"
	"sync/atomic"

	"github.com/RoaringBitmap/roaring"
	"github.com/blevesearch/bleve/index/scorch/mergeplan"
	"github.com/blevesearch/bleve/index/scorch/segment"
	"github.com/blevesearch/bleve/index/scorch/segment/zap"
)

func (s *Scorch) mergerLoop() {
	var lastEpochMergePlanned uint64
OUTER:
	for {
		select {
		case <-s.closeCh:
			break OUTER

		default:
			// check to see if there is a new snapshot to persist
			s.rootLock.RLock()
			ourSnapshot := s.root
			ourSnapshot.AddRef()
			s.rootLock.RUnlock()

			if ourSnapshot.epoch != lastEpochMergePlanned {
				// lets get started
				err := s.planMergeAtSnapshot(ourSnapshot)
				if err != nil {
					log.Printf("merging err: %v", err)
					_ = ourSnapshot.DecRef()
					continue OUTER
				}
				lastEpochMergePlanned = ourSnapshot.epoch
			}
			_ = ourSnapshot.DecRef()

			// tell the persister we're waiting for changes
			// first make a notification chan
			notifyUs := make(notificationChan)

			// give it to the persister
			select {
			case <-s.closeCh:
				break OUTER
			case s.persisterNotifier <- notifyUs:
			}

			// check again
			s.rootLock.RLock()
			ourSnapshot = s.root
			ourSnapshot.AddRef()
			s.rootLock.RUnlock()

			if ourSnapshot.epoch != lastEpochMergePlanned {
				// lets get started
				err := s.planMergeAtSnapshot(ourSnapshot)
				if err != nil {
					_ = ourSnapshot.DecRef()
					continue OUTER
				}
				lastEpochMergePlanned = ourSnapshot.epoch
			}
			_ = ourSnapshot.DecRef()

			// now wait for it (but also detect close)
			select {
			case <-s.closeCh:
				break OUTER
			case <-notifyUs:
				// woken up, next loop should pick up work
			}
		}
	}
	s.asyncTasks.Done()
}

func (s *Scorch) planMergeAtSnapshot(ourSnapshot *IndexSnapshot) error {
	// build list of zap segments in this snapshot
	var onlyZapSnapshots []mergeplan.Segment
	for _, segmentSnapshot := range ourSnapshot.segment {
		if _, ok := segmentSnapshot.segment.(*zap.Segment); ok {
			onlyZapSnapshots = append(onlyZapSnapshots, segmentSnapshot)
		}
	}

	// give this list to the planner
	resultMergePlan, err := mergeplan.Plan(onlyZapSnapshots, nil)
	if err != nil {
		return fmt.Errorf("merge planning err: %v", err)
	}
	if resultMergePlan == nil {
		// nothing to do
		return nil
	}

	// process tasks in serial for now
	var notifications []notificationChan
	for _, task := range resultMergePlan.Tasks {
		oldMap := make(map[uint64]*SegmentSnapshot)
		newSegmentID := atomic.AddUint64(&s.nextSegmentID, 1)
		segmentsToMerge := make([]*zap.Segment, 0, len(task.Segments))
		docsToDrop := make([]*roaring.Bitmap, 0, len(task.Segments))
		for _, planSegment := range task.Segments {
			if segSnapshot, ok := planSegment.(*SegmentSnapshot); ok {
				oldMap[segSnapshot.id] = segSnapshot
				if zapSeg, ok := segSnapshot.segment.(*zap.Segment); ok {
					segmentsToMerge = append(segmentsToMerge, zapSeg)
					docsToDrop = append(docsToDrop, segSnapshot.deleted)
				}
			}
		}

		filename := zapFileName(newSegmentID)
		// mark the new zap file as ineligible for removal until the
		// persister has recorded it in the rootBolt; otherwise a fast
		// persist/cleanup cycle could see a higher persisted segment ID
		// and delete this file while the merge is still in flight
		s.markIneligibleForRemoval(filename)
		path := s.path + string(os.PathSeparator) + filename
		newDocNums, err := zap.Merge(segmentsToMerge, docsToDrop, path, 1024)
		if err != nil {
			s.unmarkIneligibleForRemoval(filename)
			return fmt.Errorf("merging failed: %v", err)
		}

		segment, err := zap.Open(path)
		if err != nil {
			s.unmarkIneligibleForRemoval(filename)
			return err
		}
		sm := &segmentMerge{
			id:            newSegmentID,
			old:           oldMap,
			oldNewDocNums: make(map[uint64][]uint64),
			new:           segment,
			notify:        make(notificationChan),
		}
		notifications = append(notifications, sm.notify)
		for i, segNewDocNums := range newDocNums {
			sm.oldNewDocNums[task.Segments[i].Id()] = segNewDocNums
		}

		// give it to the introducer
		select {
		case <-s.closeCh:
			return nil
		case s.merges <- sm:
		}
	}
	for _, notification := range notifications {
		<-notification
	}
	return nil
}

type segmentMerge struct {
	id            uint64
	old           map[uint64]*SegmentSnapshot
	oldNewDocNums map[uint64][]uint64
	new           segment.Segment
	notify        notificationChan
}