
Merge pull request #305 from steveyen/WIP-perf-20160102

perf 20160102
This commit is contained in:
Marty Schoch 2016-01-05 08:54:47 -05:00
commit db7363fba1
12 changed files with 318 additions and 198 deletions

View File

@ -55,28 +55,28 @@ func (tfs TokenFrequencies) MergeAll(remoteField string, other TokenFrequencies)
func TokenFrequency(tokens TokenStream, arrayPositions []uint64) TokenFrequencies {
rv := make(map[string]*TokenFreq, len(tokens))
tls := make([]TokenLocation, len(tokens))
tlNext := 0
for _, token := range tokens {
tls[tlNext] = TokenLocation{
ArrayPositions: arrayPositions,
Start: token.Start,
End: token.End,
Position: token.Position,
}
curr, ok := rv[string(token.Term)]
if ok {
curr.Locations = append(curr.Locations, &TokenLocation{
ArrayPositions: arrayPositions,
Start: token.Start,
End: token.End,
Position: token.Position,
})
curr.Locations = append(curr.Locations, &tls[tlNext])
} else {
rv[string(token.Term)] = &TokenFreq{
Term: token.Term,
Locations: []*TokenLocation{
&TokenLocation{
ArrayPositions: arrayPositions,
Start: token.Start,
End: token.End,
Position: token.Position,
},
},
Term: token.Term,
Locations: []*TokenLocation{&tls[tlNext]},
}
}
tlNext++
}
return rv
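
The change above trades one heap-allocated TokenLocation per token for a single backing slice sized up front, with the map holding pointers into that slice. A minimal sketch of the same pattern, using hypothetical Location/Freq types rather than bleve's TokenLocation/TokenFreq:

package main

import "fmt"

type Location struct{ Start, End, Position int }

type Freq struct {
	Term      string
	Locations []*Location
}

// groupByTerm allocates one backing array of Locations for the whole
// token stream, then hands out pointers into it, instead of allocating
// a new Location on the heap for every token.
func groupByTerm(terms []string) map[string]*Freq {
	locs := make([]Location, len(terms)) // single allocation
	rv := make(map[string]*Freq, len(terms))
	for i, term := range terms {
		locs[i] = Location{Start: i, End: i + 1, Position: i + 1}
		if curr, ok := rv[term]; ok {
			curr.Locations = append(curr.Locations, &locs[i])
		} else {
			rv[term] = &Freq{Term: term, Locations: []*Location{&locs[i]}}
		}
	}
	return rv
}

func main() {
	fmt.Println(len(groupByTerm([]string{"a", "b", "a"})["a"].Locations))
}

The pointers stay valid because the backing slice is sized once and never grows.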

View File

@ -26,29 +26,82 @@ func NewUnicodeTokenizer() *UnicodeTokenizer {
}
func (rt *UnicodeTokenizer) Tokenize(input []byte) analysis.TokenStream {
rvx := make([]analysis.TokenStream, 0, 10) // When rv gets full, append to rvx.
rv := make(analysis.TokenStream, 0, 1)
rv := make(analysis.TokenStream, 0)
ta := []analysis.Token(nil)
taNext := 0
segmenter := segment.NewWordSegmenterDirect(input)
start := 0
pos := 1
guessRemaining := func(end int) int {
avgSegmentLen := end / (len(rv) + 1)
if avgSegmentLen < 1 {
avgSegmentLen = 1
}
remainingLen := len(input) - end
return remainingLen / avgSegmentLen
}
for segmenter.Segment() {
segmentBytes := segmenter.Bytes()
end := start + len(segmentBytes)
if segmenter.Type() != segment.None {
token := analysis.Token{
Term: segmentBytes,
Start: start,
End: end,
Position: pos,
Type: convertType(segmenter.Type()),
if taNext >= len(ta) {
remainingSegments := guessRemaining(end)
if remainingSegments > 1000 {
remainingSegments = 1000
}
if remainingSegments < 1 {
remainingSegments = 1
}
ta = make([]analysis.Token, remainingSegments)
taNext = 0
}
rv = append(rv, &token)
token := &ta[taNext]
taNext++
token.Term = segmentBytes
token.Start = start
token.End = end
token.Position = pos
token.Type = convertType(segmenter.Type())
if len(rv) >= cap(rv) { // When rv is full, save it into rvx.
rvx = append(rvx, rv)
rvCap := cap(rv) * 2
if rvCap > 256 {
rvCap = 256
}
rv = make(analysis.TokenStream, 0, rvCap) // Next rv cap is bigger.
}
rv = append(rv, token)
pos++
}
start = end
}
if len(rvx) > 0 {
n := len(rv)
for _, r := range rvx {
n += len(r)
}
rall := make(analysis.TokenStream, 0, n)
for _, r := range rvx {
rall = append(rall, r...)
}
return append(rall, rv...)
}
return rv
}
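
This rewrite avoids repeatedly re-growing one large TokenStream: tokens are appended into small chunks (rv), full chunks are parked in rvx, and everything is concatenated once at the end; a guessRemaining heuristic also pre-sizes the shared Token backing array. A rough sketch of just the chunk-and-concatenate idea, with invented names:

package main

import "fmt"

// collectChunked appends items into small chunks and concatenates them
// once at the end, so no single append ever has to copy a large slice.
func collectChunked(n int) []int {
	full := make([][]int, 0, 8) // completed chunks
	cur := make([]int, 0, 16)   // current chunk
	for i := 0; i < n; i++ {
		if len(cur) >= cap(cur) {
			full = append(full, cur)
			nextCap := cap(cur) * 2
			if nextCap > 256 {
				nextCap = 256 // cap chunk size, as the diff does for rv
			}
			cur = make([]int, 0, nextCap)
		}
		cur = append(cur, i)
	}
	total := len(cur)
	for _, c := range full {
		total += len(c)
	}
	out := make([]int, 0, total) // one exact-size allocation at the end
	for _, c := range full {
		out = append(out, c...)
	}
	return append(out, cur...)
}

func main() { fmt.Println(len(collectChunked(1000))) }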

View File

@ -24,22 +24,24 @@ func (f *Firestorm) Analyze(d *document.Document) *index.AnalysisResult {
Rows: make([]index.IndexRow, 0, 100),
}
docIDBytes := []byte(d.ID)
// add the _id row
rv.Rows = append(rv.Rows, NewTermFreqRow(0, nil, docIDBytes, d.Number, 0, 0, nil))
// information we collate as we merge fields with same name
fieldTermFreqs := make(map[uint16]analysis.TokenFrequencies)
fieldLengths := make(map[uint16]int)
fieldIncludeTermVectors := make(map[uint16]bool)
fieldNames := make(map[uint16]string)
for _, field := range d.Fields {
analyzeField := func(field document.Field, storable bool) {
fieldIndex, newFieldRow := f.fieldIndexOrNewRow(field.Name())
if newFieldRow != nil {
rv.Rows = append(rv.Rows, newFieldRow)
}
fieldNames[fieldIndex] = field.Name()
// add the _id row
rv.Rows = append(rv.Rows, NewTermFreqRow(0, nil, []byte(d.ID), d.Number, 0, 0, nil))
if field.Options().IsIndexed() {
fieldLength, tokenFreqs := field.Analyze()
existingFreqs := fieldTermFreqs[fieldIndex]
@ -53,69 +55,74 @@ func (f *Firestorm) Analyze(d *document.Document) *index.AnalysisResult {
fieldIncludeTermVectors[fieldIndex] = field.Options().IncludeTermVectors()
}
if field.Options().IsStored() {
storeRow := f.storeField(d.ID, d.Number, field, fieldIndex)
if storable && field.Options().IsStored() {
storeRow := f.storeField(docIDBytes, d.Number, field, fieldIndex)
rv.Rows = append(rv.Rows, storeRow)
}
}
for _, field := range d.Fields {
analyzeField(field, true)
}
for fieldIndex, tokenFreqs := range fieldTermFreqs {
// see if any of the composite fields need this
for _, compositeField := range d.CompositeFields {
compositeField.Compose(fieldNames[fieldIndex], fieldLengths[fieldIndex], tokenFreqs)
}
}
for _, compositeField := range d.CompositeFields {
analyzeField(compositeField, false)
}
rowsCapNeeded := len(rv.Rows)
for _, tokenFreqs := range fieldTermFreqs {
rowsCapNeeded += len(tokenFreqs)
}
rows := make([]index.IndexRow, 0, rowsCapNeeded)
rv.Rows = append(rows, rv.Rows...)
// walk through the collated information and process
// once for each indexed field (unique name)
for fieldIndex, tokenFreqs := range fieldTermFreqs {
fieldLength := fieldLengths[fieldIndex]
includeTermVectors := fieldIncludeTermVectors[fieldIndex]
// see if any of the composite fields need this
for _, compositeField := range d.CompositeFields {
compositeField.Compose(fieldNames[fieldIndex], fieldLength, tokenFreqs)
}
// encode this field
indexRows := f.indexField(d.ID, d.Number, includeTermVectors, fieldIndex, fieldLength, tokenFreqs)
rv.Rows = append(rv.Rows, indexRows...)
}
// now index the composite fields
for _, compositeField := range d.CompositeFields {
fieldIndex, newFieldRow := f.fieldIndexOrNewRow(compositeField.Name())
if newFieldRow != nil {
rv.Rows = append(rv.Rows, newFieldRow)
}
if compositeField.Options().IsIndexed() {
fieldLength, tokenFreqs := compositeField.Analyze()
// encode this field
indexRows := f.indexField(d.ID, d.Number, compositeField.Options().IncludeTermVectors(), fieldIndex, fieldLength, tokenFreqs)
rv.Rows = append(rv.Rows, indexRows...)
}
rv.Rows = f.indexField(docIDBytes, d.Number, includeTermVectors, fieldIndex, fieldLength, tokenFreqs, rv.Rows)
}
return rv
}
func (f *Firestorm) indexField(docID string, docNum uint64, includeTermVectors bool, fieldIndex uint16, fieldLength int, tokenFreqs analysis.TokenFrequencies) []index.IndexRow {
func (f *Firestorm) indexField(docID []byte, docNum uint64, includeTermVectors bool, fieldIndex uint16, fieldLength int, tokenFreqs analysis.TokenFrequencies, rows []index.IndexRow) []index.IndexRow {
tfrs := make([]TermFreqRow, len(tokenFreqs))
rows := make([]index.IndexRow, 0, 100)
fieldNorm := float32(1.0 / math.Sqrt(float64(fieldLength)))
for _, tf := range tokenFreqs {
var termFreqRow *TermFreqRow
if includeTermVectors {
tv, newFieldRows := f.termVectorsFromTokenFreq(fieldIndex, tf)
rows = append(rows, newFieldRows...)
termFreqRow = NewTermFreqRow(fieldIndex, tf.Term, []byte(docID), docNum, uint64(tf.Frequency()), fieldNorm, tv)
} else {
termFreqRow = NewTermFreqRow(fieldIndex, tf.Term, []byte(docID), docNum, uint64(tf.Frequency()), fieldNorm, nil)
if !includeTermVectors {
i := 0
for _, tf := range tokenFreqs {
rows = append(rows, InitTermFreqRow(&tfrs[i], fieldIndex, tf.Term, docID, docNum, uint64(tf.Frequency()), fieldNorm, nil))
i++
}
rows = append(rows, termFreqRow)
return rows
}
i := 0
for _, tf := range tokenFreqs {
var tv []*TermVector
tv, rows = f.termVectorsFromTokenFreq(fieldIndex, tf, rows)
rows = append(rows, InitTermFreqRow(&tfrs[i], fieldIndex, tf.Term, docID, docNum, uint64(tf.Frequency()), fieldNorm, tv))
i++
}
return rows
}
func (f *Firestorm) termVectorsFromTokenFreq(field uint16, tf *analysis.TokenFreq) ([]*TermVector, []index.IndexRow) {
func (f *Firestorm) termVectorsFromTokenFreq(field uint16, tf *analysis.TokenFreq, rows []index.IndexRow) ([]*TermVector, []index.IndexRow) {
rv := make([]*TermVector, len(tf.Locations))
newFieldRows := make([]index.IndexRow, 0)
for i, l := range tf.Locations {
var newFieldRow *FieldRow
@ -124,21 +131,21 @@ func (f *Firestorm) termVectorsFromTokenFreq(field uint16, tf *analysis.TokenFre
// lookup correct field
fieldIndex, newFieldRow = f.fieldIndexOrNewRow(l.Field)
if newFieldRow != nil {
newFieldRows = append(newFieldRows, newFieldRow)
rows = append(rows, newFieldRow)
}
}
tv := NewTermVector(fieldIndex, uint64(l.Position), uint64(l.Start), uint64(l.End), l.ArrayPositions)
rv[i] = tv
}
return rv, newFieldRows
return rv, rows
}
func (f *Firestorm) storeField(docID string, docNum uint64, field document.Field, fieldIndex uint16) index.IndexRow {
func (f *Firestorm) storeField(docID []byte, docNum uint64, field document.Field, fieldIndex uint16) index.IndexRow {
fieldValue := make([]byte, 1+len(field.Value()))
fieldValue[0] = encodeFieldType(field)
copy(fieldValue[1:], field.Value())
storedRow := NewStoredRow([]byte(docID), docNum, fieldIndex, field.ArrayPositions(), fieldValue)
storedRow := NewStoredRow(docID, docNum, fieldIndex, field.ArrayPositions(), fieldValue)
return storedRow
}
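
Two things happen in this hunk: field analysis is factored into an analyzeField closure shared by regular and composite fields, and indexField/termVectorsFromTokenFreq now append into a caller-provided rows slice (pre-sized via rowsCapNeeded) instead of allocating their own. A small sketch of that append-to-caller's-slice pattern, with made-up Row/indexField names:

package main

import "fmt"

type Row struct{ Key string }

// indexField appends its output onto rows and returns the (possibly
// regrown) slice, so the caller can size one slice for all fields and
// avoid a fresh allocation per field.
func indexField(field string, terms []string, rows []Row) []Row {
	for _, t := range terms {
		rows = append(rows, Row{Key: field + "/" + t})
	}
	return rows
}

func main() {
	fields := map[string][]string{"title": {"a", "b"}, "body": {"c"}}
	capNeeded := 0
	for _, terms := range fields {
		capNeeded += len(terms)
	}
	rows := make([]Row, 0, capNeeded) // one allocation for every field's rows
	for f, terms := range fields {
		rows = indexField(f, terms, rows)
	}
	fmt.Println(len(rows))
}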

View File

@ -78,8 +78,8 @@ func TestAnalysis(t *testing.T) {
r: &index.AnalysisResult{
DocID: "a",
Rows: []index.IndexRow{
NewFieldRow(1, "name"),
NewTermFreqRow(0, nil, []byte("a"), 1, 0, 0.0, nil),
NewFieldRow(1, "name"),
NewStoredRow([]byte("a"), 1, 1, nil, []byte("ttest")),
NewTermFreqRow(1, []byte("test"), []byte("a"), 1, 1, 1.0, []*TermVector{NewTermVector(1, 1, 0, 4, nil)}),
},

View File

@ -15,7 +15,6 @@ import (
"sort"
"sync"
"github.com/blevesearch/bleve/document"
"github.com/steveyen/gtreap"
"github.com/willf/bitset"
)
@ -80,17 +79,13 @@ func (c *Compensator) Mutate(docID []byte, docNum uint64) {
}
}
func (c *Compensator) MutateBatch(docs map[string]*document.Document, docNum uint64) {
func (c *Compensator) MutateBatch(inflightItems []*InFlightItem, lastDocNum uint64) {
c.inFlightMutex.Lock()
defer c.inFlightMutex.Unlock()
for docID, doc := range docs {
if doc != nil {
c.inFlight = c.inFlight.Upsert(&InFlightItem{docID: []byte(docID), docNum: doc.Number}, rand.Int())
} else {
c.inFlight = c.inFlight.Upsert(&InFlightItem{docID: []byte(docID), docNum: 0}, rand.Int())
}
for _, item := range inflightItems {
c.inFlight = c.inFlight.Upsert(item, rand.Int())
}
c.maxRead = docNum
c.maxRead = lastDocNum
}
func (c *Compensator) Migrate(docID []byte, docNum uint64, oldDocNums []uint64) {
@ -111,7 +106,7 @@ func (c *Compensator) Migrate(docID []byte, docNum uint64, oldDocNums []uint64)
// remove entry from in-flight if it still has same doc num
val := c.inFlight.Get(&InFlightItem{docID: docID})
if val.(*InFlightItem).docNum == docNum {
if val != nil && val.(*InFlightItem).docNum == docNum {
c.inFlight = c.inFlight.Delete(&InFlightItem{docID: docID})
}
}
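
MutateBatch now receives ready-made *InFlightItem values and applies them all under a single lock acquisition, and Migrate checks for a nil Get result before the type assertion. A minimal sketch of the batch-under-one-lock idea, using a plain map where the real code uses a gtreap treap:

package main

import (
	"fmt"
	"sync"
)

type InFlightItem struct {
	docID  string
	docNum uint64
}

type Compensator struct {
	mu       sync.Mutex
	inFlight map[string]uint64
	maxRead  uint64
}

// MutateBatch applies a prepared batch of items under one lock
// acquisition instead of locking once per document.
func (c *Compensator) MutateBatch(items []*InFlightItem, lastDocNum uint64) {
	c.mu.Lock()
	defer c.mu.Unlock()
	for _, item := range items {
		c.inFlight[item.docID] = item.docNum
	}
	c.maxRead = lastDocNum
}

func main() {
	c := &Compensator{inFlight: map[string]uint64{}}
	c.MutateBatch([]*InFlightItem{{"a", 1}, {"b", 0}}, 2)
	fmt.Println(c.inFlight, c.maxRead)
}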

View File

@ -25,6 +25,7 @@ type DictUpdater struct {
f *Firestorm
dictUpdateSleep time.Duration
quit chan struct{}
incoming chan map[string]int64
mutex sync.RWMutex
workingSet map[string]int64
@ -41,6 +42,7 @@ func NewDictUpdater(f *Firestorm) *DictUpdater {
workingSet: make(map[string]int64),
batchesStarted: 1,
quit: make(chan struct{}),
incoming: make(chan map[string]int64, 8),
}
return &rv
}
@ -52,15 +54,12 @@ func (d *DictUpdater) Notify(term string, usage int64) {
}
func (d *DictUpdater) NotifyBatch(termUsages map[string]int64) {
d.mutex.Lock()
defer d.mutex.Unlock()
for term, usage := range termUsages {
d.workingSet[term] += usage
}
d.incoming <- termUsages
}
func (d *DictUpdater) Start() {
d.closeWait.Add(1)
go d.runIncoming()
go d.run()
}
@ -69,6 +68,24 @@ func (d *DictUpdater) Stop() {
d.closeWait.Wait()
}
func (d *DictUpdater) runIncoming() {
for {
select {
case <-d.quit:
return
case termUsages, ok := <-d.incoming:
if !ok {
return
}
d.mutex.Lock()
for term, usage := range termUsages {
d.workingSet[term] += usage
}
d.mutex.Unlock()
}
}
}
func (d *DictUpdater) run() {
tick := time.Tick(d.dictUpdateSleep)
for {

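NotifyBatch no longer merges term deltas into workingSet on the caller's goroutine; it just sends the map on a buffered incoming channel, and the new runIncoming goroutine does the merge under the mutex. A stripped-down sketch of that hand-off, with invented names and a crude sleep standing in for real synchronization:

package main

import (
	"fmt"
	"sync"
	"time"
)

type updater struct {
	incoming   chan map[string]int64
	quit       chan struct{}
	mu         sync.Mutex
	workingSet map[string]int64
}

// NotifyBatch is cheap for the caller: it only sends on a buffered channel.
func (u *updater) NotifyBatch(deltas map[string]int64) { u.incoming <- deltas }

// runIncoming owns the merge into workingSet, off the caller's path.
func (u *updater) runIncoming() {
	for {
		select {
		case <-u.quit:
			return
		case deltas, ok := <-u.incoming:
			if !ok {
				return
			}
			u.mu.Lock()
			for term, n := range deltas {
				u.workingSet[term] += n
			}
			u.mu.Unlock()
		}
	}
}

func main() {
	u := &updater{
		incoming:   make(chan map[string]int64, 8),
		quit:       make(chan struct{}),
		workingSet: map[string]int64{},
	}
	go u.runIncoming()
	u.NotifyBatch(map[string]int64{"term": 1})
	time.Sleep(10 * time.Millisecond) // crude wait for the merge in this sketch
	u.mu.Lock()
	fmt.Println(u.workingSet["term"])
	u.mu.Unlock()
}
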
View File

@ -10,6 +10,7 @@
package firestorm
import (
"runtime"
"testing"
"github.com/blevesearch/bleve/index"
@ -38,6 +39,9 @@ func TestDictUpdater(t *testing.T) {
f.(*Firestorm).dictUpdater.NotifyBatch(dictBatch)
// invoke updater manually
for len(f.(*Firestorm).dictUpdater.incoming) > 0 {
runtime.Gosched()
}
f.(*Firestorm).dictUpdater.update()
// assert that dictionary rows are correct
@ -77,6 +81,9 @@ func TestDictUpdater(t *testing.T) {
f.(*Firestorm).dictUpdater.NotifyBatch(dictBatch)
// invoke updater manually
for len(f.(*Firestorm).dictUpdater.incoming) > 0 {
runtime.Gosched()
}
f.(*Firestorm).dictUpdater.update()
// assert that dictionary rows are correct
@ -116,6 +123,9 @@ func TestDictUpdater(t *testing.T) {
f.(*Firestorm).dictUpdater.NotifyBatch(dictBatch)
// invoke updater manually
for len(f.(*Firestorm).dictUpdater.incoming) > 0 {
runtime.Gosched()
}
f.(*Firestorm).dictUpdater.update()
// assert that dictionary rows are correct

View File

@ -144,10 +144,9 @@ func (f *Firestorm) Update(doc *document.Document) (err error) {
analysisStart := time.Now()
resultChan := make(chan *index.AnalysisResult)
aw := index.NewAnalysisWork(f, doc, resultChan)
// put the work on the queue
go func() {
f.analysisQueue.Queue(aw)
}()
go f.analysisQueue.Queue(aw)
// wait for the result
result := <-resultChan
@ -168,7 +167,7 @@ func (f *Firestorm) Update(doc *document.Document) (err error) {
}()
var dictionaryDeltas map[string]int64
dictionaryDeltas, err = f.batchRows(kvwriter, result.Rows, nil)
dictionaryDeltas, err = f.batchRows(kvwriter, [][]index.IndexRow{result.Rows}, nil)
if err != nil {
_ = kvwriter.Close()
atomic.AddUint64(&f.stats.errors, 1)
@ -176,7 +175,7 @@ func (f *Firestorm) Update(doc *document.Document) (err error) {
}
f.compensator.Mutate([]byte(doc.ID), doc.Number)
f.lookuper.Notify(doc.Number, []byte(doc.ID))
f.lookuper.NotifyBatch([]*InFlightItem{&InFlightItem{[]byte(doc.ID), doc.Number}})
f.dictUpdater.NotifyBatch(dictionaryDeltas)
atomic.AddUint64(&f.stats.indexTime, uint64(time.Since(indexStart)))
@ -186,26 +185,57 @@ func (f *Firestorm) Update(doc *document.Document) (err error) {
func (f *Firestorm) Delete(id string) error {
indexStart := time.Now()
f.compensator.Mutate([]byte(id), 0)
f.lookuper.Notify(0, []byte(id))
f.lookuper.NotifyBatch([]*InFlightItem{&InFlightItem{[]byte(id), 0}})
atomic.AddUint64(&f.stats.indexTime, uint64(time.Since(indexStart)))
return nil
}
func (f *Firestorm) batchRows(writer store.KVWriter, rows []index.IndexRow, deleteKeys [][]byte) (map[string]int64, error) {
func (f *Firestorm) batchRows(writer store.KVWriter, rowsOfRows [][]index.IndexRow, deleteKeys [][]byte) (map[string]int64, error) {
// prepare batch
wb := writer.NewBatch()
dictionaryDeltas := make(map[string]int64)
for _, row := range rows {
tfr, ok := row.(*TermFreqRow)
if ok {
if tfr.Field() != 0 {
drk := tfr.DictionaryRowKey()
dictionaryDeltas[string(drk)] += 1
}
var kbuf []byte
var vbuf []byte
prepareBuf := func(buf []byte, sizeNeeded int) []byte {
if cap(buf) < sizeNeeded {
return make([]byte, sizeNeeded, sizeNeeded+128)
}
return buf[0:sizeNeeded]
}
dictionaryDeltas := make(map[string]int64)
for _, rows := range rowsOfRows {
for _, row := range rows {
tfr, ok := row.(*TermFreqRow)
if ok {
if tfr.Field() != 0 {
kbuf = prepareBuf(kbuf, tfr.DictionaryRowKeySize())
klen, err := tfr.DictionaryRowKeyTo(kbuf)
if err != nil {
return nil, err
}
dictionaryDeltas[string(kbuf[0:klen])] += 1
}
}
kbuf = prepareBuf(kbuf, row.KeySize())
klen, err := row.KeyTo(kbuf)
if err != nil {
return nil, err
}
vbuf = prepareBuf(vbuf, row.ValueSize())
vlen, err := row.ValueTo(vbuf)
if err != nil {
return nil, err
}
wb.Set(kbuf[0:klen], vbuf[0:vlen])
}
wb.Set(row.Key(), row.Value())
}
for _, dk := range deleteKeys {
@ -231,13 +261,13 @@ func (f *Firestorm) Batch(batch *index.Batch) (err error) {
analysisStart := time.Now()
resultChan := make(chan *index.AnalysisResult)
var numUpdates uint64
var docsUpdated uint64
var docsDeleted uint64
for _, doc := range batch.IndexOps {
if doc != nil {
doc.Number = firstDocNumber // actually assign doc numbers here
firstDocNumber++
numUpdates++
docsUpdated++
} else {
docsDeleted++
}
@ -251,7 +281,7 @@ func (f *Firestorm) Batch(batch *index.Batch) (err error) {
for _, doc := range batch.IndexOps {
if doc != nil {
sofar++
if sofar > numUpdates {
if sofar > docsUpdated {
detectedUnsafeMutex.Lock()
detectedUnsafe = true
detectedUnsafeMutex.Unlock()
@ -264,12 +294,14 @@ func (f *Firestorm) Batch(batch *index.Batch) (err error) {
}
}()
allRows := make([]index.IndexRow, 0, 1000)
// extra 1 capacity for internal updates.
collectRows := make([][]index.IndexRow, 0, docsUpdated+1)
// wait for the result
var itemsDeQueued uint64
for itemsDeQueued < numUpdates {
for itemsDeQueued < docsUpdated {
result := <-resultChan
allRows = append(allRows, result.Rows...)
collectRows = append(collectRows, result.Rows)
itemsDeQueued++
}
close(resultChan)
@ -282,16 +314,31 @@ func (f *Firestorm) Batch(batch *index.Batch) (err error) {
atomic.AddUint64(&f.stats.analysisTime, uint64(time.Since(analysisStart)))
deleteKeys := make([][]byte, 0)
// add the internal ops
for internalKey, internalValue := range batch.InternalOps {
if internalValue == nil {
// delete
deleteInternalRow := NewInternalRow([]byte(internalKey), nil)
deleteKeys = append(deleteKeys, deleteInternalRow.Key())
var deleteKeys [][]byte
if len(batch.InternalOps) > 0 {
// add the internal ops
updateInternalRows := make([]index.IndexRow, 0, len(batch.InternalOps))
for internalKey, internalValue := range batch.InternalOps {
if internalValue == nil {
// delete
deleteInternalRow := NewInternalRow([]byte(internalKey), nil)
deleteKeys = append(deleteKeys, deleteInternalRow.Key())
} else {
updateInternalRow := NewInternalRow([]byte(internalKey), internalValue)
updateInternalRows = append(updateInternalRows, updateInternalRow)
}
}
collectRows = append(collectRows, updateInternalRows)
}
inflightItems := make([]*InFlightItem, 0, len(batch.IndexOps))
for docID, doc := range batch.IndexOps {
if doc != nil {
inflightItems = append(inflightItems,
&InFlightItem{[]byte(docID), doc.Number})
} else {
updateInternalRow := NewInternalRow([]byte(internalKey), internalValue)
allRows = append(allRows, updateInternalRow)
inflightItems = append(inflightItems,
&InFlightItem{[]byte(docID), 0})
}
}
@ -304,28 +351,22 @@ func (f *Firestorm) Batch(batch *index.Batch) (err error) {
}
var dictionaryDeltas map[string]int64
dictionaryDeltas, err = f.batchRows(kvwriter, allRows, deleteKeys)
dictionaryDeltas, err = f.batchRows(kvwriter, collectRows, deleteKeys)
if err != nil {
_ = kvwriter.Close()
atomic.AddUint64(&f.stats.errors, 1)
return
}
f.compensator.MutateBatch(batch.IndexOps, lastDocNumber)
for docID, doc := range batch.IndexOps {
if doc != nil {
f.lookuper.Notify(doc.Number, []byte(doc.ID))
} else {
f.lookuper.Notify(0, []byte(docID))
}
}
f.compensator.MutateBatch(inflightItems, lastDocNumber)
f.lookuper.NotifyBatch(inflightItems)
f.dictUpdater.NotifyBatch(dictionaryDeltas)
err = kvwriter.Close()
atomic.AddUint64(&f.stats.indexTime, uint64(time.Since(indexStart)))
if err == nil {
atomic.AddUint64(&f.stats.updates, numUpdates)
atomic.AddUint64(&f.stats.updates, docsUpdated)
atomic.AddUint64(&f.stats.deletes, docsDeleted)
atomic.AddUint64(&f.stats.batches, 1)
} else {

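batchRows stops materializing a fresh key and value for every row and instead serializes each row into two reusable scratch buffers via the KeySize/KeyTo style of API, growing a buffer only when a row needs more room than it has. A minimal sketch of the prepareBuf reuse pattern with a hypothetical row type:

package main

import (
	"encoding/binary"
	"fmt"
)

// prepareBuf returns buf resized to sizeNeeded, reallocating only when
// the existing capacity is too small (mirrors the diff's helper).
func prepareBuf(buf []byte, sizeNeeded int) []byte {
	if cap(buf) < sizeNeeded {
		return make([]byte, sizeNeeded, sizeNeeded+128)
	}
	return buf[0:sizeNeeded]
}

type row struct{ id uint64 }

func (r row) KeySize() int { return 8 }

// KeyTo writes the key into a caller-owned buffer and reports its length.
func (r row) KeyTo(buf []byte) (int, error) {
	binary.BigEndian.PutUint64(buf, r.id)
	return 8, nil
}

func main() {
	var kbuf []byte
	for i := 0; i < 3; i++ {
		r := row{id: uint64(i)}
		kbuf = prepareBuf(kbuf, r.KeySize())
		n, err := r.KeyTo(kbuf)
		if err != nil {
			panic(err)
		}
		// kbuf is reused on the next iteration, so anything that must
		// outlive the loop has to be copied out of it.
		fmt.Printf("% x\n", kbuf[:n])
	}
}
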
View File

@ -58,7 +58,6 @@ Once the Delete operation returns, the document should no longer be returned by
## Details
### Terminology
Document ID (`doc_id`)
@ -74,15 +73,15 @@ DocIdNumber
By including a new unique identifier as a part of every row generated, the index operation no longer concerns itself with updating existing values or deleting previous values.
Removal of old rows is handled independently by a separate thread.
Removal of old rows is handled independently by separate threads.
Correct semantics with respect to added/updated/deleted documents are maintained through synchronized in memory data structures, to compensate for the decoupling of these other operations.
Correct semantics with respect to added/updated/deleted documents are maintained through synchronized in-memory data structures, to compensate for the decoupling of these other operations.
The Dictionary becomes a best-effort data element. In kill -9 scenarios it could become incorrect, but it is believed that this will generally only affect scoring, not correctness, and we can pursue read-repair operations.
### Index State
The following pseudo-structure will be used to explain changes to the internal state. Keep in mind the datatypes shows represent the logical strcture required for correct behavior. The actual implementation may be different to achieve performance goals.
The following pseudo-structure will be used to explain changes to the internal state. Keep in mind the datatypes shown represent the logical structure required for correct behavior. The actual implementation may be different to achieve performance goals.
indexState {
docCount uint64
@ -114,6 +113,7 @@ The following pseudo-structure will be used to explain changes to the internal s
inFlightDocIds = {}
deletedDocIdNumbers = {}
}
- Garbage Collector Thread is started
- Old Doc Number Lookup Thread is started
- Index marked open
@ -124,7 +124,7 @@ The following pseudo-structure will be used to explain changes to the internal s
- ITERATE all FieldRows{}
- ITERATE all TermFrequencyRow{ where field_id = 0 }
- Identify consecutive rows with same doc_id but different doc_number
- Lower document numbers get added to the deleted doc numbers list
- Lower document numbers are added to the deletedDocIdNumbers list
- Count all non-duplicate rows, seed the docCount
- Observe highest document number seen, seed nextDocNumber
@ -156,12 +156,13 @@ Currently, only two types of rows include document numbers:
The current thought is that the garbage collector thread will use a single iterator to iterate the following key spaces:
TermFrequencyRow { where field_id > 0}
StoredRow {all}
- TermFrequencyRow { where field_id > 0}
- StoredRow {all}
For any row referring to a document number on the deletedDocNumbers list, that key will be DELETED.
The garbage collector will track loop iterations or start key for each deletedDocNumber so that it knows when it has walked a full circle for a given doc number. At that point, the following happen in order:
- docNumber is removed from the deletedDocNumbers list
- DELETE is issued on TermFreqRow{ field_id=0, term=doc_id, doc_id=doc_id_number }
@ -201,35 +202,29 @@ It is notified via a channel of increased term usage (by index ops) and of decre
#### Indexing a Document
Perform all analysis on the document.
new_doc_number = indexState.nextDocNumber++
Create New Batch
Batch will contain SET operations for:
- any new Fields
- Term Frequency Rows for indexed fields terms
- Stored Rows for stored fields
Execute Batch
Acquire indexState.docIdNumberMutex for writing:
set maxReadDocNumber new_doc_number
set inFlightDocIds{ docId = new_doc_number }
Release indexState.docIdNumberMutex
Notify Term Frequency Updater thread of increased term usage.
Notify Old Doc Number Lookup Thread of doc_id.
- Perform all analysis on the document.
- new_doc_number = indexState.nextDocNumber++
- Create New Batch
- Batch will contain SET operations for:
- any new Fields
- Term Frequency Rows for indexed fields terms
- Stored Rows for stored fields
- Execute Batch
- Acquire indexState.docIdNumberMutex for writing:
- set maxReadDocNumber new_doc_number
- set inFlightDocIds{ docId = new_doc_number }
- Release indexState.docIdNumberMutex
- Notify Term Frequency Updater thread of increased term usage.
- Notify Old Doc Number Lookup Thread of doc_id.
The key property is that a search matching the updated document *SHOULD* return the document once this method returns. If the document was an update, it should return the previous document until this method returns. There should be no period of time where neither document matches.
#### Deleting a Document
Acquire indexState.docIdNumberMutex for writing:
set inFlightDocIds{ docId = 0 } // 0 is a doc number we never use, indicates pending deletion of docId
Release indexState.docIdNumberMutex
Notify Old Doc Number Lookup Thread of doc_id.
- Acquire indexState.docIdNumberMutex for writing:
- set inFlightDocIds{ docId = 0 } // 0 is a doc number we never use, indicates pending deletion of docId
- Release indexState.docIdNumberMutex
- Notify Old Doc Number Lookup Thread of doc_id.
#### Batch Operations
@ -241,13 +236,12 @@ Batch operations look largely just like the indexing/deleting operations. Two o
#### Term Field Iteration
- Acquire indexState.docIdNumberMutex for reading:
- Get copy of: (it is assumed some COW datastructure is used, or MVCC is accommodated in some way by the impl)
- Get copy of: (it is assumed some COW data structure is used, or MVCC is accommodated in some way by the impl)
- maxReadDocNumber
- inFlightDocIds
- deletedDocIdNumbers
- Release indexState.docIdNumberMutex
Term Field Iteration is used by the basic term search. It produces the set of documents (and related info like term vectors) which used the specified term in the specified field.
Iterator starts at key:
@ -256,16 +250,18 @@ Iterator starts at key:
Iterator ends when the term does not match.
Any row with doc_number > maxReadDocNumber MUST be ignored.
Any row with doc_id_number on the deletedDocIdNumber list MUST be ignored.
Any row with the same doc_id as an entry in the inFlightDocIds map, MUST have the same number.
- Any row with doc_number > maxReadDocNumber MUST be ignored.
- Any row with doc_id_number on the deletedDocIdNumber list MUST be ignored.
- Any row with the same doc_id as an entry in the inFlightDocIds map, MUST have the same number.
Any row satisfying the above conditions is a candidate document.
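
A small sketch of how these three rules might be applied to rows coming off the iterator; the types and names here are illustrative, not bleve's:

package main

import "fmt"

type candidateRow struct {
	docID  string
	docNum uint64
}

type snapshot struct {
	maxReadDocNumber    uint64
	deletedDocIdNumbers map[uint64]bool
	inFlightDocIds      map[string]uint64
}

// visible applies the three rules from the list above to one row.
func (s snapshot) visible(r candidateRow) bool {
	if r.docNum > s.maxReadDocNumber { // rule 1: not yet readable
		return false
	}
	if s.deletedDocIdNumbers[r.docNum] { // rule 2: deleted doc number
		return false
	}
	if n, ok := s.inFlightDocIds[r.docID]; ok && n != r.docNum { // rule 3
		return false
	}
	return true
}

func main() {
	s := snapshot{
		maxReadDocNumber:    10,
		deletedDocIdNumbers: map[uint64]bool{3: true},
		inFlightDocIds:      map[string]uint64{"x": 7},
	}
	fmt.Println(s.visible(candidateRow{"x", 7}), s.visible(candidateRow{"x", 5}))
}
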
### Row Encoding
All keys are manually encoded to ensure a precise row ordering.
Internal Row values are opaque byte arrays.
All other values are encoded using protobuf for a balance of efficiency and flexibility. Dictionary and TermFrequency rows are the most likely to take advantage of this flexibility, but other rows are read/written infrequently enough that the flexibility outweighs any overhead.
#### Version
@ -375,7 +371,7 @@ In state d, we have completed the lookup for the old document numbers of X, and
In state e, the garbage collector has removed all record of X#1.
The Index method returns after it has transitioned to state C, which maintains the semantics we desire.
The Index method returns after it has transitioned to state c, which maintains the semantics we desire.
2\. Wait, what happens if I kill -9 the process, won't you forget about the deleted documents?

View File

@ -18,14 +18,9 @@ import (
const channelBufferSize = 1000
type lookupTask struct {
docID []byte
docNum uint64
}
type Lookuper struct {
f *Firestorm
workChan chan *lookupTask
workChan chan []*InFlightItem
quit chan struct{}
closeWait sync.WaitGroup
@ -36,15 +31,15 @@ type Lookuper struct {
func NewLookuper(f *Firestorm) *Lookuper {
rv := Lookuper{
f: f,
workChan: make(chan *lookupTask, channelBufferSize),
workChan: make(chan []*InFlightItem, channelBufferSize),
quit: make(chan struct{}),
}
return &rv
}
func (l *Lookuper) Notify(docNum uint64, docID []byte) {
func (l *Lookuper) NotifyBatch(items []*InFlightItem) {
atomic.AddUint64(&l.tasksQueued, 1)
l.workChan <- &lookupTask{docID: docID, docNum: docNum}
l.workChan <- items
}
func (l *Lookuper) Start() {
@ -65,17 +60,24 @@ func (l *Lookuper) run() {
logger.Printf("lookuper asked to quit")
l.closeWait.Done()
return
case task, ok := <-l.workChan:
case items, ok := <-l.workChan:
if !ok {
logger.Printf("lookuper work channel closed unexpectedly, stopping")
return
}
l.lookup(task)
l.lookupItems(items)
}
}
}
func (l *Lookuper) lookup(task *lookupTask) {
func (l *Lookuper) lookupItems(items []*InFlightItem) {
for _, item := range items {
l.lookup(item)
}
atomic.AddUint64(&l.tasksDone, 1)
}
func (l *Lookuper) lookup(item *InFlightItem) {
reader, err := l.f.store.Reader()
if err != nil {
logger.Printf("lookuper fatal: %v", err)
@ -87,7 +89,7 @@ func (l *Lookuper) lookup(task *lookupTask) {
}
}()
prefix := TermFreqPrefixFieldTermDocId(0, nil, task.docID)
prefix := TermFreqPrefixFieldTermDocId(0, nil, item.docID)
logger.Printf("lookuper prefix - % x", prefix)
docNums := make(DocNumberList, 0)
err = visitPrefix(reader, prefix, func(key, val []byte) (bool, error) {
@ -106,20 +108,19 @@ func (l *Lookuper) lookup(task *lookupTask) {
}
oldDocNums := make(DocNumberList, 0, len(docNums))
for _, docNum := range docNums {
if task.docNum == 0 || docNum < task.docNum {
if item.docNum == 0 || docNum < item.docNum {
oldDocNums = append(oldDocNums, docNum)
}
}
logger.Printf("lookup migrating '%s' - %d - oldDocNums: %v", task.docID, task.docNum, oldDocNums)
l.f.compensator.Migrate(task.docID, task.docNum, oldDocNums)
if len(oldDocNums) == 0 && task.docNum != 0 {
logger.Printf("lookup migrating '%s' - %d - oldDocNums: %v", item.docID, item.docNum, oldDocNums)
l.f.compensator.Migrate(item.docID, item.docNum, oldDocNums)
if len(oldDocNums) == 0 && item.docNum != 0 {
// this was an add, not an update
atomic.AddUint64(l.f.docCount, 1)
} else if len(oldDocNums) > 0 && task.docNum == 0 {
} else if len(oldDocNums) > 0 && item.docNum == 0 {
// this was a delete (and it previously existed)
atomic.AddUint64(l.f.docCount, ^uint64(0))
}
atomic.AddUint64(&l.tasksDone, 1)
}
// this is not intended to be used publicly, only for unit tests

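The Lookuper's per-document Notify becomes NotifyBatch: one channel send, and one tasksQueued/tasksDone increment, per batch rather than per document. A compact sketch of that shape, with invented names:

package main

import (
	"fmt"
	"sync/atomic"
)

type item struct{ docID string }

type worker struct {
	tasksQueued uint64
	tasksDone   uint64
	workChan    chan []*item
}

// NotifyBatch queues a whole slice of items as a single task.
func (w *worker) NotifyBatch(items []*item) {
	atomic.AddUint64(&w.tasksQueued, 1)
	w.workChan <- items
}

func (w *worker) run() {
	for items := range w.workChan {
		for _, it := range items {
			_ = it // per-item lookup would happen here
		}
		atomic.AddUint64(&w.tasksDone, 1) // counted once per batch
	}
}

func main() {
	w := &worker{workChan: make(chan []*item, 1)}
	w.NotifyBatch([]*item{{"a"}, {"b"}})
	close(w.workChan)
	w.run()
	fmt.Println(atomic.LoadUint64(&w.tasksDone))
}
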
View File

@ -62,7 +62,7 @@ func TestLookups(t *testing.T) {
if val == nil {
t.Errorf("expected key: % x to be in the inflight list", tfr.DocID())
}
f.(*Firestorm).lookuper.lookup(&lookupTask{docID: tfr.DocID(), docNum: tfr.DocNum()})
f.(*Firestorm).lookuper.lookup(&InFlightItem{docID: tfr.DocID(), docNum: tfr.DocNum()})
// now expect this mutation to NOT be in the in-flight list
val = f.(*Firestorm).compensator.inFlight.Get(&InFlightItem{docID: tfr.DocID()})
if val != nil {

View File

@ -46,18 +46,18 @@ func NewTermVector(field uint16, pos uint64, start uint64, end uint64, arrayPos
}
func NewTermFreqRow(field uint16, term []byte, docID []byte, docNum uint64, freq uint64, norm float32, termVectors []*TermVector) *TermFreqRow {
rv := TermFreqRow{
field: field,
term: term,
docID: docID,
docNum: docNum,
}
return InitTermFreqRow(&TermFreqRow{}, field, term, docID, docNum, freq, norm, termVectors)
}
rv.value.Freq = proto.Uint64(freq)
rv.value.Norm = proto.Float32(norm)
rv.value.Vectors = termVectors
return &rv
func InitTermFreqRow(tfr *TermFreqRow, field uint16, term []byte, docID []byte, docNum uint64, freq uint64, norm float32, termVectors []*TermVector) *TermFreqRow {
tfr.field = field
tfr.term = term
tfr.docID = docID
tfr.docNum = docNum
tfr.value.Freq = proto.Uint64(freq)
tfr.value.Norm = proto.Float32(norm)
tfr.value.Vectors = termVectors
return tfr
}
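
NewTermFreqRow now just delegates to InitTermFreqRow, which fills a struct the caller already owns; that is what lets indexField allocate all of its TermFreqRows as one slice and initialize them in place. A tiny sketch of the New/Init split with a hypothetical row type:

package main

import "fmt"

type TermRow struct {
	term string
	freq uint64
}

// NewTermRow keeps the convenient allocating constructor...
func NewTermRow(term string, freq uint64) *TermRow {
	return InitTermRow(&TermRow{}, term, freq)
}

// ...while InitTermRow fills a struct the caller already owns, so a
// batch can come from one make([]TermRow, n) allocation.
func InitTermRow(r *TermRow, term string, freq uint64) *TermRow {
	r.term = term
	r.freq = freq
	return r
}

func main() {
	terms := []string{"a", "b", "c"}
	rows := make([]TermRow, len(terms)) // one allocation for the whole batch
	for i, t := range terms {
		InitTermRow(&rows[i], t, uint64(i+1))
	}
	fmt.Println(rows, *NewTermRow("d", 4))
}
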
func NewTermFreqRowKV(key, value []byte) (*TermFreqRow, error) {