0
0
bleve/index/firestorm/lookup.go
Steve Yen 7ae696d661 firestorm lookuper notified via batch
Previously, the firestorm.Batch() would notify the lookuper goroutine
on a document by document basis.  If the lookuper input channel became
full, then that would block the firestorm.Batch() operation.

With this change, lookuper is notified once, with a "batch" that is an
[]*InFlightItem.

This change also reuses that same []*InFlightItem to invoke the
compensator.MutateBatch().

This also has the advantage of only converting the docID's from string
to []byte just once, outside of the lock that's used by the
compensator.

Micro-benchmark of this change with null-firestorm bleve-blast does
not show large impact, neither degradation or improvement.
2016-01-02 12:21:24 -08:00

146 lines
3.6 KiB
Go

// Copyright (c) 2015 Couchbase, Inc.
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
// except in compliance with the License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed under the
// License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
// either express or implied. See the License for the specific language governing permissions
// and limitations under the License.
package firestorm
import (
"fmt"
"sync"
"sync/atomic"
"time"
)
const channelBufferSize = 1000
type Lookuper struct {
f *Firestorm
workChan chan []*InFlightItem
quit chan struct{}
closeWait sync.WaitGroup
tasksQueued uint64
tasksDone uint64
}
func NewLookuper(f *Firestorm) *Lookuper {
rv := Lookuper{
f: f,
workChan: make(chan []*InFlightItem, channelBufferSize),
quit: make(chan struct{}),
}
return &rv
}
func (l *Lookuper) NotifyBatch(items []*InFlightItem) {
atomic.AddUint64(&l.tasksQueued, 1)
l.workChan <- items
}
func (l *Lookuper) Start() {
l.closeWait.Add(1)
go l.run()
}
func (l *Lookuper) Stop() {
close(l.quit)
l.closeWait.Wait()
}
func (l *Lookuper) run() {
for {
select {
case <-l.quit:
logger.Printf("lookuper asked to quit")
l.closeWait.Done()
return
case items, ok := <-l.workChan:
if !ok {
logger.Printf("lookuper work channel closed unexpectedly, stopping")
return
}
l.lookupItems(items)
}
}
}
func (l *Lookuper) lookupItems(items []*InFlightItem) {
for _, item := range items {
l.lookup(item)
}
atomic.AddUint64(&l.tasksDone, 1)
}
func (l *Lookuper) lookup(item *InFlightItem) {
reader, err := l.f.store.Reader()
if err != nil {
logger.Printf("lookuper fatal: %v", err)
return
}
defer func() {
if cerr := reader.Close(); err == nil && cerr != nil {
err = cerr
}
}()
prefix := TermFreqPrefixFieldTermDocId(0, nil, item.docID)
logger.Printf("lookuper prefix - % x", prefix)
docNums := make(DocNumberList, 0)
err = visitPrefix(reader, prefix, func(key, val []byte) (bool, error) {
logger.Printf("lookuper sees key % x", key)
tfk, err := NewTermFreqRowKV(key, val)
if err != nil {
return false, err
}
docNum := tfk.DocNum()
docNums = append(docNums, docNum)
return true, nil
})
if err != nil {
logger.Printf("lookuper fatal: %v", err)
return
}
oldDocNums := make(DocNumberList, 0, len(docNums))
for _, docNum := range docNums {
if item.docNum == 0 || docNum < item.docNum {
oldDocNums = append(oldDocNums, docNum)
}
}
logger.Printf("lookup migrating '%s' - %d - oldDocNums: %v", item.docID, item.docNum, oldDocNums)
l.f.compensator.Migrate(item.docID, item.docNum, oldDocNums)
if len(oldDocNums) == 0 && item.docNum != 0 {
// this was an add, not an update
atomic.AddUint64(l.f.docCount, 1)
} else if len(oldDocNums) > 0 && item.docNum == 0 {
// this was a delete (and it previously existed)
atomic.AddUint64(l.f.docCount, ^uint64(0))
}
}
// this is not intended to be used publicly, only for unit tests
// which depend on consistency we no longer provide
func (l *Lookuper) waitTasksDone(d time.Duration) error {
timeout := time.After(d)
tick := time.Tick(100 * time.Millisecond)
for {
select {
// Got a timeout! fail with a timeout error
case <-timeout:
return fmt.Errorf("timeout")
// Got a tick, we should check on doSomething()
case <-tick:
queued := atomic.LoadUint64(&l.tasksQueued)
done := atomic.LoadUint64(&l.tasksDone)
if queued == done {
return nil
}
}
}
}