From c13ff85aaf682079e549674d54eff7d59d16eb3c Mon Sep 17 00:00:00 2001 From: Steve Yen Date: Wed, 13 Dec 2017 13:10:44 -0800 Subject: [PATCH] scorch ref-counting Future commits will provide actual cleanup when ref-counts reach 0. --- index/scorch/introducer.go | 20 ++++++++++++++-- index/scorch/merge.go | 6 +++++ index/scorch/persister.go | 30 ++++++++++++++++++++---- index/scorch/reader.go | 4 ++-- index/scorch/reader_test.go | 5 ++++ index/scorch/scorch.go | 36 +++++++++++++++-------------- index/scorch/segment/empty.go | 7 ++++++ index/scorch/segment/mem/segment.go | 7 ++++++ index/scorch/segment/segment.go | 3 +++ index/scorch/segment/zap/segment.go | 25 ++++++++++++++++++++ index/scorch/snapshot_index.go | 27 ++++++++++++++++++++++ 11 files changed, 145 insertions(+), 25 deletions(-) diff --git a/index/scorch/introducer.go b/index/scorch/introducer.go index 76c14551..9402e5a4 100644 --- a/index/scorch/introducer.go +++ b/index/scorch/introducer.go @@ -72,6 +72,7 @@ func (s *Scorch) introduceSegment(next *segmentIntroduction) error { offsets: make([]uint64, len(s.root.segment)+1), internal: make(map[string][]byte, len(s.root.segment)), epoch: s.nextSnapshotEpoch, + refs: 1, } s.nextSnapshotEpoch++ @@ -86,6 +87,7 @@ func (s *Scorch) introduceSegment(next *segmentIntroduction) error { if err != nil { next.applied <- fmt.Errorf("error computing doc numbers: %v", err) close(next.applied) + _ = newSnapshot.DecRef() return err } } @@ -94,6 +96,7 @@ func (s *Scorch) introduceSegment(next *segmentIntroduction) error { segment: s.root.segment[i].segment, notify: s.root.segment[i].notify, } + s.root.segment[i].segment.AddRef() // apply new obsoletions if s.root.segment[i].deleted == nil { newSnapshot.segment[i].deleted = delta @@ -108,7 +111,7 @@ func (s *Scorch) introduceSegment(next *segmentIntroduction) error { // put new segment at end newSnapshot.segment[len(s.root.segment)] = &SegmentSnapshot{ id: next.id, - segment: next.data, + segment: next.data, // Take ownership of next.data's ref-count. } newSnapshot.offsets[len(s.root.segment)] = running if !s.unsafeBatch { @@ -130,9 +133,15 @@ func (s *Scorch) introduceSegment(next *segmentIntroduction) error { } } // swap in new segment + rootPrev := s.root s.root = newSnapshot // release lock s.rootLock.Unlock() + + if rootPrev != nil { + _ = rootPrev.DecRef() + } + close(next.applied) return nil @@ -150,6 +159,7 @@ func (s *Scorch) introduceMerge(nextMerge *segmentMerge) { offsets: make([]uint64, 0, newSize), internal: make(map[string][]byte, len(s.root.segment)), epoch: s.nextSnapshotEpoch, + refs: 1, } s.nextSnapshotEpoch++ @@ -182,6 +192,7 @@ func (s *Scorch) introduceMerge(nextMerge *segmentMerge) { notify: s.root.segment[i].notify, deleted: s.root.segment[i].deleted, }) + s.root.segment[i].segment.AddRef() newSnapshot.offsets = append(newSnapshot.offsets, running) running += s.root.segment[i].Count() } @@ -190,7 +201,7 @@ func (s *Scorch) introduceMerge(nextMerge *segmentMerge) { // put new segment at end newSnapshot.segment = append(newSnapshot.segment, &SegmentSnapshot{ id: nextMerge.id, - segment: nextMerge.new, + segment: nextMerge.new, // Take ownership for nextMerge.new's ref-count. deleted: newSegmentDeleted, }) newSnapshot.offsets = append(newSnapshot.offsets, running) @@ -201,10 +212,15 @@ func (s *Scorch) introduceMerge(nextMerge *segmentMerge) { } // swap in new segment + rootPrev := s.root s.root = newSnapshot // release lock s.rootLock.Unlock() + if rootPrev != nil { + _ = rootPrev.DecRef() + } + // notify merger we incorporated this close(nextMerge.notify) } diff --git a/index/scorch/merge.go b/index/scorch/merge.go index aef21664..818f3dbd 100644 --- a/index/scorch/merge.go +++ b/index/scorch/merge.go @@ -38,6 +38,7 @@ OUTER: // check to see if there is a new snapshot to persist s.rootLock.RLock() ourSnapshot := s.root + ourSnapshot.AddRef() s.rootLock.RUnlock() if ourSnapshot.epoch != lastEpochMergePlanned { @@ -45,10 +46,12 @@ OUTER: err := s.planMergeAtSnapshot(ourSnapshot) if err != nil { log.Printf("merging err: %v", err) + _ = ourSnapshot.DecRef() continue OUTER } lastEpochMergePlanned = ourSnapshot.epoch } + _ = ourSnapshot.DecRef() // tell the persister we're waiting for changes // first make a notification chan @@ -64,16 +67,19 @@ OUTER: // check again s.rootLock.RLock() ourSnapshot = s.root + ourSnapshot.AddRef() s.rootLock.RUnlock() if ourSnapshot.epoch != lastEpochMergePlanned { // lets get started err := s.planMergeAtSnapshot(ourSnapshot) if err != nil { + _ = ourSnapshot.DecRef() continue OUTER } lastEpochMergePlanned = ourSnapshot.epoch } + _ = ourSnapshot.DecRef() // now wait for it (but also detect close) select { diff --git a/index/scorch/persister.go b/index/scorch/persister.go index d5484086..964e1db9 100644 --- a/index/scorch/persister.go +++ b/index/scorch/persister.go @@ -44,6 +44,7 @@ OUTER: // check to see if there is a new snapshot to persist s.rootLock.RLock() ourSnapshot := s.root + ourSnapshot.AddRef() s.rootLock.RUnlock() //for ourSnapshot.epoch != lastPersistedEpoch { @@ -52,6 +53,7 @@ OUTER: err := s.persistSnapshot(ourSnapshot) if err != nil { log.Printf("got err persisting snapshot: %v", err) + _ = ourSnapshot.DecRef() continue OUTER } lastPersistedEpoch = ourSnapshot.epoch @@ -60,6 +62,7 @@ OUTER: notify = nil } } + _ = ourSnapshot.DecRef() // tell the introducer we're waiting for changes // first make a notification chan @@ -75,13 +78,15 @@ OUTER: // check again s.rootLock.RLock() ourSnapshot = s.root + ourSnapshot.AddRef() s.rootLock.RUnlock() - if ourSnapshot.epoch != lastPersistedEpoch { + if ourSnapshot.epoch != lastPersistedEpoch { // lets get started err := s.persistSnapshot(ourSnapshot) if err != nil { log.Printf("got err persisting snapshot: %v", err) + _ = ourSnapshot.DecRef() continue OUTER } lastPersistedEpoch = ourSnapshot.epoch @@ -90,6 +95,7 @@ OUTER: notify = nil } } + _ = ourSnapshot.DecRef() // now wait for it (but also detect close) select { @@ -199,6 +205,9 @@ func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot) error { for segmentID, path := range newSegmentPaths { newSegments[segmentID], err = zap.Open(path) if err != nil { + for _, s := range newSegments { + _ = s.Close() // cleanup segments that were successfully opened + } return fmt.Errorf("error opening new segment at %s, %v", path, err) } } @@ -212,6 +221,7 @@ func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot) error { segment: make([]*SegmentSnapshot, len(s.root.segment)), offsets: make([]uint64, len(s.root.offsets)), internal: make(map[string][]byte, len(s.root.internal)), + refs: 1, } for i, segmentSnapshot := range s.root.segment { // see if this segment has been replaced @@ -228,15 +238,21 @@ func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot) error { } } else { newIndexSnapshot.segment[i] = s.root.segment[i] + newIndexSnapshot.segment[i].segment.AddRef() } newIndexSnapshot.offsets[i] = s.root.offsets[i] } for k, v := range s.root.internal { newIndexSnapshot.internal[k] = v } + rootPrev := s.root s.root = newIndexSnapshot s.rootLock.Unlock() + if rootPrev != nil { + _ = rootPrev.DecRef() + } + // now that we've given up the lock, notify everyone that we've safely // persisted their data for _, notification := range notifications { @@ -263,17 +279,17 @@ func (s *Scorch) loadFromBolt() error { for k, _ := c.Last(); k != nil; k, _ = c.Prev() { _, snapshotEpoch, err := segment.DecodeUvarintAscending(k) if err != nil { - log.Printf("unable to parse segment epoch % x, contiuing", k) + log.Printf("unable to parse segment epoch %x, continuing", k) continue } snapshot := snapshots.Bucket(k) if snapshot == nil { - log.Printf("snapshot key, but bucket missing % x, continuing", k) + log.Printf("snapshot key, but bucket missing %x, continuing", k) continue } indexSnapshot, err := s.loadSnapshot(snapshot) if err != nil { - log.Printf("unable to load snapshot, %v continuing", err) + log.Printf("unable to load snapshot, %v, continuing", err) continue } indexSnapshot.epoch = snapshotEpoch @@ -296,6 +312,7 @@ func (s *Scorch) loadSnapshot(snapshot *bolt.Bucket) (*IndexSnapshot, error) { rv := &IndexSnapshot{ internal: make(map[string][]byte), + refs: 1, } var running uint64 c := snapshot.Cursor() @@ -308,19 +325,23 @@ func (s *Scorch) loadSnapshot(snapshot *bolt.Bucket) (*IndexSnapshot, error) { return nil }) if err != nil { + _ = rv.DecRef() return nil, err } } else { segmentBucket := snapshot.Bucket(k) if segmentBucket == nil { + _ = rv.DecRef() return nil, fmt.Errorf("segment key, but bucket missing % x", k) } segmentSnapshot, err := s.loadSegment(segmentBucket) if err != nil { + _ = rv.DecRef() return nil, fmt.Errorf("failed to load segment: %v", err) } _, segmentSnapshot.id, err = segment.DecodeUvarintAscending(k) if err != nil { + _ = rv.DecRef() return nil, fmt.Errorf("failed to decode segment id: %v", err) } rv.segment = append(rv.segment, segmentSnapshot) @@ -351,6 +372,7 @@ func (s *Scorch) loadSegment(segmentBucket *bolt.Bucket) (*SegmentSnapshot, erro r := bytes.NewReader(deletedBytes) _, err := deletedBitmap.ReadFrom(r) if err != nil { + _ = segment.Close() return nil, fmt.Errorf("error reading deleted bytes: %v", err) } rv.deleted = deletedBitmap diff --git a/index/scorch/reader.go b/index/scorch/reader.go index 9a20aa01..365ecb67 100644 --- a/index/scorch/reader.go +++ b/index/scorch/reader.go @@ -20,7 +20,7 @@ import ( ) type Reader struct { - root *IndexSnapshot + root *IndexSnapshot // Owns 1 ref-count on the index snapshot. } func (r *Reader) TermFieldReader(term []byte, field string, includeFreq, @@ -106,5 +106,5 @@ func (r *Reader) DumpFields() chan interface{} { } func (r *Reader) Close() error { - return nil + return r.root.DecRef() } diff --git a/index/scorch/reader_test.go b/index/scorch/reader_test.go index 2cd42fe4..4eb9b5fb 100644 --- a/index/scorch/reader_test.go +++ b/index/scorch/reader_test.go @@ -646,6 +646,7 @@ func TestSegmentIndexAndLocalDocNumFromGlobal(t *testing.T) { for _, test := range tests { i := &IndexSnapshot{ offsets: test.offsets, + refs: 1, } gotSegmentIndex, gotLocalDocNum := i.segmentIndexAndLocalDocNumFromGlobal(test.globalDocNum) if gotSegmentIndex != test.segmentIndex { @@ -654,5 +655,9 @@ func TestSegmentIndexAndLocalDocNumFromGlobal(t *testing.T) { if gotLocalDocNum != test.localDocNum { t.Errorf("got localDocNum %d expected %d for offsets %v globalDocNum %d", gotLocalDocNum, test.localDocNum, test.offsets, test.globalDocNum) } + err := i.DecRef() + if err != nil { + t.Errorf("expected no err, got: %v", err) + } } } diff --git a/index/scorch/scorch.go b/index/scorch/scorch.go index 7373e6b4..d59563be 100644 --- a/index/scorch/scorch.go +++ b/index/scorch/scorch.go @@ -50,7 +50,7 @@ type Scorch struct { unsafeBatch bool rootLock sync.RWMutex - root *IndexSnapshot + root *IndexSnapshot // holds 1 ref-count on the root closeCh chan struct{} introductions chan *segmentIntroduction @@ -67,7 +67,7 @@ func NewScorch(storeName string, config map[string]interface{}, analysisQueue *i config: config, analysisQueue: analysisQueue, stats: &Stats{}, - root: &IndexSnapshot{}, + root: &IndexSnapshot{refs: 1}, nextSnapshotEpoch: 1, } ro, ok := config["read_only"].(bool) @@ -140,16 +140,12 @@ func (s *Scorch) Close() (err error) { // wait for them to close s.asyncTasks.Wait() // now close the root bolt - if s.rootBolt != nil { err = s.rootBolt.Close() s.rootLock.Lock() - for _, segment := range s.root.segment { - cerr := segment.Close() - if err == nil { - err = cerr - } - } + _ = s.root.DecRef() + s.root = nil + s.rootLock.Unlock() } return @@ -218,7 +214,12 @@ func (s *Scorch) Batch(batch *index.Batch) error { } else { newSegment = mem.New() } - return s.prepareSegment(newSegment, ids, batch.InternalOps) + + err := s.prepareSegment(newSegment, ids, batch.InternalOps) + if err != nil { + _ = newSegment.Close() + } + return err } func (s *Scorch) prepareSegment(newSegment segment.Segment, ids []string, @@ -240,12 +241,13 @@ func (s *Scorch) prepareSegment(newSegment segment.Segment, ids []string, // get read lock, to optimistically prepare obsoleted info s.rootLock.RLock() - for i := range s.root.segment { - delta, err := s.root.segment[i].segment.DocNumbers(ids) + for _, seg := range s.root.segment { + delta, err := seg.segment.DocNumbers(ids) if err != nil { + s.rootLock.RUnlock() return err } - introduction.obsoletes[s.root.segment[i].id] = delta + introduction.obsoletes[seg.id] = delta } s.rootLock.RUnlock() @@ -280,10 +282,10 @@ func (s *Scorch) DeleteInternal(key []byte) error { // release associated resources. func (s *Scorch) Reader() (index.IndexReader, error) { s.rootLock.RLock() - defer s.rootLock.RUnlock() - return &Reader{ - root: s.root, - }, nil + rv := &Reader{root: s.root} + rv.root.AddRef() + s.rootLock.RUnlock() + return rv, nil } func (s *Scorch) Stats() json.Marshaler { diff --git a/index/scorch/segment/empty.go b/index/scorch/segment/empty.go index 72419500..83454644 100644 --- a/index/scorch/segment/empty.go +++ b/index/scorch/segment/empty.go @@ -46,6 +46,13 @@ func (e *EmptySegment) Close() error { return nil } +func (e *EmptySegment) AddRef() { +} + +func (e *EmptySegment) DecRef() error { + return nil +} + type EmptyDictionary struct{} func (e *EmptyDictionary) PostingsList(term string, diff --git a/index/scorch/segment/mem/segment.go b/index/scorch/segment/mem/segment.go index cdbff583..75ff50cc 100644 --- a/index/scorch/segment/mem/segment.go +++ b/index/scorch/segment/mem/segment.go @@ -96,6 +96,13 @@ func New() *Segment { } } +func (s *Segment) AddRef() { +} + +func (s *Segment) DecRef() error { + return nil +} + // Fields returns the field names used in this segment func (s *Segment) Fields() []string { return s.FieldsInv diff --git a/index/scorch/segment/segment.go b/index/scorch/segment/segment.go index 6a9d7073..14b97ec8 100644 --- a/index/scorch/segment/segment.go +++ b/index/scorch/segment/segment.go @@ -35,6 +35,9 @@ type Segment interface { Fields() []string Close() error + + AddRef() + DecRef() error } type TermDictionary interface { diff --git a/index/scorch/segment/zap/segment.go b/index/scorch/segment/zap/segment.go index 9f80b703..49869907 100644 --- a/index/scorch/segment/zap/segment.go +++ b/index/scorch/segment/zap/segment.go @@ -20,6 +20,7 @@ import ( "fmt" "io" "os" + "sync" "github.com/RoaringBitmap/roaring" "github.com/Smerity/govarint" @@ -47,6 +48,7 @@ func Open(path string) (segment.Segment, error) { mm: mm, path: path, fieldsMap: make(map[string]uint16), + refs: 1, } err = rv.loadConfig() @@ -79,6 +81,25 @@ type Segment struct { fieldsMap map[string]uint16 fieldsInv []string fieldsOffsets []uint64 + + m sync.Mutex // Protects the fields that follow. + refs int64 +} + +func (s *Segment) AddRef() { + s.m.Lock() + s.refs++ + s.m.Unlock() +} + +func (s *Segment) DecRef() (err error) { + s.m.Lock() + s.refs-- + if s.refs == 0 { + err = s.closeActual() + } + s.m.Unlock() + return err } func (s *Segment) loadConfig() error { @@ -272,6 +293,10 @@ func (s *Segment) Path() string { // Close releases all resources associated with this segment func (s *Segment) Close() (err error) { + return s.DecRef() +} + +func (s *Segment) closeActual() (err error) { if s.mm != nil { err = s.mm.Unmap() } diff --git a/index/scorch/snapshot_index.go b/index/scorch/snapshot_index.go index 10d208ef..19581f75 100644 --- a/index/scorch/snapshot_index.go +++ b/index/scorch/snapshot_index.go @@ -20,6 +20,7 @@ import ( "encoding/binary" "fmt" "sort" + "sync" "github.com/RoaringBitmap/roaring" "github.com/blevesearch/bleve/document" @@ -43,6 +44,32 @@ type IndexSnapshot struct { offsets []uint64 internal map[string][]byte epoch uint64 + + m sync.Mutex // Protects the fields that follow. + refs int64 +} + +func (i *IndexSnapshot) AddRef() { + i.m.Lock() + i.refs++ + i.m.Unlock() +} + +func (i *IndexSnapshot) DecRef() (err error) { + i.m.Lock() + i.refs-- + if i.refs == 0 { + for _, s := range i.segment { + if s != nil { + err2 := s.segment.DecRef() + if err == nil { + err = err2 + } + } + } + } + i.m.Unlock() + return err } func (i *IndexSnapshot) newIndexSnapshotFieldDict(field string, makeItr func(i segment.TermDictionary) segment.DictionaryIterator) (*IndexSnapshotFieldDict, error) {