0
0
bleve/index/scorch/persister.go
Steve Yen ecbb3d2df4 scorch handles non-updating batches better
This commit improves handling when an incoming batch has internal-data
updates only and no doc updates.  In this case, a nil segment instead
of an empty segment instance is used in the segmentIntroduction.  The
segmentIntroduction, that is, might now hold only internal-data
updates only.

To handle synchronous persistence, a new field that's a slice of
persisted notification channels is added to the IndexSnapshot struct,
which the persister goroutine will close as each IndexSnapshot is
persisted.

Also, as part of this change, instead of checking the unsafeBatch flag
in several places, we instead check for non-nil'ness of these
persisted chan's.
2017-12-17 08:51:23 -08:00

581 lines
15 KiB
Go

// Copyright (c) 2017 Couchbase, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package scorch
import (
"bytes"
"fmt"
"io/ioutil"
"log"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
"github.com/RoaringBitmap/roaring"
"github.com/blevesearch/bleve/index/scorch/segment"
"github.com/blevesearch/bleve/index/scorch/segment/mem"
"github.com/blevesearch/bleve/index/scorch/segment/zap"
"github.com/boltdb/bolt"
)
type notificationChan chan struct{}
func (s *Scorch) persisterLoop() {
var notify notificationChan
var lastPersistedEpoch uint64
OUTER:
for {
select {
case <-s.closeCh:
break OUTER
case notify = <-s.persisterNotifier:
default:
// check to see if there is a new snapshot to persist
s.rootLock.RLock()
ourSnapshot := s.root
ourSnapshot.AddRef()
s.rootLock.RUnlock()
if ourSnapshot.epoch != lastPersistedEpoch {
// lets get started
err := s.persistSnapshot(ourSnapshot)
if err != nil {
log.Printf("got err persisting snapshot: %v", err)
_ = ourSnapshot.DecRef()
continue OUTER
}
lastPersistedEpoch = ourSnapshot.epoch
if notify != nil {
close(notify)
notify = nil
}
}
_ = ourSnapshot.DecRef()
// tell the introducer we're waiting for changes
// first make a notification chan
notifyUs := make(notificationChan)
// give it to the introducer
select {
case <-s.closeCh:
break OUTER
case s.introducerNotifier <- notifyUs:
}
// check again
s.rootLock.RLock()
ourSnapshot = s.root
ourSnapshot.AddRef()
s.rootLock.RUnlock()
if ourSnapshot.epoch != lastPersistedEpoch {
// lets get started
err := s.persistSnapshot(ourSnapshot)
if err != nil {
log.Printf("got err persisting snapshot: %v", err)
_ = ourSnapshot.DecRef()
continue OUTER
}
lastPersistedEpoch = ourSnapshot.epoch
if notify != nil {
close(notify)
notify = nil
}
}
_ = ourSnapshot.DecRef()
// now wait for it (but also detect close)
select {
case <-s.closeCh:
break OUTER
case <-notifyUs:
// woken up, next loop should pick up work
}
}
s.removeOldData()
}
s.asyncTasks.Done()
}
func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot) error {
// start a write transaction
tx, err := s.rootBolt.Begin(true)
if err != nil {
return err
}
// defer fsync of the rootbolt
defer func() {
if err == nil {
err = s.rootBolt.Sync()
}
}()
// defer commit/rollback transaction
defer func() {
if err == nil {
err = tx.Commit()
} else {
_ = tx.Rollback()
}
}()
snapshotsBucket, err := tx.CreateBucketIfNotExists(boltSnapshotsBucket)
if err != nil {
return err
}
newSnapshotKey := segment.EncodeUvarintAscending(nil, snapshot.epoch)
snapshotBucket, err := snapshotsBucket.CreateBucketIfNotExists(newSnapshotKey)
if err != nil {
return err
}
// persist internal values
internalBucket, err := snapshotBucket.CreateBucketIfNotExists(boltInternalKey)
if err != nil {
return err
}
// TODO optimize writing these in order?
for k, v := range snapshot.internal {
err = internalBucket.Put([]byte(k), v)
if err != nil {
return err
}
}
var filenames []string
newSegmentPaths := make(map[uint64]string)
// first ensure that each segment in this snapshot has been persisted
for i, segmentSnapshot := range snapshot.segment {
snapshotSegmentKey := segment.EncodeUvarintAscending(nil, uint64(i))
snapshotSegmentBucket, err2 := snapshotBucket.CreateBucketIfNotExists(snapshotSegmentKey)
if err2 != nil {
return err2
}
switch seg := segmentSnapshot.segment.(type) {
case *mem.Segment:
// need to persist this to disk
filename := zapFileName(segmentSnapshot.id)
path := s.path + string(os.PathSeparator) + filename
err2 := zap.PersistSegment(seg, path, 1024)
if err2 != nil {
return fmt.Errorf("error persisting segment: %v", err2)
}
newSegmentPaths[segmentSnapshot.id] = path
err = snapshotSegmentBucket.Put(boltPathKey, []byte(filename))
if err != nil {
return err
}
filenames = append(filenames, filename)
case *zap.Segment:
path := seg.Path()
filename := strings.TrimPrefix(path, s.path+string(os.PathSeparator))
err = snapshotSegmentBucket.Put(boltPathKey, []byte(filename))
if err != nil {
return err
}
filenames = append(filenames, filename)
default:
return fmt.Errorf("unknown segment type: %T", seg)
}
// store current deleted bits
var roaringBuf bytes.Buffer
if segmentSnapshot.deleted != nil {
_, err = segmentSnapshot.deleted.WriteTo(&roaringBuf)
if err != nil {
return fmt.Errorf("error persisting roaring bytes: %v", err)
}
err = snapshotSegmentBucket.Put(boltDeletedKey, roaringBuf.Bytes())
if err != nil {
return err
}
}
}
// now try to open all the new snapshots
newSegments := make(map[uint64]segment.Segment)
for segmentID, path := range newSegmentPaths {
newSegments[segmentID], err = zap.Open(path)
if err != nil {
for _, s := range newSegments {
if s != nil {
_ = s.Close() // cleanup segments that were successfully opened
}
}
return fmt.Errorf("error opening new segment at %s, %v", path, err)
}
}
// get write lock and update the current snapshot with disk-based versions
snapshot.m.Lock()
notifications := snapshot.persisted
snapshot.persisted = nil
snapshot.m.Unlock()
s.rootLock.Lock()
newIndexSnapshot := &IndexSnapshot{
parent: s,
epoch: s.root.epoch,
segment: make([]*SegmentSnapshot, len(s.root.segment)),
offsets: make([]uint64, len(s.root.offsets)),
internal: make(map[string][]byte, len(s.root.internal)),
refs: 1,
}
for i, segmentSnapshot := range s.root.segment {
// see if this segment has been replaced
if replacement, ok := newSegments[segmentSnapshot.id]; ok {
newSegmentSnapshot := &SegmentSnapshot{
segment: replacement,
deleted: segmentSnapshot.deleted,
id: segmentSnapshot.id,
cachedDocs: segmentSnapshot.cachedDocs,
}
newIndexSnapshot.segment[i] = newSegmentSnapshot
// add the old segment snapshots notifications to the list
for _, notification := range segmentSnapshot.persisted {
notifications = append(notifications, notification)
}
} else {
newIndexSnapshot.segment[i] = s.root.segment[i]
newIndexSnapshot.segment[i].segment.AddRef()
}
newIndexSnapshot.offsets[i] = s.root.offsets[i]
}
for k, v := range s.root.internal {
newIndexSnapshot.internal[k] = v
}
for _, filename := range filenames {
delete(s.ineligibleForRemoval, filename)
}
rootPrev := s.root
s.root = newIndexSnapshot
s.rootLock.Unlock()
if rootPrev != nil {
_ = rootPrev.DecRef()
}
// now that we've given up the lock, notify everyone that we've safely
// persisted their data
for _, notification := range notifications {
close(notification)
}
return nil
}
func zapFileName(epoch uint64) string {
return fmt.Sprintf("%012x.zap", epoch)
}
// bolt snapshot code
var boltSnapshotsBucket = []byte{'s'}
var boltPathKey = []byte{'p'}
var boltDeletedKey = []byte{'d'}
var boltInternalKey = []byte{'i'}
func (s *Scorch) loadFromBolt() error {
return s.rootBolt.View(func(tx *bolt.Tx) error {
snapshots := tx.Bucket(boltSnapshotsBucket)
if snapshots == nil {
return nil
}
foundRoot := false
c := snapshots.Cursor()
for k, _ := c.Last(); k != nil; k, _ = c.Prev() {
_, snapshotEpoch, err := segment.DecodeUvarintAscending(k)
if err != nil {
log.Printf("unable to parse segment epoch %x, continuing", k)
continue
}
if foundRoot {
s.eligibleForRemoval = append(s.eligibleForRemoval, snapshotEpoch)
continue
}
snapshot := snapshots.Bucket(k)
if snapshot == nil {
log.Printf("snapshot key, but bucket missing %x, continuing", k)
s.eligibleForRemoval = append(s.eligibleForRemoval, snapshotEpoch)
continue
}
indexSnapshot, err := s.loadSnapshot(snapshot)
if err != nil {
log.Printf("unable to load snapshot, %v, continuing", err)
s.eligibleForRemoval = append(s.eligibleForRemoval, snapshotEpoch)
continue
}
indexSnapshot.epoch = snapshotEpoch
// set the nextSegmentID
s.nextSegmentID, err = s.maxSegmentIDOnDisk()
if err != nil {
return err
}
s.nextSegmentID++
s.nextSnapshotEpoch = snapshotEpoch + 1
s.rootLock.Lock()
if s.root != nil {
_ = s.root.DecRef()
}
s.root = indexSnapshot
s.rootLock.Unlock()
foundRoot = true
}
return nil
})
}
func (s *Scorch) loadSnapshot(snapshot *bolt.Bucket) (*IndexSnapshot, error) {
rv := &IndexSnapshot{
parent: s,
internal: make(map[string][]byte),
refs: 1,
}
var running uint64
c := snapshot.Cursor()
for k, _ := c.First(); k != nil; k, _ = c.Next() {
if k[0] == boltInternalKey[0] {
internalBucket := snapshot.Bucket(k)
err := internalBucket.ForEach(func(key []byte, val []byte) error {
copiedVal := append([]byte(nil), val...)
rv.internal[string(key)] = copiedVal
return nil
})
if err != nil {
_ = rv.DecRef()
return nil, err
}
} else {
segmentBucket := snapshot.Bucket(k)
if segmentBucket == nil {
_ = rv.DecRef()
return nil, fmt.Errorf("segment key, but bucket missing % x", k)
}
segmentSnapshot, err := s.loadSegment(segmentBucket)
if err != nil {
_ = rv.DecRef()
return nil, fmt.Errorf("failed to load segment: %v", err)
}
_, segmentSnapshot.id, err = segment.DecodeUvarintAscending(k)
if err != nil {
_ = rv.DecRef()
return nil, fmt.Errorf("failed to decode segment id: %v", err)
}
rv.segment = append(rv.segment, segmentSnapshot)
rv.offsets = append(rv.offsets, running)
running += segmentSnapshot.segment.Count()
}
}
return rv, nil
}
func (s *Scorch) loadSegment(segmentBucket *bolt.Bucket) (*SegmentSnapshot, error) {
pathBytes := segmentBucket.Get(boltPathKey)
if pathBytes == nil {
return nil, fmt.Errorf("segment path missing")
}
segmentPath := s.path + string(os.PathSeparator) + string(pathBytes)
segment, err := zap.Open(segmentPath)
if err != nil {
return nil, fmt.Errorf("error opening bolt segment: %v", err)
}
rv := &SegmentSnapshot{
segment: segment,
cachedDocs: &cachedDocs{cache: nil},
}
deletedBytes := segmentBucket.Get(boltDeletedKey)
if deletedBytes != nil {
deletedBitmap := roaring.NewBitmap()
r := bytes.NewReader(deletedBytes)
_, err := deletedBitmap.ReadFrom(r)
if err != nil {
_ = segment.Close()
return nil, fmt.Errorf("error reading deleted bytes: %v", err)
}
rv.deleted = deletedBitmap
}
return rv, nil
}
type uint64Descending []uint64
func (p uint64Descending) Len() int { return len(p) }
func (p uint64Descending) Less(i, j int) bool { return p[i] > p[j] }
func (p uint64Descending) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
func (s *Scorch) removeOldData() {
removed, err := s.removeOldBoltSnapshots()
if err != nil {
log.Printf("got err removing old bolt snapshots: %v", err)
}
if removed > 0 {
err = s.removeOldZapFiles()
if err != nil {
log.Printf("got err removing old zap files: %v", err)
}
}
}
// NumSnapshotsToKeep represents how many recent, old snapshots to
// keep around per Scorch instance. Useful for apps that require
// rollback'ability.
var NumSnapshotsToKeep int
// Removes enough snapshots from the rootBolt so that the
// s.eligibleForRemoval stays under the NumSnapshotsToKeep policy.
func (s *Scorch) removeOldBoltSnapshots() (numRemoved int, err error) {
var epochsToRemove []uint64
s.rootLock.Lock()
if len(s.eligibleForRemoval) > NumSnapshotsToKeep {
sort.Sort(uint64Descending(s.eligibleForRemoval))
epochsToRemove = append([]uint64(nil), s.eligibleForRemoval[NumSnapshotsToKeep:]...) // Copy.
s.eligibleForRemoval = s.eligibleForRemoval[0:NumSnapshotsToKeep]
}
s.rootLock.Unlock()
if len(epochsToRemove) <= 0 {
return 0, nil
}
tx, err := s.rootBolt.Begin(true)
if err != nil {
return 0, err
}
defer func() {
if err == nil {
err = s.rootBolt.Sync()
}
}()
defer func() {
if err == nil {
err = tx.Commit()
} else {
_ = tx.Rollback()
}
}()
for _, epochToRemove := range epochsToRemove {
k := segment.EncodeUvarintAscending(nil, epochToRemove)
err = tx.DeleteBucket(k)
if err == bolt.ErrBucketNotFound {
err = nil
}
if err == nil {
numRemoved++
}
}
return numRemoved, err
}
func (s *Scorch) maxSegmentIDOnDisk() (uint64, error) {
currFileInfos, err := ioutil.ReadDir(s.path)
if err != nil {
return 0, err
}
var rv uint64
for _, finfo := range currFileInfos {
fname := finfo.Name()
if filepath.Ext(fname) == ".zap" {
prefix := strings.TrimSuffix(fname, ".zap")
id, err2 := strconv.ParseUint(prefix, 16, 64)
if err2 != nil {
return 0, err2
}
if id > rv {
rv = id
}
}
}
return rv, err
}
// Removes any *.zap files which aren't listed in the rootBolt.
func (s *Scorch) removeOldZapFiles() error {
liveFileNames, err := s.loadZapFileNames()
if err != nil {
return err
}
currFileInfos, err := ioutil.ReadDir(s.path)
if err != nil {
return err
}
s.rootLock.RLock()
for _, finfo := range currFileInfos {
fname := finfo.Name()
if filepath.Ext(fname) == ".zap" {
if _, exists := liveFileNames[fname]; !exists && !s.ineligibleForRemoval[fname] {
err := os.Remove(s.path + string(os.PathSeparator) + fname)
if err != nil {
log.Printf("got err removing file: %s, err: %v", fname, err)
}
}
}
}
s.rootLock.RUnlock()
return nil
}
// Returns the *.zap file names that are listed in the rootBolt.
func (s *Scorch) loadZapFileNames() (map[string]struct{}, error) {
rv := map[string]struct{}{}
err := s.rootBolt.View(func(tx *bolt.Tx) error {
snapshots := tx.Bucket(boltSnapshotsBucket)
if snapshots == nil {
return nil
}
sc := snapshots.Cursor()
for sk, _ := sc.First(); sk != nil; sk, _ = sc.Next() {
snapshot := snapshots.Bucket(sk)
if snapshot == nil {
continue
}
segc := snapshot.Cursor()
for segk, _ := segc.First(); segk != nil; segk, _ = segc.Next() {
if segk[0] == boltInternalKey[0] {
continue
}
segmentBucket := snapshot.Bucket(segk)
if segmentBucket == nil {
continue
}
pathBytes := segmentBucket.Get(boltPathKey)
if pathBytes == nil {
continue
}
pathString := string(pathBytes)
rv[string(pathString)] = struct{}{}
}
}
return nil
})
return rv, err
}