parent
3dc64de478
commit
724684a4f1
|
@ -22,6 +22,9 @@ const DefaultDictUpdateThreshold = 10
|
||||||
var DefaultDictUpdateSleep = 1 * time.Second
|
var DefaultDictUpdateSleep = 1 * time.Second
|
||||||
|
|
||||||
type DictUpdater struct {
|
type DictUpdater struct {
|
||||||
|
batchesStarted uint64
|
||||||
|
batchesFlushed uint64
|
||||||
|
|
||||||
f *Firestorm
|
f *Firestorm
|
||||||
dictUpdateSleep time.Duration
|
dictUpdateSleep time.Duration
|
||||||
quit chan struct{}
|
quit chan struct{}
|
||||||
|
@ -30,9 +33,6 @@ type DictUpdater struct {
|
||||||
mutex sync.RWMutex
|
mutex sync.RWMutex
|
||||||
workingSet map[string]int64
|
workingSet map[string]int64
|
||||||
closeWait sync.WaitGroup
|
closeWait sync.WaitGroup
|
||||||
|
|
||||||
batchesStarted uint64
|
|
||||||
batchesFlushed uint64
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewDictUpdater(f *Firestorm) *DictUpdater {
|
func NewDictUpdater(f *Firestorm) *DictUpdater {
|
||||||
|
|
|
@ -27,14 +27,15 @@ const Name = "firestorm"
|
||||||
var UnsafeBatchUseDetected = fmt.Errorf("bleve.Batch is NOT thread-safe, modification after execution detected")
|
var UnsafeBatchUseDetected = fmt.Errorf("bleve.Batch is NOT thread-safe, modification after execution detected")
|
||||||
|
|
||||||
type Firestorm struct {
|
type Firestorm struct {
|
||||||
|
highDocNumber uint64
|
||||||
|
docCount uint64
|
||||||
|
|
||||||
storeName string
|
storeName string
|
||||||
storeConfig map[string]interface{}
|
storeConfig map[string]interface{}
|
||||||
store store.KVStore
|
store store.KVStore
|
||||||
compensator *Compensator
|
compensator *Compensator
|
||||||
analysisQueue *index.AnalysisQueue
|
analysisQueue *index.AnalysisQueue
|
||||||
fieldCache *index.FieldCache
|
fieldCache *index.FieldCache
|
||||||
highDocNumber uint64
|
|
||||||
docCount *uint64
|
|
||||||
garbageCollector *GarbageCollector
|
garbageCollector *GarbageCollector
|
||||||
lookuper *Lookuper
|
lookuper *Lookuper
|
||||||
dictUpdater *DictUpdater
|
dictUpdater *DictUpdater
|
||||||
|
@ -42,14 +43,13 @@ type Firestorm struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewFirestorm(storeName string, storeConfig map[string]interface{}, analysisQueue *index.AnalysisQueue) (index.Index, error) {
|
func NewFirestorm(storeName string, storeConfig map[string]interface{}, analysisQueue *index.AnalysisQueue) (index.Index, error) {
|
||||||
initialCount := uint64(0)
|
|
||||||
rv := Firestorm{
|
rv := Firestorm{
|
||||||
storeName: storeName,
|
storeName: storeName,
|
||||||
storeConfig: storeConfig,
|
storeConfig: storeConfig,
|
||||||
compensator: NewCompensator(),
|
compensator: NewCompensator(),
|
||||||
analysisQueue: analysisQueue,
|
analysisQueue: analysisQueue,
|
||||||
fieldCache: index.NewFieldCache(),
|
fieldCache: index.NewFieldCache(),
|
||||||
docCount: &initialCount,
|
docCount: 0,
|
||||||
highDocNumber: 0,
|
highDocNumber: 0,
|
||||||
stats: &indexStat{},
|
stats: &indexStat{},
|
||||||
}
|
}
|
||||||
|
@ -130,7 +130,7 @@ func (f *Firestorm) Close() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *Firestorm) DocCount() (uint64, error) {
|
func (f *Firestorm) DocCount() (uint64, error) {
|
||||||
count := atomic.LoadUint64(f.docCount)
|
count := atomic.LoadUint64(&f.docCount)
|
||||||
return count, nil
|
return count, nil
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,13 +19,13 @@ import (
|
||||||
const channelBufferSize = 1000
|
const channelBufferSize = 1000
|
||||||
|
|
||||||
type Lookuper struct {
|
type Lookuper struct {
|
||||||
|
tasksQueued uint64
|
||||||
|
tasksDone uint64
|
||||||
|
|
||||||
f *Firestorm
|
f *Firestorm
|
||||||
workChan chan []*InFlightItem
|
workChan chan []*InFlightItem
|
||||||
quit chan struct{}
|
quit chan struct{}
|
||||||
closeWait sync.WaitGroup
|
closeWait sync.WaitGroup
|
||||||
|
|
||||||
tasksQueued uint64
|
|
||||||
tasksDone uint64
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewLookuper(f *Firestorm) *Lookuper {
|
func NewLookuper(f *Firestorm) *Lookuper {
|
||||||
|
@ -117,10 +117,10 @@ func (l *Lookuper) lookup(item *InFlightItem) {
|
||||||
l.f.compensator.Migrate(item.docID, item.docNum, oldDocNums)
|
l.f.compensator.Migrate(item.docID, item.docNum, oldDocNums)
|
||||||
if len(oldDocNums) == 0 && item.docNum != 0 {
|
if len(oldDocNums) == 0 && item.docNum != 0 {
|
||||||
// this was an add, not an update
|
// this was an add, not an update
|
||||||
atomic.AddUint64(l.f.docCount, 1)
|
atomic.AddUint64(&l.f.docCount, 1)
|
||||||
} else if len(oldDocNums) > 0 && item.docNum == 0 {
|
} else if len(oldDocNums) > 0 && item.docNum == 0 {
|
||||||
// this was a delete (and it previously existed)
|
// this was a delete (and it previously existed)
|
||||||
atomic.AddUint64(l.f.docCount, ^uint64(0))
|
atomic.AddUint64(&l.f.docCount, ^uint64(0))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -90,7 +90,7 @@ func (f *Firestorm) warmup(reader store.KVReader) error {
|
||||||
lastDocNumbers = append(lastDocNumbers, docNum)
|
lastDocNumbers = append(lastDocNumbers, docNum)
|
||||||
} else {
|
} else {
|
||||||
// new doc id
|
// new doc id
|
||||||
atomic.AddUint64(f.docCount, 1)
|
atomic.AddUint64(&f.docCount, 1)
|
||||||
|
|
||||||
// last docID had multiple doc numbers
|
// last docID had multiple doc numbers
|
||||||
if len(lastDocNumbers) > 1 {
|
if len(lastDocNumbers) > 1 {
|
||||||
|
|
Loading…
Reference in New Issue
Block a user