0
0
bleve/index/scorch/merge.go
Steve Yen 2be5eb4427 scorch tracks zap files that can't be removed yet
A race & solution found by Marty Schoch... consider a case when the
merger might grab a nextSegmentID, like 4, but takes awhile to
complete.  Meanwhile, the persister grabs the nextSegmentID of 5, but
finishes its persistence work fast, and then loops to cleanup any old
files.  The simple approach of checking a "highest segment ID" of 5 is
wrong now, because the deleter now thinks that segment 4's zap file is
(incorrectly) ok to delete.

The solution in this commit is to track an ephemeral map of filenames
which are ineligibleForRemoval, because they're still being written
(by the merger) and haven't been fully incorporated into the rootBolt
yet.

The merger adds to that ineligibleForRemoval map as it starts a merged
zap file, the persister cleans up entries from that map when it
persists zap filenames into the rootBolt, and the deleter (part of the
persister's loop) consults the map before performing any actual zap
file deletions.
2017-12-14 10:49:33 -08:00

177 lines
4.6 KiB
Go

// Copyright (c) 2017 Couchbase, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package scorch
import (
"fmt"
"log"
"os"
"sync/atomic"
"github.com/RoaringBitmap/roaring"
"github.com/blevesearch/bleve/index/scorch/mergeplan"
"github.com/blevesearch/bleve/index/scorch/segment"
"github.com/blevesearch/bleve/index/scorch/segment/zap"
)
func (s *Scorch) mergerLoop() {
var lastEpochMergePlanned uint64
OUTER:
for {
select {
case <-s.closeCh:
break OUTER
default:
// check to see if there is a new snapshot to persist
s.rootLock.RLock()
ourSnapshot := s.root
ourSnapshot.AddRef()
s.rootLock.RUnlock()
if ourSnapshot.epoch != lastEpochMergePlanned {
// lets get started
err := s.planMergeAtSnapshot(ourSnapshot)
if err != nil {
log.Printf("merging err: %v", err)
_ = ourSnapshot.DecRef()
continue OUTER
}
lastEpochMergePlanned = ourSnapshot.epoch
}
_ = ourSnapshot.DecRef()
// tell the persister we're waiting for changes
// first make a notification chan
notifyUs := make(notificationChan)
// give it to the persister
select {
case <-s.closeCh:
break OUTER
case s.persisterNotifier <- notifyUs:
}
// check again
s.rootLock.RLock()
ourSnapshot = s.root
ourSnapshot.AddRef()
s.rootLock.RUnlock()
if ourSnapshot.epoch != lastEpochMergePlanned {
// lets get started
err := s.planMergeAtSnapshot(ourSnapshot)
if err != nil {
_ = ourSnapshot.DecRef()
continue OUTER
}
lastEpochMergePlanned = ourSnapshot.epoch
}
_ = ourSnapshot.DecRef()
// now wait for it (but also detect close)
select {
case <-s.closeCh:
break OUTER
case <-notifyUs:
// woken up, next loop should pick up work
}
}
}
s.asyncTasks.Done()
}
func (s *Scorch) planMergeAtSnapshot(ourSnapshot *IndexSnapshot) error {
// build list of zap segments in this snapshot
var onlyZapSnapshots []mergeplan.Segment
for _, segmentSnapshot := range ourSnapshot.segment {
if _, ok := segmentSnapshot.segment.(*zap.Segment); ok {
onlyZapSnapshots = append(onlyZapSnapshots, segmentSnapshot)
}
}
// give this list to the planner
resultMergePlan, err := mergeplan.Plan(onlyZapSnapshots, nil)
if err != nil {
return fmt.Errorf("merge planning err: %v", err)
}
if resultMergePlan == nil {
// nothing to do
return nil
}
// process tasks in serial for now
var notifications []notificationChan
for _, task := range resultMergePlan.Tasks {
oldMap := make(map[uint64]*SegmentSnapshot)
newSegmentID := atomic.AddUint64(&s.nextSegmentID, 1)
segmentsToMerge := make([]*zap.Segment, 0, len(task.Segments))
docsToDrop := make([]*roaring.Bitmap, 0, len(task.Segments))
for _, planSegment := range task.Segments {
if segSnapshot, ok := planSegment.(*SegmentSnapshot); ok {
oldMap[segSnapshot.id] = segSnapshot
if zapSeg, ok := segSnapshot.segment.(*zap.Segment); ok {
segmentsToMerge = append(segmentsToMerge, zapSeg)
docsToDrop = append(docsToDrop, segSnapshot.deleted)
}
}
}
filename := zapFileName(newSegmentID)
s.markIneligibleForRemoval(filename)
path := s.path + string(os.PathSeparator) + filename
newDocNums, err := zap.Merge(segmentsToMerge, docsToDrop, path, 1024)
if err != nil {
s.unmarkIneligibleForRemoval(filename)
return fmt.Errorf("merging failed: %v", err)
}
segment, err := zap.Open(path)
if err != nil {
s.unmarkIneligibleForRemoval(filename)
return err
}
sm := &segmentMerge{
id: newSegmentID,
old: oldMap,
oldNewDocNums: make(map[uint64][]uint64),
new: segment,
notify: make(notificationChan),
}
notifications = append(notifications, sm.notify)
for i, segNewDocNums := range newDocNums {
sm.oldNewDocNums[task.Segments[i].Id()] = segNewDocNums
}
// give it to the introducer
select {
case <-s.closeCh:
return nil
case s.merges <- sm:
}
}
for _, notification := range notifications {
<-notification
}
return nil
}
type segmentMerge struct {
id uint64
old map[uint64]*SegmentSnapshot
oldNewDocNums map[uint64][]uint64
new segment.Segment
notify notificationChan
}