0
0

firestorm DictUpdater NotifyBatch is more async

This commit is contained in:
Steve Yen 2015-12-31 11:07:59 -08:00
parent b241242465
commit 1c5b84911d
2 changed files with 32 additions and 5 deletions

View File

@ -25,6 +25,7 @@ type DictUpdater struct {
f *Firestorm f *Firestorm
dictUpdateSleep time.Duration dictUpdateSleep time.Duration
quit chan struct{} quit chan struct{}
incoming chan map[string]int64
mutex sync.RWMutex mutex sync.RWMutex
workingSet map[string]int64 workingSet map[string]int64
@ -41,6 +42,7 @@ func NewDictUpdater(f *Firestorm) *DictUpdater {
workingSet: make(map[string]int64), workingSet: make(map[string]int64),
batchesStarted: 1, batchesStarted: 1,
quit: make(chan struct{}), quit: make(chan struct{}),
incoming: make(chan map[string]int64, 8),
} }
return &rv return &rv
} }
@ -52,15 +54,12 @@ func (d *DictUpdater) Notify(term string, usage int64) {
} }
func (d *DictUpdater) NotifyBatch(termUsages map[string]int64) { func (d *DictUpdater) NotifyBatch(termUsages map[string]int64) {
d.mutex.Lock() d.incoming <- termUsages
defer d.mutex.Unlock()
for term, usage := range termUsages {
d.workingSet[term] += usage
}
} }
func (d *DictUpdater) Start() { func (d *DictUpdater) Start() {
d.closeWait.Add(1) d.closeWait.Add(1)
go d.runIncoming()
go d.run() go d.run()
} }
@ -69,6 +68,24 @@ func (d *DictUpdater) Stop() {
d.closeWait.Wait() d.closeWait.Wait()
} }
func (d *DictUpdater) runIncoming() {
for {
select {
case <-d.quit:
return
case termUsages, ok := <-d.incoming:
if !ok {
return
}
d.mutex.Lock()
for term, usage := range termUsages {
d.workingSet[term] += usage
}
d.mutex.Unlock()
}
}
}
func (d *DictUpdater) run() { func (d *DictUpdater) run() {
tick := time.Tick(d.dictUpdateSleep) tick := time.Tick(d.dictUpdateSleep)
for { for {

View File

@ -10,6 +10,7 @@
package firestorm package firestorm
import ( import (
"runtime"
"testing" "testing"
"github.com/blevesearch/bleve/index" "github.com/blevesearch/bleve/index"
@ -38,6 +39,9 @@ func TestDictUpdater(t *testing.T) {
f.(*Firestorm).dictUpdater.NotifyBatch(dictBatch) f.(*Firestorm).dictUpdater.NotifyBatch(dictBatch)
// invoke updater manually // invoke updater manually
for len(f.(*Firestorm).dictUpdater.incoming) > 0 {
runtime.Gosched()
}
f.(*Firestorm).dictUpdater.update() f.(*Firestorm).dictUpdater.update()
// assert that dictionary rows are correct // assert that dictionary rows are correct
@ -77,6 +81,9 @@ func TestDictUpdater(t *testing.T) {
f.(*Firestorm).dictUpdater.NotifyBatch(dictBatch) f.(*Firestorm).dictUpdater.NotifyBatch(dictBatch)
// invoke updater manually // invoke updater manually
for len(f.(*Firestorm).dictUpdater.incoming) > 0 {
runtime.Gosched()
}
f.(*Firestorm).dictUpdater.update() f.(*Firestorm).dictUpdater.update()
// assert that dictionary rows are correct // assert that dictionary rows are correct
@ -116,6 +123,9 @@ func TestDictUpdater(t *testing.T) {
f.(*Firestorm).dictUpdater.NotifyBatch(dictBatch) f.(*Firestorm).dictUpdater.NotifyBatch(dictBatch)
// invoke updater manually // invoke updater manually
for len(f.(*Firestorm).dictUpdater.incoming) > 0 {
runtime.Gosched()
}
f.(*Firestorm).dictUpdater.update() f.(*Firestorm).dictUpdater.update()
// assert that dictionary rows are correct // assert that dictionary rows are correct