diff --git a/index/scorch/merge.go b/index/scorch/merge.go index 41abe065..fb4e80d2 100644 --- a/index/scorch/merge.go +++ b/index/scorch/merge.go @@ -146,6 +146,7 @@ func (s *Scorch) planMergeAtSnapshot(ourSnapshot *IndexSnapshot) error { // give it to the introducer select { case <-s.closeCh: + _ = segment.Close() return nil case s.merges <- sm: } diff --git a/index/scorch/persister.go b/index/scorch/persister.go index 61a266ad..83909a88 100644 --- a/index/scorch/persister.go +++ b/index/scorch/persister.go @@ -145,23 +145,15 @@ OUTER: } } -func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot) error { +func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot) (err error) { // start a write transaction tx, err := s.rootBolt.Begin(true) if err != nil { return err } - // defer fsync of the rootbolt + // defer rollback on error defer func() { - if err == nil { - err = s.rootBolt.Sync() - } - }() - // defer commit/rollback transaction - defer func() { - if err == nil { - err = tx.Commit() - } else { + if err != nil { _ = tx.Rollback() } }() @@ -195,18 +187,18 @@ func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot) error { // first ensure that each segment in this snapshot has been persisted for _, segmentSnapshot := range snapshot.segment { snapshotSegmentKey := segment.EncodeUvarintAscending(nil, segmentSnapshot.id) - snapshotSegmentBucket, err2 := snapshotBucket.CreateBucketIfNotExists(snapshotSegmentKey) - if err2 != nil { - return err2 + snapshotSegmentBucket, err := snapshotBucket.CreateBucketIfNotExists(snapshotSegmentKey) + if err != nil { + return err } switch seg := segmentSnapshot.segment.(type) { case *zap.SegmentBase: // need to persist this to disk filename := zapFileName(segmentSnapshot.id) path := s.path + string(os.PathSeparator) + filename - err2 := zap.PersistSegmentBase(seg, path) - if err2 != nil { - return fmt.Errorf("error persisting segment: %v", err2) + err = zap.PersistSegmentBase(seg, path) + if err != nil { + return fmt.Errorf("error persisting segment: %v", err) } newSegmentPaths[segmentSnapshot.id] = path err = snapshotSegmentBucket.Put(boltPathKey, []byte(filename)) @@ -249,14 +241,18 @@ func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot) error { if len(newSegmentPaths) > 0 { // now try to open all the new snapshots newSegments := make(map[uint64]segment.Segment) + defer func() { + for _, s := range newSegments { + if s != nil { + // cleanup segments that were opened but not + // swapped into the new root + _ = s.Close() + } + } + }() for segmentID, path := range newSegmentPaths { newSegments[segmentID], err = zap.Open(path) if err != nil { - for _, s := range newSegments { - if s != nil { - _ = s.Close() // cleanup segments that were successfully opened - } - } return fmt.Errorf("error opening new segment at %s, %v", path, err) } } @@ -281,6 +277,7 @@ func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot) error { cachedDocs: segmentSnapshot.cachedDocs, } newIndexSnapshot.segment[i] = newSegmentSnapshot + delete(newSegments, segmentSnapshot.id) // update items persisted incase of a new segment snapshot atomic.AddUint64(&s.stats.numItemsPersisted, newSegmentSnapshot.Count()) } else { @@ -300,7 +297,19 @@ func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot) error { _ = rootPrev.DecRef() } } - // allow files to become eligible for removal + + err = tx.Commit() + if err != nil { + return err + } + + err = s.rootBolt.Sync() + if err != nil { + return err + } + + // allow files to become eligible for removal after commit, such + // as file segments from snapshots that came from the merger s.rootLock.Lock() for _, filename := range filenames { delete(s.ineligibleForRemoval, filename)