0
0
Fork 0

Merge pull request #767 from steveyen/persistSnapshot-err-handling

improvements to err handling in persistSnapshot(), etc
This commit is contained in:
Steve Yen 2018-02-13 14:53:42 -08:00 committed by GitHub
commit 030469a351
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 33 additions and 23 deletions

View File

@ -146,6 +146,7 @@ func (s *Scorch) planMergeAtSnapshot(ourSnapshot *IndexSnapshot) error {
// give it to the introducer
select {
case <-s.closeCh:
_ = segment.Close()
return nil
case s.merges <- sm:
}

View File

@ -145,23 +145,15 @@ OUTER:
}
}
func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot) error {
func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot) (err error) {
// start a write transaction
tx, err := s.rootBolt.Begin(true)
if err != nil {
return err
}
// defer fsync of the rootbolt
// defer rollback on error
defer func() {
if err == nil {
err = s.rootBolt.Sync()
}
}()
// defer commit/rollback transaction
defer func() {
if err == nil {
err = tx.Commit()
} else {
if err != nil {
_ = tx.Rollback()
}
}()
@ -195,18 +187,18 @@ func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot) error {
// first ensure that each segment in this snapshot has been persisted
for _, segmentSnapshot := range snapshot.segment {
snapshotSegmentKey := segment.EncodeUvarintAscending(nil, segmentSnapshot.id)
snapshotSegmentBucket, err2 := snapshotBucket.CreateBucketIfNotExists(snapshotSegmentKey)
if err2 != nil {
return err2
snapshotSegmentBucket, err := snapshotBucket.CreateBucketIfNotExists(snapshotSegmentKey)
if err != nil {
return err
}
switch seg := segmentSnapshot.segment.(type) {
case *zap.SegmentBase:
// need to persist this to disk
filename := zapFileName(segmentSnapshot.id)
path := s.path + string(os.PathSeparator) + filename
err2 := zap.PersistSegmentBase(seg, path)
if err2 != nil {
return fmt.Errorf("error persisting segment: %v", err2)
err = zap.PersistSegmentBase(seg, path)
if err != nil {
return fmt.Errorf("error persisting segment: %v", err)
}
newSegmentPaths[segmentSnapshot.id] = path
err = snapshotSegmentBucket.Put(boltPathKey, []byte(filename))
@ -249,14 +241,18 @@ func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot) error {
if len(newSegmentPaths) > 0 {
// now try to open all the new snapshots
newSegments := make(map[uint64]segment.Segment)
defer func() {
for _, s := range newSegments {
if s != nil {
// cleanup segments that were opened but not
// swapped into the new root
_ = s.Close()
}
}
}()
for segmentID, path := range newSegmentPaths {
newSegments[segmentID], err = zap.Open(path)
if err != nil {
for _, s := range newSegments {
if s != nil {
_ = s.Close() // cleanup segments that were successfully opened
}
}
return fmt.Errorf("error opening new segment at %s, %v", path, err)
}
}
@ -281,6 +277,7 @@ func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot) error {
cachedDocs: segmentSnapshot.cachedDocs,
}
newIndexSnapshot.segment[i] = newSegmentSnapshot
delete(newSegments, segmentSnapshot.id)
// update items persisted incase of a new segment snapshot
atomic.AddUint64(&s.stats.numItemsPersisted, newSegmentSnapshot.Count())
} else {
@ -300,7 +297,19 @@ func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot) error {
_ = rootPrev.DecRef()
}
}
// allow files to become eligible for removal
err = tx.Commit()
if err != nil {
return err
}
err = s.rootBolt.Sync()
if err != nil {
return err
}
// allow files to become eligible for removal after commit, such
// as file segments from snapshots that came from the merger
s.rootLock.Lock()
for _, filename := range filenames {
delete(s.ineligibleForRemoval, filename)