0
0
Fork 0

add initial support for async error callback

This commit is contained in:
Marty Schoch 2018-01-05 15:29:01 -05:00
parent 038880571c
commit e756c7acf0
4 changed files with 20 additions and 6 deletions

View File

@ -16,6 +16,10 @@ package scorch
import "time"
// RegistryAsyncErrorCallbacks should be treated as read-only after
// process init()'ialization.
var RegistryAsyncErrorCallbacks = map[string]func(error){}
// RegistryEventCallbacks should be treated as read-only after
// process init()'ialization.
var RegistryEventCallbacks = map[string]func(Event){}

View File

@ -16,7 +16,6 @@ package scorch
import (
"fmt"
"log"
"os"
"sync/atomic"
"time"
@ -48,7 +47,7 @@ OUTER:
// lets get started
err := s.planMergeAtSnapshot(ourSnapshot)
if err != nil {
log.Printf("merging err: %v", err)
s.fireAsyncError(fmt.Errorf("merging err: %v", err))
_ = ourSnapshot.DecRef()
continue OUTER
}

View File

@ -74,7 +74,7 @@ OUTER:
close(ch)
}
if err != nil {
log.Printf("got err persisting snapshot: %v", err)
s.fireAsyncError(fmt.Errorf("got err persisting snapshot: %v", err))
_ = ourSnapshot.DecRef()
continue OUTER
}
@ -446,13 +446,13 @@ func (p uint64Descending) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
func (s *Scorch) removeOldData() {
removed, err := s.removeOldBoltSnapshots()
if err != nil {
log.Printf("got err removing old bolt snapshots: %v", err)
s.fireAsyncError(fmt.Errorf("got err removing old bolt snapshots: %v", err))
}
if removed > 0 {
err = s.removeOldZapFiles()
if err != nil {
log.Printf("got err removing old zap files: %v", err)
s.fireAsyncError(fmt.Errorf("got err removing old zap files: %v", err))
}
}
}

View File

@ -72,7 +72,8 @@ type Scorch struct {
rootBolt *bolt.DB
asyncTasks sync.WaitGroup
onEvent func(event Event)
onEvent func(event Event)
onAsyncError func(err error)
}
func NewScorch(storeName string,
@ -100,6 +101,10 @@ func NewScorch(storeName string,
if ok {
rv.onEvent = RegistryEventCallbacks[ecbName]
}
aecbName, ok := config["asyncErrorCallbackName"].(string)
if ok {
rv.onAsyncError = RegistryAsyncErrorCallbacks[aecbName]
}
return rv, nil
}
@ -109,6 +114,12 @@ func (s *Scorch) fireEvent(kind EventKind, dur time.Duration) {
}
}
func (s *Scorch) fireAsyncError(err error) {
if s.onAsyncError != nil {
s.onAsyncError(err)
}
}
func (s *Scorch) Open() error {
var ok bool
s.path, ok = s.config["path"].(string)