2017-12-13 19:41:03 +01:00
|
|
|
// Copyright (c) 2017 Couchbase, Inc.
|
|
|
|
//
|
|
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
|
// you may not use this file except in compliance with the License.
|
|
|
|
// You may obtain a copy of the License at
|
|
|
|
//
|
|
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
//
|
|
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
// See the License for the specific language governing permissions and
|
|
|
|
// limitations under the License.
|
|
|
|
|
|
|
|
package scorch
|
|
|
|
|
|
|
|
import (
|
|
|
|
"fmt"
|
|
|
|
"log"
|
|
|
|
"os"
|
|
|
|
"sync/atomic"
|
|
|
|
|
|
|
|
"github.com/RoaringBitmap/roaring"
|
|
|
|
"github.com/blevesearch/bleve/index/scorch/mergeplan"
|
|
|
|
"github.com/blevesearch/bleve/index/scorch/segment"
|
|
|
|
"github.com/blevesearch/bleve/index/scorch/segment/zap"
|
|
|
|
)
|
|
|
|
|
|
|
|
func (s *Scorch) mergerLoop() {
|
|
|
|
var lastEpochMergePlanned uint64
|
|
|
|
OUTER:
|
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case <-s.closeCh:
|
|
|
|
break OUTER
|
|
|
|
|
|
|
|
default:
|
|
|
|
// check to see if there is a new snapshot to persist
|
|
|
|
s.rootLock.RLock()
|
|
|
|
ourSnapshot := s.root
|
2017-12-13 22:10:44 +01:00
|
|
|
ourSnapshot.AddRef()
|
2017-12-13 19:41:03 +01:00
|
|
|
s.rootLock.RUnlock()
|
|
|
|
|
|
|
|
if ourSnapshot.epoch != lastEpochMergePlanned {
|
|
|
|
// lets get started
|
|
|
|
err := s.planMergeAtSnapshot(ourSnapshot)
|
|
|
|
if err != nil {
|
|
|
|
log.Printf("merging err: %v", err)
|
2017-12-13 22:10:44 +01:00
|
|
|
_ = ourSnapshot.DecRef()
|
2017-12-13 19:41:03 +01:00
|
|
|
continue OUTER
|
|
|
|
}
|
|
|
|
lastEpochMergePlanned = ourSnapshot.epoch
|
|
|
|
}
|
2017-12-13 22:10:44 +01:00
|
|
|
_ = ourSnapshot.DecRef()
|
2017-12-13 19:41:03 +01:00
|
|
|
|
|
|
|
// tell the persister we're waiting for changes
|
|
|
|
// first make a notification chan
|
|
|
|
notifyUs := make(notificationChan)
|
|
|
|
|
|
|
|
// give it to the persister
|
|
|
|
select {
|
|
|
|
case <-s.closeCh:
|
|
|
|
break OUTER
|
|
|
|
case s.persisterNotifier <- notifyUs:
|
|
|
|
}
|
|
|
|
|
|
|
|
// check again
|
|
|
|
s.rootLock.RLock()
|
|
|
|
ourSnapshot = s.root
|
2017-12-13 22:10:44 +01:00
|
|
|
ourSnapshot.AddRef()
|
2017-12-13 19:41:03 +01:00
|
|
|
s.rootLock.RUnlock()
|
|
|
|
|
|
|
|
if ourSnapshot.epoch != lastEpochMergePlanned {
|
|
|
|
// lets get started
|
|
|
|
err := s.planMergeAtSnapshot(ourSnapshot)
|
|
|
|
if err != nil {
|
2017-12-13 22:10:44 +01:00
|
|
|
_ = ourSnapshot.DecRef()
|
2017-12-13 19:41:03 +01:00
|
|
|
continue OUTER
|
|
|
|
}
|
|
|
|
lastEpochMergePlanned = ourSnapshot.epoch
|
|
|
|
}
|
2017-12-13 22:10:44 +01:00
|
|
|
_ = ourSnapshot.DecRef()
|
2017-12-13 19:41:03 +01:00
|
|
|
|
|
|
|
// now wait for it (but also detect close)
|
|
|
|
select {
|
|
|
|
case <-s.closeCh:
|
|
|
|
break OUTER
|
|
|
|
case <-notifyUs:
|
|
|
|
// woken up, next loop should pick up work
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
s.asyncTasks.Done()
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *Scorch) planMergeAtSnapshot(ourSnapshot *IndexSnapshot) error {
|
|
|
|
// build list of zap segments in this snapshot
|
|
|
|
var onlyZapSnapshots []mergeplan.Segment
|
|
|
|
for _, segmentSnapshot := range ourSnapshot.segment {
|
|
|
|
if _, ok := segmentSnapshot.segment.(*zap.Segment); ok {
|
|
|
|
onlyZapSnapshots = append(onlyZapSnapshots, segmentSnapshot)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// give this list to the planner
|
|
|
|
resultMergePlan, err := mergeplan.Plan(onlyZapSnapshots, nil)
|
|
|
|
if err != nil {
|
|
|
|
return fmt.Errorf("merge planning err: %v", err)
|
|
|
|
}
|
|
|
|
if resultMergePlan == nil {
|
|
|
|
// nothing to do
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// process tasks in serial for now
|
|
|
|
var notifications []notificationChan
|
|
|
|
for _, task := range resultMergePlan.Tasks {
|
|
|
|
oldMap := make(map[uint64]*SegmentSnapshot)
|
|
|
|
newSegmentID := atomic.AddUint64(&s.nextSegmentID, 1)
|
|
|
|
segmentsToMerge := make([]*zap.Segment, 0, len(task.Segments))
|
|
|
|
docsToDrop := make([]*roaring.Bitmap, 0, len(task.Segments))
|
|
|
|
for _, planSegment := range task.Segments {
|
|
|
|
if segSnapshot, ok := planSegment.(*SegmentSnapshot); ok {
|
|
|
|
oldMap[segSnapshot.id] = segSnapshot
|
|
|
|
if zapSeg, ok := segSnapshot.segment.(*zap.Segment); ok {
|
|
|
|
segmentsToMerge = append(segmentsToMerge, zapSeg)
|
|
|
|
docsToDrop = append(docsToDrop, segSnapshot.deleted)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2017-12-14 19:49:33 +01:00
|
|
|
filename := zapFileName(newSegmentID)
|
|
|
|
s.markIneligibleForRemoval(filename)
|
2017-12-13 19:41:03 +01:00
|
|
|
path := s.path + string(os.PathSeparator) + filename
|
|
|
|
newDocNums, err := zap.Merge(segmentsToMerge, docsToDrop, path, 1024)
|
|
|
|
if err != nil {
|
2017-12-14 19:49:33 +01:00
|
|
|
s.unmarkIneligibleForRemoval(filename)
|
2017-12-13 19:41:03 +01:00
|
|
|
return fmt.Errorf("merging failed: %v", err)
|
|
|
|
}
|
|
|
|
segment, err := zap.Open(path)
|
|
|
|
if err != nil {
|
2017-12-14 19:49:33 +01:00
|
|
|
s.unmarkIneligibleForRemoval(filename)
|
2017-12-13 19:41:03 +01:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
sm := &segmentMerge{
|
|
|
|
id: newSegmentID,
|
|
|
|
old: oldMap,
|
|
|
|
oldNewDocNums: make(map[uint64][]uint64),
|
|
|
|
new: segment,
|
|
|
|
notify: make(notificationChan),
|
|
|
|
}
|
|
|
|
notifications = append(notifications, sm.notify)
|
|
|
|
for i, segNewDocNums := range newDocNums {
|
|
|
|
sm.oldNewDocNums[task.Segments[i].Id()] = segNewDocNums
|
|
|
|
}
|
|
|
|
|
|
|
|
// give it to the introducer
|
|
|
|
select {
|
|
|
|
case <-s.closeCh:
|
|
|
|
return nil
|
|
|
|
case s.merges <- sm:
|
|
|
|
}
|
|
|
|
}
|
|
|
|
for _, notification := range notifications {
|
|
|
|
<-notification
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
type segmentMerge struct {
|
|
|
|
id uint64
|
|
|
|
old map[uint64]*SegmentSnapshot
|
|
|
|
oldNewDocNums map[uint64][]uint64
|
|
|
|
new segment.Segment
|
|
|
|
notify notificationChan
|
|
|
|
}
|