2015-08-25 20:52:42 +02:00
|
|
|
// Copyright (c) 2015 Couchbase, Inc.
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
// except in compliance with the License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed under the
// License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
// either express or implied. See the License for the specific language governing permissions
// and limitations under the License.
|
|
|
|
|
|
|
|
package firestorm
|
|
|
|
|
|
|
|
import (
|
|
|
|
"encoding/binary"
|
|
|
|
"fmt"
|
|
|
|
"sync"
|
|
|
|
"sync/atomic"
|
|
|
|
"time"
|
|
|
|
)
|
|
|
|
|
|
|
|
// DefaultDictUpdateThreshold is a default threshold for dictionary updates.
// NOTE(review): nothing in the DictUpdater code visible here reads this
// value; confirm how (or whether) callers use it.
const (
	DefaultDictUpdateThreshold = 10
)

// DefaultDictUpdateSleep is the interval NewDictUpdater gives a new
// DictUpdater; the updater's background loop flushes once per interval.
var DefaultDictUpdateSleep = time.Second
|
|
|
|
|
|
|
|
type DictUpdater struct {
|
2016-03-20 16:02:13 +01:00
|
|
|
batchesStarted uint64
|
|
|
|
batchesFlushed uint64
|
|
|
|
|
2015-08-25 20:52:42 +02:00
|
|
|
f *Firestorm
|
|
|
|
dictUpdateSleep time.Duration
|
|
|
|
quit chan struct{}
|
2015-12-31 20:07:59 +01:00
|
|
|
incoming chan map[string]int64
|
2015-08-25 20:52:42 +02:00
|
|
|
|
|
|
|
mutex sync.RWMutex
|
|
|
|
workingSet map[string]int64
|
|
|
|
closeWait sync.WaitGroup
|
|
|
|
}
|
|
|
|
|
|
|
|
func NewDictUpdater(f *Firestorm) *DictUpdater {
|
|
|
|
rv := DictUpdater{
|
|
|
|
f: f,
|
|
|
|
dictUpdateSleep: DefaultDictUpdateSleep,
|
|
|
|
workingSet: make(map[string]int64),
|
|
|
|
batchesStarted: 1,
|
|
|
|
quit: make(chan struct{}),
|
2015-12-31 20:07:59 +01:00
|
|
|
incoming: make(chan map[string]int64, 8),
|
2015-08-25 20:52:42 +02:00
|
|
|
}
|
|
|
|
return &rv
|
|
|
|
}
|
|
|
|
|
|
|
|
func (d *DictUpdater) Notify(term string, usage int64) {
|
|
|
|
d.mutex.Lock()
|
|
|
|
defer d.mutex.Unlock()
|
|
|
|
d.workingSet[term] += usage
|
|
|
|
}
|
|
|
|
|
|
|
|
func (d *DictUpdater) NotifyBatch(termUsages map[string]int64) {
|
2015-12-31 20:07:59 +01:00
|
|
|
d.incoming <- termUsages
|
2015-08-25 20:52:42 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
func (d *DictUpdater) Start() {
|
|
|
|
d.closeWait.Add(1)
|
2016-01-05 19:58:38 +01:00
|
|
|
go d.runIncoming()
|
2015-08-25 20:52:42 +02:00
|
|
|
go d.run()
|
|
|
|
}
|
|
|
|
|
|
|
|
func (d *DictUpdater) Stop() {
|
|
|
|
close(d.quit)
|
|
|
|
d.closeWait.Wait()
|
|
|
|
}
|
|
|
|
|
2015-12-31 20:07:59 +01:00
|
|
|
func (d *DictUpdater) runIncoming() {
|
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case <-d.quit:
|
|
|
|
return
|
|
|
|
case termUsages, ok := <-d.incoming:
|
|
|
|
if !ok {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
d.mutex.Lock()
|
|
|
|
for term, usage := range termUsages {
|
|
|
|
d.workingSet[term] += usage
|
|
|
|
}
|
|
|
|
d.mutex.Unlock()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2015-08-25 20:52:42 +02:00
|
|
|
func (d *DictUpdater) run() {
|
|
|
|
tick := time.Tick(d.dictUpdateSleep)
|
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case <-d.quit:
|
|
|
|
logger.Printf("dictionary updater asked to quit")
|
|
|
|
d.closeWait.Done()
|
|
|
|
return
|
|
|
|
case <-tick:
|
|
|
|
logger.Printf("dictionary updater ticked")
|
|
|
|
d.update()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (d *DictUpdater) update() {
|
|
|
|
d.mutex.Lock()
|
|
|
|
oldWorkingSet := d.workingSet
|
|
|
|
d.workingSet = make(map[string]int64)
|
|
|
|
atomic.AddUint64(&d.batchesStarted, 1)
|
|
|
|
d.mutex.Unlock()
|
|
|
|
|
|
|
|
// open a writer
|
|
|
|
writer, err := d.f.store.Writer()
|
|
|
|
if err != nil {
|
2015-11-24 20:32:33 +01:00
|
|
|
_ = writer.Close()
|
2015-08-25 21:13:13 +02:00
|
|
|
logger.Printf("dict updater fatal: %v", err)
|
2015-08-25 20:52:42 +02:00
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
// prepare batch
|
|
|
|
wb := writer.NewBatch()
|
|
|
|
|
|
|
|
dictionaryTermDelta := make([]byte, 8)
|
|
|
|
for term, delta := range oldWorkingSet {
|
|
|
|
binary.LittleEndian.PutUint64(dictionaryTermDelta, uint64(delta))
|
|
|
|
wb.Merge([]byte(term), dictionaryTermDelta)
|
|
|
|
}
|
|
|
|
|
2015-10-28 16:26:01 +01:00
|
|
|
err = writer.ExecuteBatch(wb)
|
2015-08-25 20:52:42 +02:00
|
|
|
if err != nil {
|
2015-11-24 20:32:33 +01:00
|
|
|
_ = writer.Close()
|
2015-08-25 21:13:13 +02:00
|
|
|
logger.Printf("dict updater fatal: %v", err)
|
2015-08-25 20:52:42 +02:00
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
atomic.AddUint64(&d.batchesFlushed, 1)
|
|
|
|
|
|
|
|
err = writer.Close()
|
|
|
|
}
|
|
|
|
|
|
|
|
// this is not intended to be used publicly, only for unit tests
|
|
|
|
// which depend on consistency we no longer provide
|
|
|
|
func (d *DictUpdater) waitTasksDone(dur time.Duration) error {
|
2015-11-24 20:32:33 +01:00
|
|
|
initial := atomic.LoadUint64(&d.batchesStarted)
|
2015-08-25 20:52:42 +02:00
|
|
|
timeout := time.After(dur)
|
|
|
|
tick := time.Tick(100 * time.Millisecond)
|
|
|
|
for {
|
|
|
|
select {
|
|
|
|
// Got a timeout! fail with a timeout error
|
|
|
|
case <-timeout:
|
2015-11-24 20:32:33 +01:00
|
|
|
flushed := atomic.LoadUint64(&d.batchesFlushed)
|
|
|
|
return fmt.Errorf("timeout, %d/%d", initial, flushed)
|
2015-08-25 20:52:42 +02:00
|
|
|
// Got a tick, we should check on doSomething()
|
|
|
|
case <-tick:
|
|
|
|
flushed := atomic.LoadUint64(&d.batchesFlushed)
|
2015-11-24 20:32:33 +01:00
|
|
|
if flushed > initial {
|
2015-08-25 20:52:42 +02:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|