fix race condition in setting up event callbacks
previous approach used SetEventCallback method which allowed you to change the callback, unfotunately that also included times after the goroutines were started and potentially firing the callback. checking lock on this would be too expensive, so instead we go for an approach that allows callbacks to be registered by name during process init(), then upon opening up an index a string config key 'eventCallbackName' is used to look up the appropriate callback function. also, since this string config name is serializable, it fits into the existing bleve index metadata without any new issues.
This commit is contained in:
parent
57a075afdb
commit
6237479605
|
@ -0,0 +1,52 @@
|
|||
// Copyright (c) 2018 Couchbase, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package scorch
|
||||
|
||||
import "time"
|
||||
|
||||
// RegistryEventCallbacks should be treated as read-only after
|
||||
// process init()'ialization.
|
||||
var RegistryEventCallbacks = map[string]func(Event){}
|
||||
|
||||
// Event represents the information provided in an OnEvent() callback.
|
||||
type Event struct {
|
||||
Kind EventKind
|
||||
Scorch *Scorch
|
||||
Duration time.Duration
|
||||
}
|
||||
|
||||
// EventKind represents an event code for OnEvent() callbacks.
|
||||
type EventKind int
|
||||
|
||||
// EventKindCloseStart is fired when a Scorch.Close() has begun.
|
||||
var EventKindCloseStart = EventKind(1)
|
||||
|
||||
// EventKindClose is fired when a scorch index has been fully closed.
|
||||
var EventKindClose = EventKind(2)
|
||||
|
||||
// EventKindMergerProgress is fired when the merger has completed a
|
||||
// round of merge processing.
|
||||
var EventKindMergerProgress = EventKind(3)
|
||||
|
||||
// EventKindPersisterProgress is fired when the persister has completed
|
||||
// a round of persistence processing.
|
||||
var EventKindPersisterProgress = EventKind(4)
|
||||
|
||||
// EventKindBatchIntroductionStart is fired when Batch() is invoked which
|
||||
// introduces a new segment.
|
||||
var EventKindBatchIntroductionStart = EventKind(5)
|
||||
|
||||
// EventKindBatchIntroduction is fired when Batch() completes.
|
||||
var EventKindBatchIntroduction = EventKind(6)
|
|
@ -0,0 +1,73 @@
|
|||
// Copyright (c) 2018 Couchbase, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package scorch
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/blevesearch/bleve/document"
|
||||
"github.com/blevesearch/bleve/index"
|
||||
)
|
||||
|
||||
func TestEventBatchIntroductionStart(t *testing.T) {
|
||||
defer func() {
|
||||
err := DestroyTest()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}()
|
||||
|
||||
var count int
|
||||
RegistryEventCallbacks["test"] = func(e Event) {
|
||||
if e.Kind == EventKindBatchIntroductionStart {
|
||||
count++
|
||||
}
|
||||
}
|
||||
|
||||
ourConfig := make(map[string]interface{}, len(testConfig))
|
||||
for k, v := range testConfig {
|
||||
ourConfig[k] = v
|
||||
}
|
||||
ourConfig["eventCallbackName"] = "test"
|
||||
|
||||
analysisQueue := index.NewAnalysisQueue(1)
|
||||
idx, err := NewScorch(Name, ourConfig, analysisQueue)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
err = idx.Open()
|
||||
if err != nil {
|
||||
t.Fatalf("error opening index: %v", err)
|
||||
}
|
||||
|
||||
doc := document.NewDocument("1")
|
||||
doc.AddField(document.NewTextField("name", []uint64{}, []byte("test")))
|
||||
err = idx.Update(doc)
|
||||
if err != nil {
|
||||
t.Errorf("Error updating index: %v", err)
|
||||
}
|
||||
|
||||
defer func() {
|
||||
err := idx.Close()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}()
|
||||
|
||||
if count != 1 {
|
||||
t.Fatalf("expected to see 1 batch introduction event event, saw %d", count)
|
||||
}
|
||||
}
|
|
@ -1,4 +1,4 @@
|
|||
// Copyright (c) 2017 Couchbase, Inc.
|
||||
// Copyright (c) 2018 Couchbase, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
|
@ -75,37 +75,6 @@ type Scorch struct {
|
|||
onEvent func(event Event)
|
||||
}
|
||||
|
||||
// Event represents the information provided in an OnEvent() callback.
|
||||
type Event struct {
|
||||
Kind EventKind
|
||||
Scorch *Scorch
|
||||
Duration time.Duration
|
||||
}
|
||||
|
||||
// EventKind represents an event code for OnEvent() callbacks.
|
||||
type EventKind int
|
||||
|
||||
// EventKindCLoseStart is fired when a Scorch.Close() has begun.
|
||||
var EventKindCloseStart = EventKind(1)
|
||||
|
||||
// EventKindClose is fired when a scorch index has been fully closed.
|
||||
var EventKindClose = EventKind(2)
|
||||
|
||||
// EventKindMergerProgress is fired when the merger has completed a
|
||||
// round of merge processing.
|
||||
var EventKindMergerProgress = EventKind(3)
|
||||
|
||||
// EventKindPersisterProgress is fired when the persister has completed
|
||||
// a round of persistence processing.
|
||||
var EventKindPersisterProgress = EventKind(4)
|
||||
|
||||
// EventKindBatchIntroductionStart is fired when Batch() is invoked which
|
||||
// introduces a new segment.
|
||||
var EventKindBatchIntroductionStart = EventKind(5)
|
||||
|
||||
// EventKindBatchIntroduction is fired when Batch() completes.
|
||||
var EventKindBatchIntroduction = EventKind(6)
|
||||
|
||||
func NewScorch(storeName string,
|
||||
config map[string]interface{},
|
||||
analysisQueue *index.AnalysisQueue) (index.Index, error) {
|
||||
|
@ -127,13 +96,13 @@ func NewScorch(storeName string,
|
|||
if ok {
|
||||
rv.unsafeBatch = ub
|
||||
}
|
||||
ecbName, ok := config["eventCallbackName"].(string)
|
||||
if ok {
|
||||
rv.onEvent = RegistryEventCallbacks[ecbName]
|
||||
}
|
||||
return rv, nil
|
||||
}
|
||||
|
||||
func (s *Scorch) SetEventCallback(f func(Event)) {
|
||||
s.onEvent = f
|
||||
}
|
||||
|
||||
func (s *Scorch) fireEvent(kind EventKind, dur time.Duration) {
|
||||
if s.onEvent != nil {
|
||||
s.onEvent(Event{Kind: kind, Scorch: s, Duration: dur})
|
||||
|
|
Loading…
Reference in New Issue