0
0
Fork 0

Adding onEvent callback support for scorch

Event types:
- EventKindCloseStart
- EventKindClose
- EventKindMergerProgress
- EventKindPersisterProgress
- EventKindBatchIntroductionStart
- EventKindBatchIntroduction
This commit is contained in:
abhinavdangeti 2017-12-26 19:11:14 -07:00
parent a475ee886d
commit 055d3e12df
3 changed files with 78 additions and 3 deletions

View File

@ -19,6 +19,7 @@ import (
"log"
"os"
"sync/atomic"
"time"
"github.com/RoaringBitmap/roaring"
"github.com/blevesearch/bleve/index/scorch/mergeplan"
@ -42,6 +43,8 @@ OUTER:
s.rootLock.RUnlock()
if ourSnapshot.epoch != lastEpochMergePlanned {
startTime := time.Now()
// lets get started
err := s.planMergeAtSnapshot(ourSnapshot)
if err != nil {
@ -50,6 +53,9 @@ OUTER:
continue OUTER
}
lastEpochMergePlanned = ourSnapshot.epoch
s.fireEvent(EventKindMergerProgress, time.Since(startTime))
}
_ = ourSnapshot.DecRef()
@ -71,6 +77,8 @@ OUTER:
s.rootLock.RUnlock()
if ourSnapshot.epoch != lastEpochMergePlanned {
startTime := time.Now()
// lets get started
err := s.planMergeAtSnapshot(ourSnapshot)
if err != nil {
@ -78,6 +86,8 @@ OUTER:
continue OUTER
}
lastEpochMergePlanned = ourSnapshot.epoch
s.fireEvent(EventKindMergerProgress, time.Since(startTime))
}
_ = ourSnapshot.DecRef()

View File

@ -25,6 +25,7 @@ import (
"strconv"
"strings"
"sync/atomic"
"time"
"github.com/RoaringBitmap/roaring"
"github.com/blevesearch/bleve/index/scorch/segment"
@ -64,6 +65,8 @@ OUTER:
s.rootLock.Unlock()
if ourSnapshot != nil {
startTime := time.Now()
err := s.persistSnapshot(ourSnapshot)
for _, ch := range ourPersisted {
if err != nil {
@ -90,6 +93,9 @@ OUTER:
changed = true
}
s.rootLock.RUnlock()
s.fireEvent(EventKindPersisterProgress, time.Since(startTime))
if changed {
continue OUTER
}

View File

@ -64,9 +64,44 @@ type Scorch struct {
eligibleForRemoval []uint64 // Index snapshot epochs that are safe to GC.
ineligibleForRemoval map[string]bool // Filenames that should not be GC'ed yet.
onEvent func(event Event)
}
func NewScorch(storeName string, config map[string]interface{}, analysisQueue *index.AnalysisQueue) (index.Index, error) {
// Event represents the information provided in an OnEvent() callback.
type Event struct {
Kind EventKind
Scorch *Scorch
Duration time.Duration
}
// EventKind represents an event code for OnEvent() callbacks.
type EventKind int
// EventKindCLoseStart is fired when a Scorch.Close() has begun.
var EventKindCloseStart = EventKind(1)
// EventKindClose is fired when a scorch index has been fully closed.
var EventKindClose = EventKind(2)
// EventKindMergerProgress is fired when the merger has completed a
// round of merge processing.
var EventKindMergerProgress = EventKind(3)
// EventKindPersisterProgress is fired when the persister has completed
// a round of persistence processing.
var EventKindPersisterProgress = EventKind(4)
// EventKindBatchIntroductionStart is fired when Batch() is invoked which
// introduces a new segment.
var EventKindBatchIntroductionStart = EventKind(5)
// EventKindBatchIntroduction is fired when Batch() completes.
var EventKindBatchIntroduction = EventKind(6)
func NewScorch(storeName string,
config map[string]interface{},
analysisQueue *index.AnalysisQueue) (index.Index, error) {
rv := &Scorch{
version: Version,
config: config,
@ -88,6 +123,16 @@ func NewScorch(storeName string, config map[string]interface{}, analysisQueue *i
return rv, nil
}
func (s *Scorch) SetEventCallback(f func(Event)) {
s.onEvent = f
}
func (s *Scorch) fireEvent(kind EventKind, dur time.Duration) {
if s.onEvent != nil {
s.onEvent(Event{Kind: kind, Scorch: s, Duration: dur})
}
}
func (s *Scorch) Open() error {
var ok bool
s.path, ok = s.config["path"].(string)
@ -155,6 +200,13 @@ func (s *Scorch) Open() error {
}
func (s *Scorch) Close() (err error) {
startTime := time.Now()
defer func() {
s.fireEvent(EventKindClose, time.Since(startTime))
}()
s.fireEvent(EventKindCloseStart, 0)
// signal to async tasks we want to close
close(s.closeCh)
// wait for them to close
@ -187,7 +239,11 @@ func (s *Scorch) Delete(id string) error {
// Batch applices a batch of changes to the index atomically
func (s *Scorch) Batch(batch *index.Batch) error {
analysisStart := time.Now()
start := time.Now()
defer func() {
s.fireEvent(EventKindBatchIntroduction, time.Since(start))
}()
resultChan := make(chan *index.AnalysisResult, len(batch.IndexOps))
@ -229,7 +285,10 @@ func (s *Scorch) Batch(batch *index.Batch) error {
}
close(resultChan)
atomic.AddUint64(&s.stats.analysisTime, uint64(time.Since(analysisStart)))
atomic.AddUint64(&s.stats.analysisTime, uint64(time.Since(start)))
// notify handlers that we're about to introduce a segment
s.fireEvent(EventKindBatchIntroductionStart, 0)
var newSegment segment.Segment
if len(analysisResults) > 0 {