
Merge pull request #651 from steveyen/scorch

scorch ref-counting
Marty Schoch 2017-12-13 18:01:13 -05:00 committed by GitHub
commit 8ffa978ce4
11 changed files with 145 additions and 25 deletions
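
The change threads a reference count through the scorch snapshots and segments: every IndexSnapshot and every zap Segment now starts life with refs: 1, anyone who needs to keep one alive calls AddRef(), and the underlying resources are released only when the last DecRef() brings the count to zero. Below is a minimal, self-contained sketch of that pattern, with an illustrative resource type standing in for the real IndexSnapshot/Segment:

// Sketch of the ref-counting pattern these diffs introduce (illustrative
// type and names; in the change itself the counter lives on IndexSnapshot
// and on the zap Segment).
package main

import (
    "fmt"
    "sync"
)

type resource struct {
    m    sync.Mutex // protects refs
    refs int64
}

// AddRef is called by anyone who needs the resource to stay alive.
func (r *resource) AddRef() {
    r.m.Lock()
    r.refs++
    r.m.Unlock()
}

// DecRef drops one reference; the underlying resource is released only
// when the last reference goes away.
func (r *resource) DecRef() (err error) {
    r.m.Lock()
    r.refs--
    if r.refs == 0 {
        err = r.closeActual()
    }
    r.m.Unlock()
    return err
}

func (r *resource) closeActual() error {
    fmt.Println("releasing underlying resources")
    return nil
}

func main() {
    r := &resource{refs: 1} // the creator starts with one reference
    r.AddRef()              // a second user (say, a reader) pins it
    _ = r.DecRef()          // creator is done; resource stays open
    _ = r.DecRef()          // last user is done; closeActual runs here
}

In the zap segment diff further down, Close() simply delegates to DecRef(), so callers that only know the generic Segment interface keep working while several snapshots can now share the same mmap'd segment file safely.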

View File

@@ -72,6 +72,7 @@ func (s *Scorch) introduceSegment(next *segmentIntroduction) error {
         offsets: make([]uint64, len(s.root.segment)+1),
         internal: make(map[string][]byte, len(s.root.segment)),
         epoch: s.nextSnapshotEpoch,
+        refs: 1,
     }
     s.nextSnapshotEpoch++
@@ -86,6 +87,7 @@ func (s *Scorch) introduceSegment(next *segmentIntroduction) error {
             if err != nil {
                 next.applied <- fmt.Errorf("error computing doc numbers: %v", err)
                 close(next.applied)
+                _ = newSnapshot.DecRef()
                 return err
             }
         }
@@ -94,6 +96,7 @@ func (s *Scorch) introduceSegment(next *segmentIntroduction) error {
            segment: s.root.segment[i].segment,
            notify: s.root.segment[i].notify,
        }
+       s.root.segment[i].segment.AddRef()
        // apply new obsoletions
        if s.root.segment[i].deleted == nil {
            newSnapshot.segment[i].deleted = delta
@@ -108,7 +111,7 @@ func (s *Scorch) introduceSegment(next *segmentIntroduction) error {
    // put new segment at end
    newSnapshot.segment[len(s.root.segment)] = &SegmentSnapshot{
        id: next.id,
-       segment: next.data,
+       segment: next.data, // Take ownership of next.data's ref-count.
    }
    newSnapshot.offsets[len(s.root.segment)] = running
    if !s.unsafeBatch {
@@ -130,9 +133,15 @@ func (s *Scorch) introduceSegment(next *segmentIntroduction) error {
        }
    }
    // swap in new segment
+   rootPrev := s.root
    s.root = newSnapshot
    // release lock
    s.rootLock.Unlock()
+   if rootPrev != nil {
+       _ = rootPrev.DecRef()
+   }
    close(next.applied)
    return nil
@@ -150,6 +159,7 @@ func (s *Scorch) introduceMerge(nextMerge *segmentMerge) {
        offsets: make([]uint64, 0, newSize),
        internal: make(map[string][]byte, len(s.root.segment)),
        epoch: s.nextSnapshotEpoch,
+       refs: 1,
    }
    s.nextSnapshotEpoch++
@@ -182,6 +192,7 @@ func (s *Scorch) introduceMerge(nextMerge *segmentMerge) {
                notify: s.root.segment[i].notify,
                deleted: s.root.segment[i].deleted,
            })
+           s.root.segment[i].segment.AddRef()
            newSnapshot.offsets = append(newSnapshot.offsets, running)
            running += s.root.segment[i].Count()
        }
@@ -190,7 +201,7 @@ func (s *Scorch) introduceMerge(nextMerge *segmentMerge) {
    // put new segment at end
    newSnapshot.segment = append(newSnapshot.segment, &SegmentSnapshot{
        id: nextMerge.id,
-       segment: nextMerge.new,
+       segment: nextMerge.new, // Take ownership for nextMerge.new's ref-count.
        deleted: newSegmentDeleted,
    })
    newSnapshot.offsets = append(newSnapshot.offsets, running)
@@ -201,10 +212,15 @@ func (s *Scorch) introduceMerge(nextMerge *segmentMerge) {
    }
    // swap in new segment
+   rootPrev := s.root
    s.root = newSnapshot
    // release lock
    s.rootLock.Unlock()
+   if rootPrev != nil {
+       _ = rootPrev.DecRef()
+   }
    // notify merger we incorporated this
    close(nextMerge.notify)
 }
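
Both introduceSegment and introduceMerge above publish a new snapshot the same way: the new snapshot is built with refs: 1, the swap happens under rootLock, and the reference held by the previous root is dropped only after the lock is released. A self-contained sketch of just that hand-off (the swapRoot name and the trimmed-down types are illustrative, not code from this change):

package main

import "sync"

type snapshot struct {
    m    sync.Mutex
    refs int64
}

func (s *snapshot) DecRef() error {
    s.m.Lock()
    s.refs--
    // A real snapshot would release its segments once refs hits zero.
    s.m.Unlock()
    return nil
}

type index struct {
    rootLock sync.RWMutex
    root     *snapshot // holds 1 ref-count on the root
}

func (idx *index) swapRoot(next *snapshot) {
    idx.rootLock.Lock()
    rootPrev := idx.root
    idx.root = next // next was created with refs: 1 by its builder
    idx.rootLock.Unlock()
    // Drop the old root's reference only after the lock is released, so any
    // cleanup DecRef triggers never runs while rootLock is held.
    if rootPrev != nil {
        _ = rootPrev.DecRef()
    }
}

func main() {
    idx := &index{root: &snapshot{refs: 1}}
    idx.swapRoot(&snapshot{refs: 1})
}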

View File

@@ -38,6 +38,7 @@ OUTER:
            // check to see if there is a new snapshot to persist
            s.rootLock.RLock()
            ourSnapshot := s.root
+           ourSnapshot.AddRef()
            s.rootLock.RUnlock()
            if ourSnapshot.epoch != lastEpochMergePlanned {
@@ -45,10 +46,12 @@ OUTER:
                err := s.planMergeAtSnapshot(ourSnapshot)
                if err != nil {
                    log.Printf("merging err: %v", err)
+                   _ = ourSnapshot.DecRef()
                    continue OUTER
                }
                lastEpochMergePlanned = ourSnapshot.epoch
            }
+           _ = ourSnapshot.DecRef()
            // tell the persister we're waiting for changes
            // first make a notification chan
@@ -64,16 +67,19 @@ OUTER:
            // check again
            s.rootLock.RLock()
            ourSnapshot = s.root
+           ourSnapshot.AddRef()
            s.rootLock.RUnlock()
            if ourSnapshot.epoch != lastEpochMergePlanned {
                // lets get started
                err := s.planMergeAtSnapshot(ourSnapshot)
                if err != nil {
+                   _ = ourSnapshot.DecRef()
                    continue OUTER
                }
                lastEpochMergePlanned = ourSnapshot.epoch
            }
+           _ = ourSnapshot.DecRef()
            // now wait for it (but also detect close)
            select {

View File

@@ -44,6 +44,7 @@ OUTER:
            // check to see if there is a new snapshot to persist
            s.rootLock.RLock()
            ourSnapshot := s.root
+           ourSnapshot.AddRef()
            s.rootLock.RUnlock()
            //for ourSnapshot.epoch != lastPersistedEpoch {
@@ -52,6 +53,7 @@ OUTER:
                err := s.persistSnapshot(ourSnapshot)
                if err != nil {
                    log.Printf("got err persisting snapshot: %v", err)
+                   _ = ourSnapshot.DecRef()
                    continue OUTER
                }
                lastPersistedEpoch = ourSnapshot.epoch
@@ -60,6 +62,7 @@ OUTER:
                    notify = nil
                }
            }
+           _ = ourSnapshot.DecRef()
            // tell the introducer we're waiting for changes
            // first make a notification chan
@@ -75,13 +78,15 @@ OUTER:
            // check again
            s.rootLock.RLock()
            ourSnapshot = s.root
+           ourSnapshot.AddRef()
            s.rootLock.RUnlock()
            if ourSnapshot.epoch != lastPersistedEpoch {
                // lets get started
                err := s.persistSnapshot(ourSnapshot)
                if err != nil {
                    log.Printf("got err persisting snapshot: %v", err)
+                   _ = ourSnapshot.DecRef()
                    continue OUTER
                }
                lastPersistedEpoch = ourSnapshot.epoch
@@ -90,6 +95,7 @@ OUTER:
                    notify = nil
                }
            }
+           _ = ourSnapshot.DecRef()
            // now wait for it (but also detect close)
            select {
@@ -199,6 +205,9 @@ func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot) error {
    for segmentID, path := range newSegmentPaths {
        newSegments[segmentID], err = zap.Open(path)
        if err != nil {
+           for _, s := range newSegments {
+               _ = s.Close() // cleanup segments that were successfully opened
+           }
            return fmt.Errorf("error opening new segment at %s, %v", path, err)
        }
    }
@@ -212,6 +221,7 @@ func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot) error {
        segment: make([]*SegmentSnapshot, len(s.root.segment)),
        offsets: make([]uint64, len(s.root.offsets)),
        internal: make(map[string][]byte, len(s.root.internal)),
+       refs: 1,
    }
    for i, segmentSnapshot := range s.root.segment {
        // see if this segment has been replaced
@@ -228,15 +238,21 @@ func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot) error {
            }
        } else {
            newIndexSnapshot.segment[i] = s.root.segment[i]
+           newIndexSnapshot.segment[i].segment.AddRef()
        }
        newIndexSnapshot.offsets[i] = s.root.offsets[i]
    }
    for k, v := range s.root.internal {
        newIndexSnapshot.internal[k] = v
    }
+   rootPrev := s.root
    s.root = newIndexSnapshot
    s.rootLock.Unlock()
+   if rootPrev != nil {
+       _ = rootPrev.DecRef()
+   }
    // now that we've given up the lock, notify everyone that we've safely
    // persisted their data
    for _, notification := range notifications {
@@ -263,17 +279,17 @@ func (s *Scorch) loadFromBolt() error {
        for k, _ := c.Last(); k != nil; k, _ = c.Prev() {
            _, snapshotEpoch, err := segment.DecodeUvarintAscending(k)
            if err != nil {
-               log.Printf("unable to parse segment epoch % x, contiuing", k)
+               log.Printf("unable to parse segment epoch %x, continuing", k)
                continue
            }
            snapshot := snapshots.Bucket(k)
            if snapshot == nil {
-               log.Printf("snapshot key, but bucket missing % x, continuing", k)
+               log.Printf("snapshot key, but bucket missing %x, continuing", k)
                continue
            }
            indexSnapshot, err := s.loadSnapshot(snapshot)
            if err != nil {
-               log.Printf("unable to load snapshot, %v continuing", err)
+               log.Printf("unable to load snapshot, %v, continuing", err)
                continue
            }
            indexSnapshot.epoch = snapshotEpoch
@@ -296,6 +312,7 @@ func (s *Scorch) loadSnapshot(snapshot *bolt.Bucket) (*IndexSnapshot, error) {
    rv := &IndexSnapshot{
        internal: make(map[string][]byte),
+       refs: 1,
    }
    var running uint64
    c := snapshot.Cursor()
@@ -308,19 +325,23 @@ func (s *Scorch) loadSnapshot(snapshot *bolt.Bucket) (*IndexSnapshot, error) {
                return nil
            })
            if err != nil {
+               _ = rv.DecRef()
                return nil, err
            }
        } else {
            segmentBucket := snapshot.Bucket(k)
            if segmentBucket == nil {
+               _ = rv.DecRef()
                return nil, fmt.Errorf("segment key, but bucket missing % x", k)
            }
            segmentSnapshot, err := s.loadSegment(segmentBucket)
            if err != nil {
+               _ = rv.DecRef()
                return nil, fmt.Errorf("failed to load segment: %v", err)
            }
            _, segmentSnapshot.id, err = segment.DecodeUvarintAscending(k)
            if err != nil {
+               _ = rv.DecRef()
                return nil, fmt.Errorf("failed to decode segment id: %v", err)
            }
            rv.segment = append(rv.segment, segmentSnapshot)
@@ -351,6 +372,7 @@ func (s *Scorch) loadSegment(segmentBucket *bolt.Bucket) (*SegmentSnapshot, erro
        r := bytes.NewReader(deletedBytes)
        _, err := deletedBitmap.ReadFrom(r)
        if err != nil {
+           _ = segment.Close()
            return nil, fmt.Errorf("error reading deleted bytes: %v", err)
        }
        rv.deleted = deletedBitmap

View File

@@ -20,7 +20,7 @@ import (
 )
 type Reader struct {
-   root *IndexSnapshot
+   root *IndexSnapshot // Owns 1 ref-count on the index snapshot.
 }
 func (r *Reader) TermFieldReader(term []byte, field string, includeFreq,
@@ -106,5 +106,5 @@ func (r *Reader) DumpFields() chan interface{} {
 }
 func (r *Reader) Close() error {
-   return nil
+   return r.root.DecRef()
 }

View File

@@ -646,6 +646,7 @@ func TestSegmentIndexAndLocalDocNumFromGlobal(t *testing.T) {
    for _, test := range tests {
        i := &IndexSnapshot{
            offsets: test.offsets,
+           refs: 1,
        }
        gotSegmentIndex, gotLocalDocNum := i.segmentIndexAndLocalDocNumFromGlobal(test.globalDocNum)
        if gotSegmentIndex != test.segmentIndex {
@@ -654,5 +655,9 @@ func TestSegmentIndexAndLocalDocNumFromGlobal(t *testing.T) {
        if gotLocalDocNum != test.localDocNum {
            t.Errorf("got localDocNum %d expected %d for offsets %v globalDocNum %d", gotLocalDocNum, test.localDocNum, test.offsets, test.globalDocNum)
        }
+       err := i.DecRef()
+       if err != nil {
+           t.Errorf("expected no err, got: %v", err)
+       }
    }
 }

View File

@@ -50,7 +50,7 @@ type Scorch struct {
    unsafeBatch bool
    rootLock sync.RWMutex
-   root *IndexSnapshot
+   root *IndexSnapshot // holds 1 ref-count on the root
    closeCh chan struct{}
    introductions chan *segmentIntroduction
@@ -67,7 +67,7 @@ func NewScorch(storeName string, config map[string]interface{}, analysisQueue *i
        config: config,
        analysisQueue: analysisQueue,
        stats: &Stats{},
-       root: &IndexSnapshot{},
+       root: &IndexSnapshot{refs: 1},
        nextSnapshotEpoch: 1,
    }
    ro, ok := config["read_only"].(bool)
@@ -140,16 +140,12 @@ func (s *Scorch) Close() (err error) {
    // wait for them to close
    s.asyncTasks.Wait()
    // now close the root bolt
    if s.rootBolt != nil {
        err = s.rootBolt.Close()
        s.rootLock.Lock()
-       for _, segment := range s.root.segment {
-           cerr := segment.Close()
-           if err == nil {
-               err = cerr
-           }
-       }
+       _ = s.root.DecRef()
+       s.root = nil
+       s.rootLock.Unlock()
    }

    return
@@ -218,7 +214,12 @@ func (s *Scorch) Batch(batch *index.Batch) error {
    } else {
        newSegment = mem.New()
    }
-   return s.prepareSegment(newSegment, ids, batch.InternalOps)
+   err := s.prepareSegment(newSegment, ids, batch.InternalOps)
+   if err != nil {
+       _ = newSegment.Close()
+   }
+   return err
 }
 func (s *Scorch) prepareSegment(newSegment segment.Segment, ids []string,
@@ -240,12 +241,13 @@ func (s *Scorch) prepareSegment(newSegment segment.Segment, ids []string,
    // get read lock, to optimistically prepare obsoleted info
    s.rootLock.RLock()
-   for i := range s.root.segment {
-       delta, err := s.root.segment[i].segment.DocNumbers(ids)
+   for _, seg := range s.root.segment {
+       delta, err := seg.segment.DocNumbers(ids)
        if err != nil {
+           s.rootLock.RUnlock()
            return err
        }
-       introduction.obsoletes[s.root.segment[i].id] = delta
+       introduction.obsoletes[seg.id] = delta
    }
    s.rootLock.RUnlock()
@@ -280,10 +282,10 @@ func (s *Scorch) DeleteInternal(key []byte) error {
 // release associated resources.
 func (s *Scorch) Reader() (index.IndexReader, error) {
    s.rootLock.RLock()
-   defer s.rootLock.RUnlock()
-   return &Reader{
-       root: s.root,
-   }, nil
+   rv := &Reader{root: s.root}
+   rv.root.AddRef()
+   s.rootLock.RUnlock()
+   return rv, nil
 }
 func (s *Scorch) Stats() json.Marshaler {
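
The Reader() rewrite above pairs with the Reader.Close() change earlier: a reader pins the current root with its own reference while the read lock is still held, so the snapshot it searches against cannot be torn down underneath it, and Close() returns that reference. A condensed, self-contained sketch of the lifecycle (stand-in types, not the PR's code):

package main

import (
    "fmt"
    "sync"
)

type snapshot struct {
    m    sync.Mutex
    refs int64
}

func (s *snapshot) AddRef() {
    s.m.Lock()
    s.refs++
    s.m.Unlock()
}

func (s *snapshot) DecRef() error {
    s.m.Lock()
    s.refs--
    if s.refs == 0 {
        fmt.Println("last reference dropped; segment files could be unmapped here")
    }
    s.m.Unlock()
    return nil
}

// reader owns one reference on the snapshot it was created against.
type reader struct{ root *snapshot }

func (r *reader) Close() error { return r.root.DecRef() }

type index struct {
    rootLock sync.RWMutex
    root     *snapshot
}

// Reader pins the current root while the read lock is held, so the root
// cannot reach zero references before this reader is done with it.
func (idx *index) Reader() *reader {
    idx.rootLock.RLock()
    rv := &reader{root: idx.root}
    rv.root.AddRef()
    idx.rootLock.RUnlock()
    return rv
}

func main() {
    idx := &index{root: &snapshot{refs: 1}}
    r := idx.Reader()     // snapshot now has 2 references
    _ = r.Close()         // reader's reference released
    _ = idx.root.DecRef() // owner's reference released; cleanup runs now
}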

View File

@@ -46,6 +46,13 @@ func (e *EmptySegment) Close() error {
    return nil
 }
+
+func (e *EmptySegment) AddRef() {
+}
+
+func (e *EmptySegment) DecRef() error {
+   return nil
+}
 type EmptyDictionary struct{}
 func (e *EmptyDictionary) PostingsList(term string,

View File

@@ -96,6 +96,13 @@ func New() *Segment {
    }
 }
+
+func (s *Segment) AddRef() {
+}
+
+func (s *Segment) DecRef() error {
+   return nil
+}
 // Fields returns the field names used in this segment
 func (s *Segment) Fields() []string {
    return s.FieldsInv

View File

@@ -35,6 +35,9 @@ type Segment interface {
    Fields() []string
    Close() error
+
+   AddRef()
+   DecRef() error
 }
 type TermDictionary interface {

View File

@@ -20,6 +20,7 @@ import (
    "fmt"
    "io"
    "os"
+   "sync"
    "github.com/RoaringBitmap/roaring"
    "github.com/Smerity/govarint"
@@ -47,6 +48,7 @@ func Open(path string) (segment.Segment, error) {
        mm: mm,
        path: path,
        fieldsMap: make(map[string]uint16),
+       refs: 1,
    }
    err = rv.loadConfig()
@@ -79,6 +81,25 @@ type Segment struct {
    fieldsMap map[string]uint16
    fieldsInv []string
    fieldsOffsets []uint64
+
+   m sync.Mutex // Protects the fields that follow.
+   refs int64
+}
+
+func (s *Segment) AddRef() {
+   s.m.Lock()
+   s.refs++
+   s.m.Unlock()
+}
+
+func (s *Segment) DecRef() (err error) {
+   s.m.Lock()
+   s.refs--
+   if s.refs == 0 {
+       err = s.closeActual()
+   }
+   s.m.Unlock()
+   return err
 }
 func (s *Segment) loadConfig() error {
@@ -272,6 +293,10 @@ func (s *Segment) Path() string {
 // Close releases all resources associated with this segment
 func (s *Segment) Close() (err error) {
+   return s.DecRef()
+}
+
+func (s *Segment) closeActual() (err error) {
    if s.mm != nil {
        err = s.mm.Unmap()
    }

View File

@@ -20,6 +20,7 @@ import (
    "encoding/binary"
    "fmt"
    "sort"
+   "sync"
    "github.com/RoaringBitmap/roaring"
    "github.com/blevesearch/bleve/document"
@@ -43,6 +44,32 @@ type IndexSnapshot struct {
    offsets []uint64
    internal map[string][]byte
    epoch uint64
+
+   m sync.Mutex // Protects the fields that follow.
+   refs int64
+}
+
+func (i *IndexSnapshot) AddRef() {
+   i.m.Lock()
+   i.refs++
+   i.m.Unlock()
+}
+
+func (i *IndexSnapshot) DecRef() (err error) {
+   i.m.Lock()
+   i.refs--
+   if i.refs == 0 {
+       for _, s := range i.segment {
+           if s != nil {
+               err2 := s.segment.DecRef()
+               if err == nil {
+                   err = err2
+               }
+           }
+       }
+   }
+   i.m.Unlock()
+   return err
 }
 func (i *IndexSnapshot) newIndexSnapshotFieldDict(field string, makeItr func(i segment.TermDictionary) segment.DictionaryIterator) (*IndexSnapshotFieldDict, error) {