Merge pull request #710 from abhinavdangeti/scorch2
Adding onEvent callback support for scorch
This commit is contained in:
commit
780c3e9c43
|
@ -19,6 +19,7 @@ import (
|
|||
"log"
|
||||
"os"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/RoaringBitmap/roaring"
|
||||
"github.com/blevesearch/bleve/index/scorch/mergeplan"
|
||||
|
@ -42,6 +43,8 @@ OUTER:
|
|||
s.rootLock.RUnlock()
|
||||
|
||||
if ourSnapshot.epoch != lastEpochMergePlanned {
|
||||
startTime := time.Now()
|
||||
|
||||
// lets get started
|
||||
err := s.planMergeAtSnapshot(ourSnapshot)
|
||||
if err != nil {
|
||||
|
@ -50,6 +53,9 @@ OUTER:
|
|||
continue OUTER
|
||||
}
|
||||
lastEpochMergePlanned = ourSnapshot.epoch
|
||||
|
||||
s.fireEvent(EventKindMergerProgress, time.Since(startTime))
|
||||
|
||||
}
|
||||
_ = ourSnapshot.DecRef()
|
||||
|
||||
|
@ -71,6 +77,8 @@ OUTER:
|
|||
s.rootLock.RUnlock()
|
||||
|
||||
if ourSnapshot.epoch != lastEpochMergePlanned {
|
||||
startTime := time.Now()
|
||||
|
||||
// lets get started
|
||||
err := s.planMergeAtSnapshot(ourSnapshot)
|
||||
if err != nil {
|
||||
|
@ -78,6 +86,8 @@ OUTER:
|
|||
continue OUTER
|
||||
}
|
||||
lastEpochMergePlanned = ourSnapshot.epoch
|
||||
|
||||
s.fireEvent(EventKindMergerProgress, time.Since(startTime))
|
||||
}
|
||||
_ = ourSnapshot.DecRef()
|
||||
|
||||
|
|
|
@ -25,6 +25,7 @@ import (
|
|||
"strconv"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/RoaringBitmap/roaring"
|
||||
"github.com/blevesearch/bleve/index/scorch/segment"
|
||||
|
@ -64,6 +65,8 @@ OUTER:
|
|||
s.rootLock.Unlock()
|
||||
|
||||
if ourSnapshot != nil {
|
||||
startTime := time.Now()
|
||||
|
||||
err := s.persistSnapshot(ourSnapshot)
|
||||
for _, ch := range ourPersisted {
|
||||
if err != nil {
|
||||
|
@ -90,6 +93,9 @@ OUTER:
|
|||
changed = true
|
||||
}
|
||||
s.rootLock.RUnlock()
|
||||
|
||||
s.fireEvent(EventKindPersisterProgress, time.Since(startTime))
|
||||
|
||||
if changed {
|
||||
continue OUTER
|
||||
}
|
||||
|
|
|
@ -64,9 +64,44 @@ type Scorch struct {
|
|||
|
||||
eligibleForRemoval []uint64 // Index snapshot epochs that are safe to GC.
|
||||
ineligibleForRemoval map[string]bool // Filenames that should not be GC'ed yet.
|
||||
|
||||
onEvent func(event Event)
|
||||
}
|
||||
|
||||
func NewScorch(storeName string, config map[string]interface{}, analysisQueue *index.AnalysisQueue) (index.Index, error) {
|
||||
// Event represents the information provided in an OnEvent() callback.
|
||||
type Event struct {
|
||||
Kind EventKind
|
||||
Scorch *Scorch
|
||||
Duration time.Duration
|
||||
}
|
||||
|
||||
// EventKind represents an event code for OnEvent() callbacks.
|
||||
type EventKind int
|
||||
|
||||
// EventKindCLoseStart is fired when a Scorch.Close() has begun.
|
||||
var EventKindCloseStart = EventKind(1)
|
||||
|
||||
// EventKindClose is fired when a scorch index has been fully closed.
|
||||
var EventKindClose = EventKind(2)
|
||||
|
||||
// EventKindMergerProgress is fired when the merger has completed a
|
||||
// round of merge processing.
|
||||
var EventKindMergerProgress = EventKind(3)
|
||||
|
||||
// EventKindPersisterProgress is fired when the persister has completed
|
||||
// a round of persistence processing.
|
||||
var EventKindPersisterProgress = EventKind(4)
|
||||
|
||||
// EventKindBatchIntroductionStart is fired when Batch() is invoked which
|
||||
// introduces a new segment.
|
||||
var EventKindBatchIntroductionStart = EventKind(5)
|
||||
|
||||
// EventKindBatchIntroduction is fired when Batch() completes.
|
||||
var EventKindBatchIntroduction = EventKind(6)
|
||||
|
||||
func NewScorch(storeName string,
|
||||
config map[string]interface{},
|
||||
analysisQueue *index.AnalysisQueue) (index.Index, error) {
|
||||
rv := &Scorch{
|
||||
version: Version,
|
||||
config: config,
|
||||
|
@ -88,6 +123,16 @@ func NewScorch(storeName string, config map[string]interface{}, analysisQueue *i
|
|||
return rv, nil
|
||||
}
|
||||
|
||||
func (s *Scorch) SetEventCallback(f func(Event)) {
|
||||
s.onEvent = f
|
||||
}
|
||||
|
||||
func (s *Scorch) fireEvent(kind EventKind, dur time.Duration) {
|
||||
if s.onEvent != nil {
|
||||
s.onEvent(Event{Kind: kind, Scorch: s, Duration: dur})
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Scorch) Open() error {
|
||||
var ok bool
|
||||
s.path, ok = s.config["path"].(string)
|
||||
|
@ -155,6 +200,13 @@ func (s *Scorch) Open() error {
|
|||
}
|
||||
|
||||
func (s *Scorch) Close() (err error) {
|
||||
startTime := time.Now()
|
||||
defer func() {
|
||||
s.fireEvent(EventKindClose, time.Since(startTime))
|
||||
}()
|
||||
|
||||
s.fireEvent(EventKindCloseStart, 0)
|
||||
|
||||
// signal to async tasks we want to close
|
||||
close(s.closeCh)
|
||||
// wait for them to close
|
||||
|
@ -187,7 +239,11 @@ func (s *Scorch) Delete(id string) error {
|
|||
|
||||
// Batch applices a batch of changes to the index atomically
|
||||
func (s *Scorch) Batch(batch *index.Batch) error {
|
||||
analysisStart := time.Now()
|
||||
start := time.Now()
|
||||
|
||||
defer func() {
|
||||
s.fireEvent(EventKindBatchIntroduction, time.Since(start))
|
||||
}()
|
||||
|
||||
resultChan := make(chan *index.AnalysisResult, len(batch.IndexOps))
|
||||
|
||||
|
@ -229,7 +285,10 @@ func (s *Scorch) Batch(batch *index.Batch) error {
|
|||
}
|
||||
close(resultChan)
|
||||
|
||||
atomic.AddUint64(&s.stats.analysisTime, uint64(time.Since(analysisStart)))
|
||||
atomic.AddUint64(&s.stats.analysisTime, uint64(time.Since(start)))
|
||||
|
||||
// notify handlers that we're about to introduce a segment
|
||||
s.fireEvent(EventKindBatchIntroductionStart, 0)
|
||||
|
||||
var newSegment segment.Segment
|
||||
if len(analysisResults) > 0 {
|
||||
|
|
Loading…
Reference in New Issue