0
0
Fork 0

scorch ref-counting

Future commits will provide actual cleanup when ref-counts reach 0.
This commit is contained in:
Steve Yen 2017-12-13 13:10:44 -08:00
parent 50471003dc
commit c13ff85aaf
11 changed files with 145 additions and 25 deletions

View File

@ -72,6 +72,7 @@ func (s *Scorch) introduceSegment(next *segmentIntroduction) error {
offsets: make([]uint64, len(s.root.segment)+1),
internal: make(map[string][]byte, len(s.root.segment)),
epoch: s.nextSnapshotEpoch,
refs: 1,
}
s.nextSnapshotEpoch++
@ -86,6 +87,7 @@ func (s *Scorch) introduceSegment(next *segmentIntroduction) error {
if err != nil {
next.applied <- fmt.Errorf("error computing doc numbers: %v", err)
close(next.applied)
_ = newSnapshot.DecRef()
return err
}
}
@ -94,6 +96,7 @@ func (s *Scorch) introduceSegment(next *segmentIntroduction) error {
segment: s.root.segment[i].segment,
notify: s.root.segment[i].notify,
}
s.root.segment[i].segment.AddRef()
// apply new obsoletions
if s.root.segment[i].deleted == nil {
newSnapshot.segment[i].deleted = delta
@ -108,7 +111,7 @@ func (s *Scorch) introduceSegment(next *segmentIntroduction) error {
// put new segment at end
newSnapshot.segment[len(s.root.segment)] = &SegmentSnapshot{
id: next.id,
segment: next.data,
segment: next.data, // Take ownership of next.data's ref-count.
}
newSnapshot.offsets[len(s.root.segment)] = running
if !s.unsafeBatch {
@ -130,9 +133,15 @@ func (s *Scorch) introduceSegment(next *segmentIntroduction) error {
}
}
// swap in new segment
rootPrev := s.root
s.root = newSnapshot
// release lock
s.rootLock.Unlock()
if rootPrev != nil {
_ = rootPrev.DecRef()
}
close(next.applied)
return nil
@ -150,6 +159,7 @@ func (s *Scorch) introduceMerge(nextMerge *segmentMerge) {
offsets: make([]uint64, 0, newSize),
internal: make(map[string][]byte, len(s.root.segment)),
epoch: s.nextSnapshotEpoch,
refs: 1,
}
s.nextSnapshotEpoch++
@ -182,6 +192,7 @@ func (s *Scorch) introduceMerge(nextMerge *segmentMerge) {
notify: s.root.segment[i].notify,
deleted: s.root.segment[i].deleted,
})
s.root.segment[i].segment.AddRef()
newSnapshot.offsets = append(newSnapshot.offsets, running)
running += s.root.segment[i].Count()
}
@ -190,7 +201,7 @@ func (s *Scorch) introduceMerge(nextMerge *segmentMerge) {
// put new segment at end
newSnapshot.segment = append(newSnapshot.segment, &SegmentSnapshot{
id: nextMerge.id,
segment: nextMerge.new,
segment: nextMerge.new, // Take ownership for nextMerge.new's ref-count.
deleted: newSegmentDeleted,
})
newSnapshot.offsets = append(newSnapshot.offsets, running)
@ -201,10 +212,15 @@ func (s *Scorch) introduceMerge(nextMerge *segmentMerge) {
}
// swap in new segment
rootPrev := s.root
s.root = newSnapshot
// release lock
s.rootLock.Unlock()
if rootPrev != nil {
_ = rootPrev.DecRef()
}
// notify merger we incorporated this
close(nextMerge.notify)
}

View File

@ -38,6 +38,7 @@ OUTER:
// check to see if there is a new snapshot to persist
s.rootLock.RLock()
ourSnapshot := s.root
ourSnapshot.AddRef()
s.rootLock.RUnlock()
if ourSnapshot.epoch != lastEpochMergePlanned {
@ -45,10 +46,12 @@ OUTER:
err := s.planMergeAtSnapshot(ourSnapshot)
if err != nil {
log.Printf("merging err: %v", err)
_ = ourSnapshot.DecRef()
continue OUTER
}
lastEpochMergePlanned = ourSnapshot.epoch
}
_ = ourSnapshot.DecRef()
// tell the persister we're waiting for changes
// first make a notification chan
@ -64,16 +67,19 @@ OUTER:
// check again
s.rootLock.RLock()
ourSnapshot = s.root
ourSnapshot.AddRef()
s.rootLock.RUnlock()
if ourSnapshot.epoch != lastEpochMergePlanned {
// let's get started
err := s.planMergeAtSnapshot(ourSnapshot)
if err != nil {
_ = ourSnapshot.DecRef()
continue OUTER
}
lastEpochMergePlanned = ourSnapshot.epoch
}
_ = ourSnapshot.DecRef()
// now wait for it (but also detect close)
select {

View File

@ -44,6 +44,7 @@ OUTER:
// check to see if there is a new snapshot to persist
s.rootLock.RLock()
ourSnapshot := s.root
ourSnapshot.AddRef()
s.rootLock.RUnlock()
//for ourSnapshot.epoch != lastPersistedEpoch {
@ -52,6 +53,7 @@ OUTER:
err := s.persistSnapshot(ourSnapshot)
if err != nil {
log.Printf("got err persisting snapshot: %v", err)
_ = ourSnapshot.DecRef()
continue OUTER
}
lastPersistedEpoch = ourSnapshot.epoch
@ -60,6 +62,7 @@ OUTER:
notify = nil
}
}
_ = ourSnapshot.DecRef()
// tell the introducer we're waiting for changes
// first make a notification chan
@ -75,13 +78,15 @@ OUTER:
// check again
s.rootLock.RLock()
ourSnapshot = s.root
ourSnapshot.AddRef()
s.rootLock.RUnlock()
if ourSnapshot.epoch != lastPersistedEpoch {
if ourSnapshot.epoch != lastPersistedEpoch {
// let's get started
err := s.persistSnapshot(ourSnapshot)
if err != nil {
log.Printf("got err persisting snapshot: %v", err)
_ = ourSnapshot.DecRef()
continue OUTER
}
lastPersistedEpoch = ourSnapshot.epoch
@ -90,6 +95,7 @@ OUTER:
notify = nil
}
}
_ = ourSnapshot.DecRef()
// now wait for it (but also detect close)
select {
@ -199,6 +205,9 @@ func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot) error {
for segmentID, path := range newSegmentPaths {
newSegments[segmentID], err = zap.Open(path)
if err != nil {
for _, s := range newSegments {
_ = s.Close() // cleanup segments that were successfully opened
}
return fmt.Errorf("error opening new segment at %s, %v", path, err)
}
}
@ -212,6 +221,7 @@ func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot) error {
segment: make([]*SegmentSnapshot, len(s.root.segment)),
offsets: make([]uint64, len(s.root.offsets)),
internal: make(map[string][]byte, len(s.root.internal)),
refs: 1,
}
for i, segmentSnapshot := range s.root.segment {
// see if this segment has been replaced
@ -228,15 +238,21 @@ func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot) error {
}
} else {
newIndexSnapshot.segment[i] = s.root.segment[i]
newIndexSnapshot.segment[i].segment.AddRef()
}
newIndexSnapshot.offsets[i] = s.root.offsets[i]
}
for k, v := range s.root.internal {
newIndexSnapshot.internal[k] = v
}
rootPrev := s.root
s.root = newIndexSnapshot
s.rootLock.Unlock()
if rootPrev != nil {
_ = rootPrev.DecRef()
}
// now that we've given up the lock, notify everyone that we've safely
// persisted their data
for _, notification := range notifications {
@ -263,17 +279,17 @@ func (s *Scorch) loadFromBolt() error {
for k, _ := c.Last(); k != nil; k, _ = c.Prev() {
_, snapshotEpoch, err := segment.DecodeUvarintAscending(k)
if err != nil {
log.Printf("unable to parse segment epoch % x, contiuing", k)
log.Printf("unable to parse segment epoch %x, continuing", k)
continue
}
snapshot := snapshots.Bucket(k)
if snapshot == nil {
log.Printf("snapshot key, but bucket missing % x, continuing", k)
log.Printf("snapshot key, but bucket missing %x, continuing", k)
continue
}
indexSnapshot, err := s.loadSnapshot(snapshot)
if err != nil {
log.Printf("unable to load snapshot, %v continuing", err)
log.Printf("unable to load snapshot, %v, continuing", err)
continue
}
indexSnapshot.epoch = snapshotEpoch
@ -296,6 +312,7 @@ func (s *Scorch) loadSnapshot(snapshot *bolt.Bucket) (*IndexSnapshot, error) {
rv := &IndexSnapshot{
internal: make(map[string][]byte),
refs: 1,
}
var running uint64
c := snapshot.Cursor()
@ -308,19 +325,23 @@ func (s *Scorch) loadSnapshot(snapshot *bolt.Bucket) (*IndexSnapshot, error) {
return nil
})
if err != nil {
_ = rv.DecRef()
return nil, err
}
} else {
segmentBucket := snapshot.Bucket(k)
if segmentBucket == nil {
_ = rv.DecRef()
return nil, fmt.Errorf("segment key, but bucket missing % x", k)
}
segmentSnapshot, err := s.loadSegment(segmentBucket)
if err != nil {
_ = rv.DecRef()
return nil, fmt.Errorf("failed to load segment: %v", err)
}
_, segmentSnapshot.id, err = segment.DecodeUvarintAscending(k)
if err != nil {
_ = rv.DecRef()
return nil, fmt.Errorf("failed to decode segment id: %v", err)
}
rv.segment = append(rv.segment, segmentSnapshot)
@ -351,6 +372,7 @@ func (s *Scorch) loadSegment(segmentBucket *bolt.Bucket) (*SegmentSnapshot, erro
r := bytes.NewReader(deletedBytes)
_, err := deletedBitmap.ReadFrom(r)
if err != nil {
_ = segment.Close()
return nil, fmt.Errorf("error reading deleted bytes: %v", err)
}
rv.deleted = deletedBitmap

View File

@ -20,7 +20,7 @@ import (
)
type Reader struct {
root *IndexSnapshot
root *IndexSnapshot // Owns 1 ref-count on the index snapshot.
}
func (r *Reader) TermFieldReader(term []byte, field string, includeFreq,
@ -106,5 +106,5 @@ func (r *Reader) DumpFields() chan interface{} {
}
func (r *Reader) Close() error {
return nil
return r.root.DecRef()
}

View File

@ -646,6 +646,7 @@ func TestSegmentIndexAndLocalDocNumFromGlobal(t *testing.T) {
for _, test := range tests {
i := &IndexSnapshot{
offsets: test.offsets,
refs: 1,
}
gotSegmentIndex, gotLocalDocNum := i.segmentIndexAndLocalDocNumFromGlobal(test.globalDocNum)
if gotSegmentIndex != test.segmentIndex {
@ -654,5 +655,9 @@ func TestSegmentIndexAndLocalDocNumFromGlobal(t *testing.T) {
if gotLocalDocNum != test.localDocNum {
t.Errorf("got localDocNum %d expected %d for offsets %v globalDocNum %d", gotLocalDocNum, test.localDocNum, test.offsets, test.globalDocNum)
}
err := i.DecRef()
if err != nil {
t.Errorf("expected no err, got: %v", err)
}
}
}

View File

@ -50,7 +50,7 @@ type Scorch struct {
unsafeBatch bool
rootLock sync.RWMutex
root *IndexSnapshot
root *IndexSnapshot // holds 1 ref-count on the root
closeCh chan struct{}
introductions chan *segmentIntroduction
@ -67,7 +67,7 @@ func NewScorch(storeName string, config map[string]interface{}, analysisQueue *i
config: config,
analysisQueue: analysisQueue,
stats: &Stats{},
root: &IndexSnapshot{},
root: &IndexSnapshot{refs: 1},
nextSnapshotEpoch: 1,
}
ro, ok := config["read_only"].(bool)
@ -140,16 +140,12 @@ func (s *Scorch) Close() (err error) {
// wait for them to close
s.asyncTasks.Wait()
// now close the root bolt
if s.rootBolt != nil {
err = s.rootBolt.Close()
s.rootLock.Lock()
for _, segment := range s.root.segment {
cerr := segment.Close()
if err == nil {
err = cerr
}
}
_ = s.root.DecRef()
s.root = nil
s.rootLock.Unlock()
}
return
@ -218,7 +214,12 @@ func (s *Scorch) Batch(batch *index.Batch) error {
} else {
newSegment = mem.New()
}
return s.prepareSegment(newSegment, ids, batch.InternalOps)
err := s.prepareSegment(newSegment, ids, batch.InternalOps)
if err != nil {
_ = newSegment.Close()
}
return err
}
func (s *Scorch) prepareSegment(newSegment segment.Segment, ids []string,
@ -240,12 +241,13 @@ func (s *Scorch) prepareSegment(newSegment segment.Segment, ids []string,
// get read lock, to optimistically prepare obsoleted info
s.rootLock.RLock()
for i := range s.root.segment {
delta, err := s.root.segment[i].segment.DocNumbers(ids)
for _, seg := range s.root.segment {
delta, err := seg.segment.DocNumbers(ids)
if err != nil {
s.rootLock.RUnlock()
return err
}
introduction.obsoletes[s.root.segment[i].id] = delta
introduction.obsoletes[seg.id] = delta
}
s.rootLock.RUnlock()
@ -280,10 +282,10 @@ func (s *Scorch) DeleteInternal(key []byte) error {
// release associated resources.
func (s *Scorch) Reader() (index.IndexReader, error) {
s.rootLock.RLock()
defer s.rootLock.RUnlock()
return &Reader{
root: s.root,
}, nil
rv := &Reader{root: s.root}
rv.root.AddRef()
s.rootLock.RUnlock()
return rv, nil
}
func (s *Scorch) Stats() json.Marshaler {

View File

@ -46,6 +46,13 @@ func (e *EmptySegment) Close() error {
return nil
}
// AddRef is a no-op: the body is empty, so an EmptySegment keeps no
// reference count.
func (e *EmptySegment) AddRef() {
}
// DecRef is a no-op counterpart to AddRef; it does nothing and always
// returns nil.
func (e *EmptySegment) DecRef() error {
return nil
}
type EmptyDictionary struct{}
func (e *EmptyDictionary) PostingsList(term string,

View File

@ -96,6 +96,13 @@ func New() *Segment {
}
}
// AddRef is a no-op for this segment implementation: the body is empty and
// no reference count is updated.
func (s *Segment) AddRef() {
}
// DecRef is a no-op for this segment implementation; it does nothing and
// always returns nil.
func (s *Segment) DecRef() error {
return nil
}
// Fields returns the field names used in this segment
func (s *Segment) Fields() []string {
return s.FieldsInv

View File

@ -35,6 +35,9 @@ type Segment interface {
Fields() []string
Close() error
AddRef()
DecRef() error
}
type TermDictionary interface {

View File

@ -20,6 +20,7 @@ import (
"fmt"
"io"
"os"
"sync"
"github.com/RoaringBitmap/roaring"
"github.com/Smerity/govarint"
@ -47,6 +48,7 @@ func Open(path string) (segment.Segment, error) {
mm: mm,
path: path,
fieldsMap: make(map[string]uint16),
refs: 1,
}
err = rv.loadConfig()
@ -79,6 +81,25 @@ type Segment struct {
fieldsMap map[string]uint16
fieldsInv []string
fieldsOffsets []uint64
m sync.Mutex // Protects the fields that follow.
refs int64
}
// AddRef takes one additional reference on the segment by incrementing
// refs while holding the segment's mutex.
func (s *Segment) AddRef() {
	s.m.Lock()
	defer s.m.Unlock()

	s.refs++
}
// DecRef drops one reference on the segment while holding its mutex.
// When the count reaches exactly zero it calls closeActual and returns
// that call's error; otherwise it returns nil.
func (s *Segment) DecRef() (err error) {
	s.m.Lock()
	defer s.m.Unlock()

	s.refs--
	if s.refs != 0 {
		// Other holders remain (or the count is already past zero).
		return nil
	}
	return s.closeActual()
}
func (s *Segment) loadConfig() error {
@ -272,6 +293,10 @@ func (s *Segment) Path() string {
// Close drops one reference on the segment by delegating to DecRef and
// returns DecRef's error. The underlying resources are only released (via
// closeActual) when that decrement brings the reference count to zero.
func (s *Segment) Close() (err error) {
return s.DecRef()
}
func (s *Segment) closeActual() (err error) {
if s.mm != nil {
err = s.mm.Unmap()
}

View File

@ -20,6 +20,7 @@ import (
"encoding/binary"
"fmt"
"sort"
"sync"
"github.com/RoaringBitmap/roaring"
"github.com/blevesearch/bleve/document"
@ -43,6 +44,32 @@ type IndexSnapshot struct {
offsets []uint64
internal map[string][]byte
epoch uint64
m sync.Mutex // Protects the fields that follow.
refs int64
}
// AddRef takes one additional reference on the index snapshot by
// incrementing refs while holding the snapshot's mutex.
func (i *IndexSnapshot) AddRef() {
	i.m.Lock()
	defer i.m.Unlock()

	i.refs++
}
// DecRef drops one reference on the index snapshot while holding its
// mutex. When the count reaches exactly zero, it calls DecRef on the
// segment of every non-nil entry in i.segment. All entries are visited
// even if one fails; the first error encountered is returned.
func (i *IndexSnapshot) DecRef() (err error) {
	i.m.Lock()
	defer i.m.Unlock()

	i.refs--
	if i.refs != 0 {
		// Still referenced elsewhere; nothing to release yet.
		return nil
	}

	for _, ss := range i.segment {
		if ss == nil {
			continue
		}
		// Keep only the first error, but keep releasing the rest.
		if decErr := ss.segment.DecRef(); err == nil {
			err = decErr
		}
	}
	return err
}
func (i *IndexSnapshot) newIndexSnapshotFieldDict(field string, makeItr func(i segment.TermDictionary) segment.DictionaryIterator) (*IndexSnapshotFieldDict, error) {