From dee6a2b1c64802c6a84a830f4a149bf25fbf91c4 Mon Sep 17 00:00:00 2001 From: Steve Yen Date: Thu, 8 Feb 2018 12:33:19 -0800 Subject: [PATCH 1/3] scorch persistSnapshot() consistently uses err to commit vs abort Some codepaths in persistSnapshot() were saving errors into an err2 local variable, which might lead incorrectly to commit during an error situation rather than abort. --- index/scorch/persister.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/index/scorch/persister.go b/index/scorch/persister.go index 61a266ad..07f38b81 100644 --- a/index/scorch/persister.go +++ b/index/scorch/persister.go @@ -196,17 +196,17 @@ func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot) error { for _, segmentSnapshot := range snapshot.segment { snapshotSegmentKey := segment.EncodeUvarintAscending(nil, segmentSnapshot.id) snapshotSegmentBucket, err2 := snapshotBucket.CreateBucketIfNotExists(snapshotSegmentKey) - if err2 != nil { - return err2 + if err != nil { + return err } switch seg := segmentSnapshot.segment.(type) { case *zap.SegmentBase: // need to persist this to disk filename := zapFileName(segmentSnapshot.id) path := s.path + string(os.PathSeparator) + filename - err2 := zap.PersistSegmentBase(seg, path) - if err2 != nil { - return fmt.Errorf("error persisting segment: %v", err2) + err = zap.PersistSegmentBase(seg, path) + if err != nil { + return fmt.Errorf("error persisting segment: %v", err) } newSegmentPaths[segmentSnapshot.id] = path err = snapshotSegmentBucket.Put(boltPathKey, []byte(filename)) From 83272a9629509d40e8db4fe382e7b95e1fd3dcff Mon Sep 17 00:00:00 2001 From: Steve Yen Date: Thu, 8 Feb 2018 12:47:07 -0800 Subject: [PATCH 2/3] scorch persistSnapshot() err handling & propagation --- index/scorch/persister.go | 30 +++++++++++++++++------------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/index/scorch/persister.go b/index/scorch/persister.go index 07f38b81..dab753d7 100644 --- a/index/scorch/persister.go +++ b/index/scorch/persister.go @@ -145,23 +145,15 @@ OUTER: } } -func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot) error { +func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot) (err error) { // start a write transaction tx, err := s.rootBolt.Begin(true) if err != nil { return err } - // defer fsync of the rootbolt + // defer rollback on error defer func() { - if err == nil { - err = s.rootBolt.Sync() - } - }() - // defer commit/rollback transaction - defer func() { - if err == nil { - err = tx.Commit() - } else { + if err != nil { _ = tx.Rollback() } }() @@ -195,7 +187,7 @@ func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot) error { // first ensure that each segment in this snapshot has been persisted for _, segmentSnapshot := range snapshot.segment { snapshotSegmentKey := segment.EncodeUvarintAscending(nil, segmentSnapshot.id) - snapshotSegmentBucket, err2 := snapshotBucket.CreateBucketIfNotExists(snapshotSegmentKey) + snapshotSegmentBucket, err := snapshotBucket.CreateBucketIfNotExists(snapshotSegmentKey) if err != nil { return err } @@ -300,7 +292,19 @@ func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot) error { _ = rootPrev.DecRef() } } - // allow files to become eligible for removal + + err = tx.Commit() + if err != nil { + return err + } + + err = s.rootBolt.Sync() + if err != nil { + return err + } + + // allow files to become eligible for removal after commit, such + // as file segments from snapshots that came from the merger s.rootLock.Lock() for _, filename := range filenames { delete(s.ineligibleForRemoval, filename) From 6f5f90cd41720665b04e36805605638fef3120f9 Mon Sep 17 00:00:00 2001 From: Steve Yen Date: Wed, 7 Feb 2018 16:54:58 -0800 Subject: [PATCH 3/3] scorch zap segment cleanup handling for some edge cases Two cases in this commit... If we're shutting down, the merger might not have handed off its latest merged segment to the introducer yet, so the merger still owns the segment and needs to Close() that segment itself. In persistSnapshot(), there migth be cases where the persister might not be able to swap in its newly persisted segments -- so, the persistSnapshot() needs to Close() those segments itself. --- index/scorch/merge.go | 1 + index/scorch/persister.go | 15 ++++++++++----- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/index/scorch/merge.go b/index/scorch/merge.go index 41abe065..fb4e80d2 100644 --- a/index/scorch/merge.go +++ b/index/scorch/merge.go @@ -146,6 +146,7 @@ func (s *Scorch) planMergeAtSnapshot(ourSnapshot *IndexSnapshot) error { // give it to the introducer select { case <-s.closeCh: + _ = segment.Close() return nil case s.merges <- sm: } diff --git a/index/scorch/persister.go b/index/scorch/persister.go index dab753d7..83909a88 100644 --- a/index/scorch/persister.go +++ b/index/scorch/persister.go @@ -241,14 +241,18 @@ func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot) (err error) { if len(newSegmentPaths) > 0 { // now try to open all the new snapshots newSegments := make(map[uint64]segment.Segment) + defer func() { + for _, s := range newSegments { + if s != nil { + // cleanup segments that were opened but not + // swapped into the new root + _ = s.Close() + } + } + }() for segmentID, path := range newSegmentPaths { newSegments[segmentID], err = zap.Open(path) if err != nil { - for _, s := range newSegments { - if s != nil { - _ = s.Close() // cleanup segments that were successfully opened - } - } return fmt.Errorf("error opening new segment at %s, %v", path, err) } } @@ -273,6 +277,7 @@ func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot) (err error) { cachedDocs: segmentSnapshot.cachedDocs, } newIndexSnapshot.segment[i] = newSegmentSnapshot + delete(newSegments, segmentSnapshot.id) // update items persisted incase of a new segment snapshot atomic.AddUint64(&s.stats.numItemsPersisted, newSegmentSnapshot.Count()) } else {