0
0
Fork 0

text analysis now moved out of index write lock onto goroutine

1. text analysis is now done before the write lock is acquired
2. there is now a pool of analysis workers
3. the size of this pool is configurable
4. this allows for documents in a batch to be analyzed concurrently

as a part of benchmarking these changes i've also introduce a new
null storage implementation.  this should never be used, as it
does not actualy build an index.  it does however let us go
through all the normal indexing machinery, without incuring
any indexing I/O.  this is very helpful in measuring improvements
made to the text analsysis pipeline, which are often overshadowed
by indexing times in benchmarks actually building an index.
This commit is contained in:
Marty Schoch 2014-09-24 08:13:14 -04:00
parent 1dc466a800
commit 97902e2619
18 changed files with 949 additions and 175 deletions

View File

@ -11,9 +11,11 @@ package bleve
import (
"expvar"
"github.com/blevesearch/bleve/registry"
"time"
"github.com/blevesearch/bleve/index/upside_down"
"github.com/blevesearch/bleve/registry"
// token maps
_ "github.com/blevesearch/bleve/analysis/token_map"
@ -108,11 +110,13 @@ type configuration struct {
Cache *registry.Cache
DefaultHighlighter string
DefaultKVStore string
analysisQueue upside_down.AnalysisQueue
}
func newConfiguration() *configuration {
return &configuration{
Cache: registry.NewCache(),
Cache: registry.NewCache(),
analysisQueue: upside_down.NewAnalysisQueue(4),
}
}

181
index/store/null/null.go Normal file
View File

@ -0,0 +1,181 @@
// Copyright (c) 2014 Couchbase, Inc.
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
// except in compliance with the License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed under the
// License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
// either express or implied. See the License for the specific language governing permissions
// and limitations under the License.
package null
import (
"github.com/blevesearch/bleve/index/store"
)
type Store struct{}
func Open() (*Store, error) {
rv := Store{}
return &rv, nil
}
func (i *Store) Close() error {
return nil
}
func (i *Store) iterator(key []byte) store.KVIterator {
rv := newIterator(i)
return rv
}
func (i *Store) Reader() store.KVReader {
return newReader(i)
}
func (i *Store) Writer() store.KVWriter {
return newWriter(i)
}
func (i *Store) newBatch() store.KVBatch {
return newBatch(i)
}
type Reader struct {
store *Store
}
func newReader(store *Store) *Reader {
return &Reader{
store: store,
}
}
func (r *Reader) Get(key []byte) ([]byte, error) {
return nil, nil
}
func (r *Reader) Iterator(key []byte) store.KVIterator {
return r.store.iterator(key)
}
func (r *Reader) Close() error {
return nil
}
type Iterator struct {
store *Store
}
func newIterator(store *Store) *Iterator {
rv := Iterator{
store: store,
}
return &rv
}
func (i *Iterator) SeekFirst() {}
func (i *Iterator) Seek(k []byte) {}
func (i *Iterator) Next() {}
func (i *Iterator) Current() ([]byte, []byte, bool) {
return nil, nil, false
}
func (i *Iterator) Key() []byte {
return nil
}
func (i *Iterator) Value() []byte {
return nil
}
func (i *Iterator) Valid() bool {
return false
}
func (i *Iterator) Close() {
}
type Batch struct {
store *Store
keys [][]byte
vals [][]byte
merges map[string]store.AssociativeMergeChain
}
func newBatch(s *Store) *Batch {
rv := Batch{
store: s,
keys: make([][]byte, 0),
vals: make([][]byte, 0),
merges: make(map[string]store.AssociativeMergeChain),
}
return &rv
}
func (i *Batch) Set(key, val []byte) {
i.keys = append(i.keys, key)
i.vals = append(i.vals, val)
}
func (i *Batch) Delete(key []byte) {
i.keys = append(i.keys, key)
i.vals = append(i.vals, nil)
}
func (i *Batch) Merge(key []byte, oper store.AssociativeMerge) {
opers, ok := i.merges[string(key)]
if !ok {
opers = make(store.AssociativeMergeChain, 0, 1)
}
opers = append(opers, oper)
i.merges[string(key)] = opers
}
func (i *Batch) Execute() error {
return nil
}
func (i *Batch) Close() error {
return nil
}
type Writer struct {
store *Store
}
func newWriter(store *Store) *Writer {
return &Writer{
store: store,
}
}
func (w *Writer) Set(key, val []byte) error {
return nil
}
func (w *Writer) Delete(key []byte) error {
return nil
}
func (w *Writer) NewBatch() store.KVBatch {
return newBatch(w.store)
}
func (w *Writer) Close() error {
return nil
}
// these two methods can safely read using the regular
// methods without a read transaction, because we know
// that no one else is writing but us
func (w *Writer) Get(key []byte) ([]byte, error) {
return nil, nil
}
func (w *Writer) Iterator(key []byte) store.KVIterator {
return w.store.iterator(key)
}

View File

@ -0,0 +1,101 @@
// Copyright (c) 2014 Couchbase, Inc.
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
// except in compliance with the License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed under the
// License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
// either express or implied. See the License for the specific language governing permissions
// and limitations under the License.
package upside_down
import (
"github.com/blevesearch/bleve/document"
)
type AnalysisResult struct {
docID string
rows []UpsideDownCouchRow
}
type AnalysisWork struct {
udc *UpsideDownCouch
d *document.Document
rc chan *AnalysisResult
}
type AnalysisQueue chan AnalysisWork
func NewAnalysisQueue(numWorkers int) AnalysisQueue {
rv := make(AnalysisQueue)
for i := 0; i < numWorkers; i++ {
go AnalysisWorker(rv)
}
return rv
}
func AnalysisWorker(q AnalysisQueue) {
// read work off the queue
for {
w := <-q
rv := &AnalysisResult{
docID: w.d.ID,
rows: make([]UpsideDownCouchRow, 0, 100),
}
// track our back index entries
backIndexTermEntries := make([]*BackIndexTermEntry, 0)
backIndexStoredEntries := make([]*BackIndexStoreEntry, 0)
for _, field := range w.d.Fields {
fieldIndex, newFieldRow := w.udc.fieldIndexCache.FieldIndex(field.Name())
if newFieldRow != nil {
rv.rows = append(rv.rows, newFieldRow)
}
if field.Options().IsIndexed() {
fieldLength, tokenFreqs := field.Analyze()
// see if any of the composite fields need this
for _, compositeField := range w.d.CompositeFields {
compositeField.Compose(field.Name(), fieldLength, tokenFreqs)
}
// encode this field
indexRows, indexBackIndexTermEntries := w.udc.indexField(w.d.ID, field, fieldIndex, fieldLength, tokenFreqs)
rv.rows = append(rv.rows, indexRows...)
backIndexTermEntries = append(backIndexTermEntries, indexBackIndexTermEntries...)
}
if field.Options().IsStored() {
storeRows, indexBackIndexStoreEntries := w.udc.storeField(w.d.ID, field, fieldIndex)
rv.rows = append(rv.rows, storeRows...)
backIndexStoredEntries = append(backIndexStoredEntries, indexBackIndexStoreEntries...)
}
}
// now index the composite fields
for _, compositeField := range w.d.CompositeFields {
fieldIndex, newFieldRow := w.udc.fieldIndexCache.FieldIndex(compositeField.Name())
if newFieldRow != nil {
rv.rows = append(rv.rows, newFieldRow)
}
if compositeField.Options().IsIndexed() {
fieldLength, tokenFreqs := compositeField.Analyze()
// encode this field
indexRows, indexBackIndexTermEntries := w.udc.indexField(w.d.ID, compositeField, fieldIndex, fieldLength, tokenFreqs)
rv.rows = append(rv.rows, indexRows...)
backIndexTermEntries = append(backIndexTermEntries, indexBackIndexTermEntries...)
}
}
// build the back index row
backIndexRow := NewBackIndexRow(w.d.ID, backIndexTermEntries, backIndexStoredEntries)
rv.rows = append(rv.rows, backIndexRow)
w.rc <- rv
}
}

View File

@ -16,7 +16,7 @@ import (
"github.com/blevesearch/bleve/index/store/boltdb"
)
func BenchmarkBoltDBIndexing(b *testing.B) {
func BenchmarkBoltDBIndexing1Workers(b *testing.B) {
s, err := boltdb.Open("test", "bleve")
if err != nil {
b.Fatal(err)
@ -24,5 +24,95 @@ func BenchmarkBoltDBIndexing(b *testing.B) {
defer os.RemoveAll("test")
defer s.Close()
CommonBenchmarkIndex(b, s)
CommonBenchmarkIndex(b, s, 1)
}
func BenchmarkBoltDBIndexing2Workers(b *testing.B) {
s, err := boltdb.Open("test", "bleve")
if err != nil {
b.Fatal(err)
}
defer os.RemoveAll("test")
defer s.Close()
CommonBenchmarkIndex(b, s, 2)
}
func BenchmarkBoltDBIndexing4Workers(b *testing.B) {
s, err := boltdb.Open("test", "bleve")
if err != nil {
b.Fatal(err)
}
defer os.RemoveAll("test")
defer s.Close()
CommonBenchmarkIndex(b, s, 4)
}
// batches
func BenchmarkBoltDBIndexing1Workers10Batch(b *testing.B) {
s, err := boltdb.Open("test", "bleve")
if err != nil {
b.Fatal(err)
}
defer os.RemoveAll("test")
defer s.Close()
CommonBenchmarkIndexBatch(b, s, 1, 10)
}
func BenchmarkBoltDBIndexing2Workers10Batch(b *testing.B) {
s, err := boltdb.Open("test", "bleve")
if err != nil {
b.Fatal(err)
}
defer os.RemoveAll("test")
defer s.Close()
CommonBenchmarkIndexBatch(b, s, 2, 10)
}
func BenchmarkBoltDBIndexing4Workers10Batch(b *testing.B) {
s, err := boltdb.Open("test", "bleve")
if err != nil {
b.Fatal(err)
}
defer os.RemoveAll("test")
defer s.Close()
CommonBenchmarkIndexBatch(b, s, 4, 10)
}
func BenchmarkBoltDBIndexing1Workers100Batch(b *testing.B) {
s, err := boltdb.Open("test", "bleve")
if err != nil {
b.Fatal(err)
}
defer os.RemoveAll("test")
defer s.Close()
CommonBenchmarkIndexBatch(b, s, 1, 100)
}
func BenchmarkBoltDBIndexing2Workers100Batch(b *testing.B) {
s, err := boltdb.Open("test", "bleve")
if err != nil {
b.Fatal(err)
}
defer os.RemoveAll("test")
defer s.Close()
CommonBenchmarkIndexBatch(b, s, 2, 100)
}
func BenchmarkBoltDBIndexing4Workers100Batch(b *testing.B) {
s, err := boltdb.Open("test", "bleve")
if err != nil {
b.Fatal(err)
}
defer os.RemoveAll("test")
defer s.Close()
CommonBenchmarkIndexBatch(b, s, 4, 100)
}

View File

@ -13,24 +13,85 @@ import (
"strconv"
"testing"
_ "github.com/blevesearch/bleve/analysis/analyzers/standard_analyzer"
"github.com/blevesearch/bleve/document"
"github.com/blevesearch/bleve/index"
"github.com/blevesearch/bleve/index/store"
"github.com/blevesearch/bleve/registry"
)
func CommonBenchmarkIndex(b *testing.B, s store.KVStore) {
var benchmarkDocBodies = []string{
"A boiling liquid expanding vapor explosion (BLEVE, /ˈblɛviː/ blev-ee) is an explosion caused by the rupture of a vessel containing a pressurized liquid above its boiling point.",
"A boiler explosion is a catastrophic failure of a boiler. As seen today, boiler explosions are of two kinds. One kind is a failure of the pressure parts of the steam and water sides. There can be many different causes, such as failure of the safety valve, corrosion of critical parts of the boiler, or low water level. Corrosion along the edges of lap joints was a common cause of early boiler explosions.",
"A boiler is a closed vessel in which water or other fluid is heated. The fluid does not necessarily boil. (In North America the term \"furnace\" is normally used if the purpose is not actually to boil the fluid.) The heated or vaporized fluid exits the boiler for use in various processes or heating applications,[1][2] including central heating, boiler-based power generation, cooking, and sanitation.",
"A pressure vessel is a closed container designed to hold gases or liquids at a pressure substantially different from the ambient pressure.",
"Pressure (symbol: p or P) is the ratio of force to the area over which that force is distributed.",
"Liquid is one of the four fundamental states of matter (the others being solid, gas, and plasma), and is the only state with a definite volume but no fixed shape.",
"The boiling point of a substance is the temperature at which the vapor pressure of the liquid equals the pressure surrounding the liquid[1][2] and the liquid changes into a vapor.",
"Vapor pressure or equilibrium vapor pressure is defined as the pressure exerted by a vapor in thermodynamic equilibrium with its condensed phases (solid or liquid) at a given temperature in a closed system.",
"Industrial gases are a group of gases that are specifically manufactured for use in a wide range of industries, which include oil and gas, petrochemicals, chemicals, power, mining, steelmaking, metals, environmental protection, medicine, pharmaceuticals, biotechnology, food, water, fertilizers, nuclear power, electronics and aerospace.",
"The expansion ratio of a liquefied and cryogenic substance is the volume of a given amount of that substance in liquid form compared to the volume of the same amount of substance in gaseous form, at room temperature and normal atmospheric pressure.",
}
index := NewUpsideDownCouch(s)
func CommonBenchmarkIndex(b *testing.B, s store.KVStore, analysisWorkers int) {
analysisQueue := NewAnalysisQueue(analysisWorkers)
idx := NewUpsideDownCouch(s, analysisQueue)
cache := registry.NewCache()
analyzer, err := cache.AnalyzerNamed("standard")
if err != nil {
b.Fatal(err)
}
indexDocument := document.NewDocument("").
AddField(document.NewTextField("body", []uint64{}, []byte("A boiling liquid expanding vapor explosion (BLEVE, /ˈblɛviː/ blev-ee) is an explosion caused by the rupture of a vessel containing a pressurized liquid above its boiling point.")))
AddField(document.NewTextFieldWithAnalyzer("body", []uint64{}, []byte(benchmarkDocBodies[0]), analyzer))
b.ResetTimer()
for i := 0; i < b.N; i++ {
indexDocument.ID = strconv.Itoa(i)
err := index.Update(indexDocument)
err := idx.Update(indexDocument)
if err != nil {
b.Fatal(err)
}
}
}
func CommonBenchmarkIndexBatch(b *testing.B, s store.KVStore, analysisWorkers, batchSize int) {
analysisQueue := NewAnalysisQueue(analysisWorkers)
idx := NewUpsideDownCouch(s, analysisQueue)
cache := registry.NewCache()
analyzer, err := cache.AnalyzerNamed("standard")
if err != nil {
b.Fatal(err)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
var batch index.Batch
for j := 0; j < 1000; j++ {
if j%batchSize == 0 {
if len(batch) > 0 {
err := idx.Batch(batch)
if err != nil {
b.Fatal(err)
}
}
batch = make(index.Batch)
}
indexDocument := document.NewDocument("").
AddField(document.NewTextFieldWithAnalyzer("body", []uint64{}, []byte(benchmarkDocBodies[j%10]), analyzer))
indexDocument.ID = strconv.Itoa(i) + "-" + strconv.Itoa(j)
batch[indexDocument.ID] = indexDocument
}
// close last batch
if len(batch) > 0 {
err := idx.Batch(batch)
if err != nil {
b.Fatal(err)
}
}
}
}

View File

@ -15,12 +15,94 @@ import (
"github.com/blevesearch/bleve/index/store/inmem"
)
func BenchmarkInMemIndexing(b *testing.B) {
func BenchmarkInMemIndexing1Workers(b *testing.B) {
s, err := inmem.Open()
if err != nil {
b.Fatal(err)
}
defer s.Close()
CommonBenchmarkIndex(b, s)
CommonBenchmarkIndex(b, s, 1)
}
func BenchmarkInMemIndexing2Workers(b *testing.B) {
s, err := inmem.Open()
if err != nil {
b.Fatal(err)
}
defer s.Close()
CommonBenchmarkIndex(b, s, 2)
}
func BenchmarkInMemIndexing4Workers(b *testing.B) {
s, err := inmem.Open()
if err != nil {
b.Fatal(err)
}
defer s.Close()
CommonBenchmarkIndex(b, s, 4)
}
// batches
func BenchmarkInMemIndexing1Workers10Batch(b *testing.B) {
s, err := inmem.Open()
if err != nil {
b.Fatal(err)
}
defer s.Close()
CommonBenchmarkIndexBatch(b, s, 1, 10)
}
func BenchmarkInMemIndexing2Workers10Batch(b *testing.B) {
s, err := inmem.Open()
if err != nil {
b.Fatal(err)
}
defer s.Close()
CommonBenchmarkIndexBatch(b, s, 2, 10)
}
func BenchmarkInMemIndexing4Workers10Batch(b *testing.B) {
s, err := inmem.Open()
if err != nil {
b.Fatal(err)
}
defer s.Close()
CommonBenchmarkIndexBatch(b, s, 4, 10)
}
func BenchmarkInMemIndexing1Workers100Batch(b *testing.B) {
s, err := inmem.Open()
if err != nil {
b.Fatal(err)
}
defer s.Close()
CommonBenchmarkIndexBatch(b, s, 1, 100)
}
func BenchmarkInMemIndexing2Workers100Batch(b *testing.B) {
s, err := inmem.Open()
if err != nil {
b.Fatal(err)
}
defer s.Close()
CommonBenchmarkIndexBatch(b, s, 2, 100)
}
func BenchmarkInMemIndexing4Workers100Batch(b *testing.B) {
s, err := inmem.Open()
if err != nil {
b.Fatal(err)
}
defer s.Close()
CommonBenchmarkIndexBatch(b, s, 4, 100)
}

View File

@ -18,7 +18,7 @@ import (
"github.com/blevesearch/bleve/index/store/leveldb"
)
func BenchmarkLevelDBIndexing(b *testing.B) {
func BenchmarkLevelDBIndexing1Workers(b *testing.B) {
s, err := leveldb.Open("test", true, true)
if err != nil {
b.Fatal(err)
@ -26,5 +26,95 @@ func BenchmarkLevelDBIndexing(b *testing.B) {
defer os.RemoveAll("test")
defer s.Close()
CommonBenchmarkIndex(b, s)
CommonBenchmarkIndex(b, s, 1)
}
func BenchmarkLevelDBIndexing2Workers(b *testing.B) {
s, err := leveldb.Open("test", true, true)
if err != nil {
b.Fatal(err)
}
defer os.RemoveAll("test")
defer s.Close()
CommonBenchmarkIndex(b, s, 2)
}
func BenchmarkLevelDBIndexing4Workers(b *testing.B) {
s, err := leveldb.Open("test", true, true)
if err != nil {
b.Fatal(err)
}
defer os.RemoveAll("test")
defer s.Close()
CommonBenchmarkIndex(b, s, 4)
}
// batches
func BenchmarkLevelDBIndexing1Workers10Batch(b *testing.B) {
s, err := leveldb.Open("test", true, true)
if err != nil {
b.Fatal(err)
}
defer os.RemoveAll("test")
defer s.Close()
CommonBenchmarkIndexBatch(b, s, 1, 10)
}
func BenchmarkLevelDBIndexing2Workers10Batch(b *testing.B) {
s, err := leveldb.Open("test", true, true)
if err != nil {
b.Fatal(err)
}
defer os.RemoveAll("test")
defer s.Close()
CommonBenchmarkIndexBatch(b, s, 2, 10)
}
func BenchmarkLevelDBIndexing4Workers10Batch(b *testing.B) {
s, err := leveldb.Open("test", true, true)
if err != nil {
b.Fatal(err)
}
defer os.RemoveAll("test")
defer s.Close()
CommonBenchmarkIndexBatch(b, s, 4, 10)
}
func BenchmarkLevelDBIndexing1Workers100Batch(b *testing.B) {
s, err := leveldb.Open("test", true, true)
if err != nil {
b.Fatal(err)
}
defer os.RemoveAll("test")
defer s.Close()
CommonBenchmarkIndexBatch(b, s, 1, 100)
}
func BenchmarkLevelDBIndexing2Workers100Batch(b *testing.B) {
s, err := leveldb.Open("test", true, true)
if err != nil {
b.Fatal(err)
}
defer os.RemoveAll("test")
defer s.Close()
CommonBenchmarkIndexBatch(b, s, 2, 100)
}
func BenchmarkLevelDBIndexing4Workers100Batch(b *testing.B) {
s, err := leveldb.Open("test", true, true)
if err != nil {
b.Fatal(err)
}
defer os.RemoveAll("test")
defer s.Close()
CommonBenchmarkIndexBatch(b, s, 4, 100)
}

View File

@ -0,0 +1,108 @@
// Copyright (c) 2014 Couchbase, Inc.
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
// except in compliance with the License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed under the
// License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
// either express or implied. See the License for the specific language governing permissions
// and limitations under the License.
package upside_down
import (
"testing"
"github.com/blevesearch/bleve/index/store/null"
)
func BenchmarkNullIndexing1Workers(b *testing.B) {
s, err := null.Open()
if err != nil {
b.Fatal(err)
}
defer s.Close()
CommonBenchmarkIndex(b, s, 1)
}
func BenchmarkNullIndexing2Workers(b *testing.B) {
s, err := null.Open()
if err != nil {
b.Fatal(err)
}
defer s.Close()
CommonBenchmarkIndex(b, s, 2)
}
func BenchmarkNullIndexing4Workers(b *testing.B) {
s, err := null.Open()
if err != nil {
b.Fatal(err)
}
defer s.Close()
CommonBenchmarkIndex(b, s, 4)
}
// batches
func BenchmarkNullIndexing1Workers10Batch(b *testing.B) {
s, err := null.Open()
if err != nil {
b.Fatal(err)
}
defer s.Close()
CommonBenchmarkIndexBatch(b, s, 1, 10)
}
func BenchmarkNullIndexing2Workers10Batch(b *testing.B) {
s, err := null.Open()
if err != nil {
b.Fatal(err)
}
defer s.Close()
CommonBenchmarkIndexBatch(b, s, 2, 10)
}
func BenchmarkNullIndexing4Workers10Batch(b *testing.B) {
s, err := null.Open()
if err != nil {
b.Fatal(err)
}
defer s.Close()
CommonBenchmarkIndexBatch(b, s, 4, 10)
}
func BenchmarkNullIndexing1Workers100Batch(b *testing.B) {
s, err := null.Open()
if err != nil {
b.Fatal(err)
}
defer s.Close()
CommonBenchmarkIndexBatch(b, s, 1, 100)
}
func BenchmarkNullIndexing2Workers100Batch(b *testing.B) {
s, err := null.Open()
if err != nil {
b.Fatal(err)
}
defer s.Close()
CommonBenchmarkIndexBatch(b, s, 2, 100)
}
func BenchmarkNullIndexing4Workers100Batch(b *testing.B) {
s, err := null.Open()
if err != nil {
b.Fatal(err)
}
defer s.Close()
CommonBenchmarkIndexBatch(b, s, 4, 100)
}

View File

@ -25,7 +25,8 @@ func TestDump(t *testing.T) {
if err != nil {
t.Error(err)
}
idx := NewUpsideDownCouch(store)
analysisQueue := NewAnalysisQueue(1)
idx := NewUpsideDownCouch(store, analysisQueue)
err = idx.Open()
if err != nil {
t.Errorf("error opening index: %v", err)

View File

@ -0,0 +1,69 @@
// Copyright (c) 2014 Couchbase, Inc.
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
// except in compliance with the License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed under the
// License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
// either express or implied. See the License for the specific language governing permissions
// and limitations under the License.
package upside_down
import (
"sync"
)
type FieldIndexCache struct {
fieldIndexes map[string]uint16
lastFieldIndex int
mutex sync.RWMutex
}
func NewFieldIndexCache() *FieldIndexCache {
return &FieldIndexCache{
fieldIndexes: make(map[string]uint16),
}
}
func (f *FieldIndexCache) AddExisting(field string, index uint16) {
f.mutex.Lock()
defer f.mutex.Unlock()
f.fieldIndexes[field] = index
if int(index) > f.lastFieldIndex {
f.lastFieldIndex = int(index)
}
}
func (f *FieldIndexCache) FieldExists(field string) (uint16, bool) {
f.mutex.RLock()
defer f.mutex.RUnlock()
if index, ok := f.fieldIndexes[field]; ok {
return index, true
}
return 0, false
}
func (f *FieldIndexCache) FieldIndex(field string) (uint16, *FieldRow) {
f.mutex.Lock()
defer f.mutex.Unlock()
index, exists := f.fieldIndexes[field]
if exists {
return index, nil
}
// assign next field id
index = uint16(f.lastFieldIndex + 1)
f.fieldIndexes[field] = index
f.lastFieldIndex = int(index)
return index, NewFieldRow(uint16(index), field)
}
func (f *FieldIndexCache) FieldName(index uint16) string {
f.mutex.RLock()
defer f.mutex.RUnlock()
for fieldName, fieldIndex := range f.fieldIndexes {
if index == fieldIndex {
return fieldName
}
}
return ""
}

View File

@ -22,7 +22,8 @@ func TestIndexFieldReader(t *testing.T) {
defer os.RemoveAll("test")
store, err := boltdb.Open("test", "bleve")
idx := NewUpsideDownCouch(store)
analysisQueue := NewAnalysisQueue(1)
idx := NewUpsideDownCouch(store, analysisQueue)
err = idx.Open()
if err != nil {
t.Errorf("error opening index: %v", err)

View File

@ -24,7 +24,7 @@ type IndexReader struct {
}
func (i *IndexReader) TermFieldReader(term []byte, fieldName string) (index.TermFieldReader, error) {
fieldIndex, fieldExists := i.index.fieldIndexes[fieldName]
fieldIndex, fieldExists := i.index.fieldIndexCache.FieldExists(fieldName)
if fieldExists {
return newUpsideDownCouchTermFieldReader(i, term, uint16(fieldIndex))
}
@ -32,7 +32,7 @@ func (i *IndexReader) TermFieldReader(term []byte, fieldName string) (index.Term
}
func (i *IndexReader) FieldReader(fieldName string, startTerm []byte, endTerm []byte) (index.FieldReader, error) {
fieldIndex, fieldExists := i.index.fieldIndexes[fieldName]
fieldIndex, fieldExists := i.index.fieldIndexCache.FieldExists(fieldName)
if fieldExists {
return newUpsideDownCouchFieldReader(i, uint16(fieldIndex), startTerm, endTerm)
}
@ -67,7 +67,7 @@ func (i *IndexReader) Document(id string) (*document.Document, error) {
return nil, err
}
if row != nil {
fieldName := i.index.fieldIndexToName(row.field)
fieldName := i.index.fieldIndexCache.FieldName(row.field)
field := decodeFieldType(row.typ, fieldName, row.value)
if field != nil {
rv.AddField(field)
@ -87,7 +87,7 @@ func (i *IndexReader) DocumentFieldTerms(id string) (index.FieldTerms, error) {
}
rv := make(index.FieldTerms, len(back.termEntries))
for _, entry := range back.termEntries {
fieldName := i.index.fieldIndexToName(uint16(*entry.Field))
fieldName := i.index.fieldIndexCache.FieldName(uint16(*entry.Field))
terms, ok := rv[fieldName]
if !ok {
terms = make([]string, 0)

View File

@ -23,7 +23,8 @@ func TestIndexReader(t *testing.T) {
defer os.RemoveAll("test")
store, err := boltdb.Open("test", "bleve")
idx := NewUpsideDownCouch(store)
analysisQueue := NewAnalysisQueue(1)
idx := NewUpsideDownCouch(store, analysisQueue)
err = idx.Open()
if err != nil {
t.Errorf("error opening index: %v", err)
@ -168,7 +169,8 @@ func TestIndexDocIdReader(t *testing.T) {
defer os.RemoveAll("test")
store, err := boltdb.Open("test", "bleve")
idx := NewUpsideDownCouch(store)
analysisQueue := NewAnalysisQueue(1)
idx := NewUpsideDownCouch(store, analysisQueue)
err = idx.Open()
if err != nil {
t.Errorf("error opening index: %v", err)

View File

@ -26,21 +26,20 @@ var VersionKey = []byte{'v'}
const Version uint8 = 1
type UpsideDownCouch struct {
version uint8
path string
store store.KVStore
fieldIndexes map[string]uint16
lastFieldIndex int
analyzer map[string]*analysis.Analyzer
docCount uint64
version uint8
path string
store store.KVStore
fieldIndexCache *FieldIndexCache
docCount uint64
analysisQueue AnalysisQueue
}
func NewUpsideDownCouch(s store.KVStore) *UpsideDownCouch {
func NewUpsideDownCouch(s store.KVStore, analysisQueue AnalysisQueue) *UpsideDownCouch {
return &UpsideDownCouch{
version: Version,
analyzer: make(map[string]*analysis.Analyzer),
fieldIndexes: make(map[string]uint16),
store: s,
version: Version,
fieldIndexCache: NewFieldIndexCache(),
store: s,
analysisQueue: analysisQueue,
}
}
@ -72,10 +71,7 @@ func (udc *UpsideDownCouch) loadSchema(kvreader store.KVReader) (err error) {
if err != nil {
return err
}
udc.fieldIndexes[fieldRow.name] = fieldRow.index
if int(fieldRow.index) > udc.lastFieldIndex {
udc.lastFieldIndex = int(fieldRow.index)
}
udc.fieldIndexCache.AddExisting(fieldRow.name, fieldRow.index)
it.Next()
key, val, valid = it.Current()
@ -196,6 +192,22 @@ func (udc *UpsideDownCouch) Close() {
}
func (udc *UpsideDownCouch) Update(doc *document.Document) error {
// do analysis before acquiring write lock
resultChan := make(chan *AnalysisResult)
aw := AnalysisWork{
udc: udc,
d: doc,
rc: resultChan,
}
// put the work on the queue
go func() {
udc.analysisQueue <- aw
}()
// wait for the result
result := <-resultChan
close(resultChan)
// start a writer for this update
kvwriter := udc.store.Writer()
defer kvwriter.Close()
@ -212,7 +224,7 @@ func (udc *UpsideDownCouch) Update(doc *document.Document) error {
updateRows := make([]UpsideDownCouchRow, 0)
deleteRows := make([]UpsideDownCouchRow, 0)
addRows, updateRows, deleteRows = udc.updateSingle(doc, backIndexRow, addRows, updateRows, deleteRows)
addRows, updateRows, deleteRows = udc.mergeOldAndNew(backIndexRow, result.rows, addRows, updateRows, deleteRows)
err = udc.batchRows(kvwriter, addRows, updateRows, deleteRows)
if err == nil && backIndexRow == nil {
@ -221,8 +233,7 @@ func (udc *UpsideDownCouch) Update(doc *document.Document) error {
return err
}
func (udc *UpsideDownCouch) updateSingle(doc *document.Document, backIndexRow *BackIndexRow, addRows, updateRows, deleteRows []UpsideDownCouchRow) ([]UpsideDownCouchRow, []UpsideDownCouchRow, []UpsideDownCouchRow) {
func (udc *UpsideDownCouch) mergeOldAndNew(backIndexRow *BackIndexRow, rows, addRows, updateRows, deleteRows []UpsideDownCouchRow) ([]UpsideDownCouchRow, []UpsideDownCouchRow, []UpsideDownCouchRow) {
existingTermKeys := make(map[string]bool)
for _, key := range backIndexRow.AllTermKeys() {
existingTermKeys[string(key)] = true
@ -233,62 +244,30 @@ func (udc *UpsideDownCouch) updateSingle(doc *document.Document, backIndexRow *B
existingStoredKeys[string(key)] = true
}
// track our back index entries
backIndexTermEntries := make([]*BackIndexTermEntry, 0)
backIndexStoredEntries := make([]*BackIndexStoreEntry, 0)
for _, field := range doc.Fields {
fieldIndex, newFieldRow := udc.fieldNameToFieldIndex(field.Name())
if newFieldRow != nil {
updateRows = append(updateRows, newFieldRow)
}
if field.Options().IsIndexed() {
fieldLength, tokenFreqs := field.Analyze()
// see if any of the composite fields need this
for _, compositeField := range doc.CompositeFields {
compositeField.Compose(field.Name(), fieldLength, tokenFreqs)
for _, row := range rows {
switch row := row.(type) {
case *TermFrequencyRow:
rowKey := string(row.Key())
if _, ok := existingTermKeys[rowKey]; ok {
updateRows = append(updateRows, row)
delete(existingTermKeys, rowKey)
} else {
addRows = append(addRows, row)
}
// encode this field
indexAddRows, indexUpdateRows, indexBackIndexTermEntries := udc.indexField(doc.ID, field, fieldIndex, fieldLength, tokenFreqs, existingTermKeys)
addRows = append(addRows, indexAddRows...)
updateRows = append(updateRows, indexUpdateRows...)
backIndexTermEntries = append(backIndexTermEntries, indexBackIndexTermEntries...)
}
if field.Options().IsStored() {
storeAddRows, storeUpdateRows, indexBackIndexStoreEntries := udc.storeField(doc.ID, field, fieldIndex, existingStoredKeys)
addRows = append(addRows, storeAddRows...)
updateRows = append(updateRows, storeUpdateRows...)
backIndexStoredEntries = append(backIndexStoredEntries, indexBackIndexStoreEntries...)
case *StoredRow:
rowKey := string(row.Key())
if _, ok := existingStoredKeys[rowKey]; ok {
updateRows = append(updateRows, row)
delete(existingStoredKeys, rowKey)
} else {
addRows = append(addRows, row)
}
default:
updateRows = append(updateRows, row)
}
}
// now index the composite fields
for _, compositeField := range doc.CompositeFields {
fieldIndex, newFieldRow := udc.fieldNameToFieldIndex(compositeField.Name())
if newFieldRow != nil {
updateRows = append(updateRows, newFieldRow)
}
if compositeField.Options().IsIndexed() {
fieldLength, tokenFreqs := compositeField.Analyze()
// encode this field
indexAddRows, indexUpdateRows, indexBackIndexTermEntries := udc.indexField(doc.ID, compositeField, fieldIndex, fieldLength, tokenFreqs, existingTermKeys)
addRows = append(addRows, indexAddRows...)
updateRows = append(updateRows, indexUpdateRows...)
backIndexTermEntries = append(backIndexTermEntries, indexBackIndexTermEntries...)
}
}
// build the back index row
backIndexRow = NewBackIndexRow(doc.ID, backIndexTermEntries, backIndexStoredEntries)
updateRows = append(updateRows, backIndexRow)
// any of the existing rows that weren't updated need to be deleted
for existingTermKey := range existingTermKeys {
termFreqRow, err := NewTermFrequencyRowK([]byte(existingTermKey))
@ -308,9 +287,8 @@ func (udc *UpsideDownCouch) updateSingle(doc *document.Document, backIndexRow *B
return addRows, updateRows, deleteRows
}
func (udc *UpsideDownCouch) storeField(docID string, field document.Field, fieldIndex uint16, existingKeys map[string]bool) ([]UpsideDownCouchRow, []UpsideDownCouchRow, []*BackIndexStoreEntry) {
updateRows := make([]UpsideDownCouchRow, 0)
addRows := make([]UpsideDownCouchRow, 0)
func (udc *UpsideDownCouch) storeField(docID string, field document.Field, fieldIndex uint16) ([]UpsideDownCouchRow, []*BackIndexStoreEntry) {
rows := make([]UpsideDownCouchRow, 0, 100)
backIndexStoredEntries := make([]*BackIndexStoreEntry, 0)
fieldType := encodeFieldType(field)
storedRow := NewStoredRow(docID, fieldIndex, field.ArrayPositions(), fieldType, field.Value())
@ -319,17 +297,8 @@ func (udc *UpsideDownCouch) storeField(docID string, field document.Field, field
backIndexStoredEntry := BackIndexStoreEntry{Field: proto.Uint32(uint32(fieldIndex)), ArrayPositions: field.ArrayPositions()}
backIndexStoredEntries = append(backIndexStoredEntries, &backIndexStoredEntry)
storedRowKey := string(storedRow.Key())
_, existed := existingKeys[storedRowKey]
if existed {
// this is an update
updateRows = append(updateRows, storedRow)
// this field was stored last time, delete it from that map
delete(existingKeys, storedRowKey)
} else {
addRows = append(addRows, storedRow)
}
return addRows, updateRows, backIndexStoredEntries
rows = append(rows, storedRow)
return rows, backIndexStoredEntries
}
func encodeFieldType(f document.Field) byte {
@ -347,10 +316,9 @@ func encodeFieldType(f document.Field) byte {
return fieldType
}
func (udc *UpsideDownCouch) indexField(docID string, field document.Field, fieldIndex uint16, fieldLength int, tokenFreqs analysis.TokenFrequencies, existingKeys map[string]bool) ([]UpsideDownCouchRow, []UpsideDownCouchRow, []*BackIndexTermEntry) {
func (udc *UpsideDownCouch) indexField(docID string, field document.Field, fieldIndex uint16, fieldLength int, tokenFreqs analysis.TokenFrequencies) ([]UpsideDownCouchRow, []*BackIndexTermEntry) {
updateRows := make([]UpsideDownCouchRow, 0)
addRows := make([]UpsideDownCouchRow, 0)
rows := make([]UpsideDownCouchRow, 0, 100)
backIndexTermEntries := make([]*BackIndexTermEntry, 0)
fieldNorm := float32(1.0 / math.Sqrt(float64(fieldLength)))
@ -358,7 +326,7 @@ func (udc *UpsideDownCouch) indexField(docID string, field document.Field, field
var termFreqRow *TermFrequencyRow
if field.Options().IncludeTermVectors() {
tv, newFieldRows := udc.termVectorsFromTokenFreq(fieldIndex, tf)
updateRows = append(updateRows, newFieldRows...)
rows = append(rows, newFieldRows...)
termFreqRow = NewTermFrequencyRowWithTermVectors(tf.Term, fieldIndex, docID, uint64(frequencyFromTokenFreq(tf)), fieldNorm, tv)
} else {
termFreqRow = NewTermFrequencyRow(tf.Term, fieldIndex, docID, uint64(frequencyFromTokenFreq(tf)), fieldNorm)
@ -368,34 +336,10 @@ func (udc *UpsideDownCouch) indexField(docID string, field document.Field, field
backIndexTermEntry := BackIndexTermEntry{Term: proto.String(string(tf.Term)), Field: proto.Uint32(uint32(fieldIndex))}
backIndexTermEntries = append(backIndexTermEntries, &backIndexTermEntry)
tfrKeyString := string(termFreqRow.Key())
_, existed := existingKeys[tfrKeyString]
if existed {
// this is an update
updateRows = append(updateRows, termFreqRow)
// this term existed last time, delete it from that map
delete(existingKeys, tfrKeyString)
} else {
// this is an add
addRows = append(addRows, termFreqRow)
}
rows = append(rows, termFreqRow)
}
return addRows, updateRows, backIndexTermEntries
}
func (udc *UpsideDownCouch) fieldNameToFieldIndex(fieldName string) (uint16, *FieldRow) {
var fieldRow *FieldRow
fieldIndex, fieldExists := udc.fieldIndexes[fieldName]
if !fieldExists {
// assign next field id
fieldIndex = uint16(udc.lastFieldIndex + 1)
udc.fieldIndexes[fieldName] = fieldIndex
// ensure this batch adds a row for this field
fieldRow = NewFieldRow(uint16(fieldIndex), fieldName)
udc.lastFieldIndex = int(fieldIndex)
}
return fieldIndex, fieldRow
return rows, backIndexTermEntries
}
func (udc *UpsideDownCouch) Delete(id string) error {
@ -497,7 +441,7 @@ func (udc *UpsideDownCouch) termVectorsFromTokenFreq(field uint16, tf *analysis.
fieldIndex := field
if l.Field != "" {
// lookup correct field
fieldIndex, newFieldRow = udc.fieldNameToFieldIndex(l.Field)
fieldIndex, newFieldRow = udc.fieldIndexCache.FieldIndex(l.Field)
if newFieldRow != nil {
newFieldRows = append(newFieldRows, newFieldRow)
}
@ -518,7 +462,7 @@ func (udc *UpsideDownCouch) termFieldVectorsFromTermVectors(in []*TermVector) []
rv := make([]*index.TermFieldVector, len(in))
for i, tv := range in {
fieldName := udc.fieldIndexToName(tv.field)
fieldName := udc.fieldIndexCache.FieldName(tv.field)
tfv := index.TermFieldVector{
Field: fieldName,
Pos: tv.pos,
@ -530,16 +474,40 @@ func (udc *UpsideDownCouch) termFieldVectorsFromTermVectors(in []*TermVector) []
return rv
}
func (udc *UpsideDownCouch) fieldIndexToName(i uint16) string {
for fieldName, fieldIndex := range udc.fieldIndexes {
if i == fieldIndex {
return fieldName
func (udc *UpsideDownCouch) Batch(batch index.Batch) error {
resultChan := make(chan *AnalysisResult)
numUpdates := 0
for _, doc := range batch {
if doc != nil {
numUpdates++
}
}
return ""
}
func (udc *UpsideDownCouch) Batch(batch index.Batch) error {
go func() {
for _, doc := range batch {
if doc != nil {
aw := AnalysisWork{
udc: udc,
d: doc,
rc: resultChan,
}
// put the work on the queue
udc.analysisQueue <- aw
}
}
}()
newRowsMap := make(map[string][]UpsideDownCouchRow)
// wait for the result
itemsDeQueued := 0
for itemsDeQueued < numUpdates {
result := <-resultChan
newRowsMap[result.docID] = result.rows
itemsDeQueued++
}
close(resultChan)
// start a writer for this batch
kvwriter := udc.store.Writer()
defer kvwriter.Close()
@ -564,7 +532,7 @@ func (udc *UpsideDownCouch) Batch(batch index.Batch) error {
deleteRows = udc.deleteSingle(docID, backIndexRow, deleteRows)
docsDeleted++
} else if doc != nil {
addRows, updateRows, deleteRows = udc.updateSingle(doc, backIndexRow, addRows, updateRows, deleteRows)
addRows, updateRows, deleteRows = udc.mergeOldAndNew(backIndexRow, newRowsMap[docID], addRows, updateRows, deleteRows)
if backIndexRow == nil {
docsAdded++
}

View File

@ -31,7 +31,8 @@ func TestIndexOpenReopen(t *testing.T) {
defer os.RemoveAll("test")
store, err := boltdb.Open("test", "bleve")
idx := NewUpsideDownCouch(store)
analysisQueue := NewAnalysisQueue(1)
idx := NewUpsideDownCouch(store, analysisQueue)
err = idx.Open()
if err != nil {
t.Errorf("error opening index: %v", err)
@ -57,7 +58,7 @@ func TestIndexOpenReopen(t *testing.T) {
if err != nil {
t.Fatalf("error opening store: %v", err)
}
idx = NewUpsideDownCouch(store)
idx = NewUpsideDownCouch(store, analysisQueue)
err = idx.Open()
if err != nil {
t.Errorf("error opening index: %v", err)
@ -71,7 +72,8 @@ func TestIndexInsert(t *testing.T) {
defer os.RemoveAll("test")
store, err := boltdb.Open("test", "bleve")
idx := NewUpsideDownCouch(store)
analysisQueue := NewAnalysisQueue(1)
idx := NewUpsideDownCouch(store, analysisQueue)
err = idx.Open()
if err != nil {
t.Errorf("error opening index: %v", err)
@ -109,7 +111,8 @@ func TestIndexInsertThenDelete(t *testing.T) {
defer os.RemoveAll("test")
store, err := boltdb.Open("test", "bleve")
idx := NewUpsideDownCouch(store)
analysisQueue := NewAnalysisQueue(1)
idx := NewUpsideDownCouch(store, analysisQueue)
err = idx.Open()
if err != nil {
t.Errorf("error opening index: %v", err)
@ -177,7 +180,8 @@ func TestIndexInsertThenUpdate(t *testing.T) {
defer os.RemoveAll("test")
store, err := boltdb.Open("test", "bleve")
idx := NewUpsideDownCouch(store)
analysisQueue := NewAnalysisQueue(1)
idx := NewUpsideDownCouch(store, analysisQueue)
err = idx.Open()
if err != nil {
t.Errorf("error opening index: %v", err)
@ -226,7 +230,8 @@ func TestIndexInsertMultiple(t *testing.T) {
defer os.RemoveAll("test")
store, err := boltdb.Open("test", "bleve")
idx := NewUpsideDownCouch(store)
analysisQueue := NewAnalysisQueue(1)
idx := NewUpsideDownCouch(store, analysisQueue)
err = idx.Open()
if err != nil {
t.Errorf("error opening index: %v", err)
@ -260,7 +265,7 @@ func TestIndexInsertMultiple(t *testing.T) {
// close and reopen and and one more to testing counting works correctly
idx.Close()
store, err = boltdb.Open("test", "bleve")
idx = NewUpsideDownCouch(store)
idx = NewUpsideDownCouch(store, analysisQueue)
err = idx.Open()
if err != nil {
t.Errorf("error opening index: %v", err)
@ -288,7 +293,8 @@ func TestIndexInsertWithStore(t *testing.T) {
if err != nil {
t.Error(err)
}
idx := NewUpsideDownCouch(store)
analysisQueue := NewAnalysisQueue(1)
idx := NewUpsideDownCouch(store, analysisQueue)
err = idx.Open()
if err != nil {
t.Errorf("error opening index: %v", err)
@ -345,7 +351,8 @@ func TestIndexInternalCRUD(t *testing.T) {
defer os.RemoveAll("test")
store, err := boltdb.Open("test", "bleve")
idx := NewUpsideDownCouch(store)
analysisQueue := NewAnalysisQueue(1)
idx := NewUpsideDownCouch(store, analysisQueue)
err = idx.Open()
if err != nil {
t.Errorf("error opening index: %v", err)
@ -405,7 +412,8 @@ func TestIndexBatch(t *testing.T) {
defer os.RemoveAll("test")
store, err := boltdb.Open("test", "bleve")
idx := NewUpsideDownCouch(store)
analysisQueue := NewAnalysisQueue(1)
idx := NewUpsideDownCouch(store, analysisQueue)
err = idx.Open()
if err != nil {
t.Errorf("error opening index: %v", err)
@ -485,7 +493,8 @@ func TestIndexInsertUpdateDeleteWithMultipleTypesStored(t *testing.T) {
if err != nil {
t.Error(err)
}
idx := NewUpsideDownCouch(store)
analysisQueue := NewAnalysisQueue(1)
idx := NewUpsideDownCouch(store, analysisQueue)
err = idx.Open()
if err != nil {
t.Errorf("error opening index: %v", err)
@ -642,7 +651,8 @@ func TestIndexInsertFields(t *testing.T) {
if err != nil {
t.Error(err)
}
idx := NewUpsideDownCouch(store)
analysisQueue := NewAnalysisQueue(1)
idx := NewUpsideDownCouch(store, analysisQueue)
err = idx.Open()
if err != nil {
t.Errorf("error opening index: %v", err)
@ -684,7 +694,8 @@ func TestIndexUpdateComposites(t *testing.T) {
if err != nil {
t.Error(err)
}
idx := NewUpsideDownCouch(store)
analysisQueue := NewAnalysisQueue(1)
idx := NewUpsideDownCouch(store, analysisQueue)
err = idx.Open()
if err != nil {
t.Errorf("error opening index: %v", err)
@ -756,7 +767,8 @@ func TestIndexFieldsMisc(t *testing.T) {
if err != nil {
t.Error(err)
}
idx := NewUpsideDownCouch(store)
analysisQueue := NewAnalysisQueue(1)
idx := NewUpsideDownCouch(store, analysisQueue)
err = idx.Open()
if err != nil {
t.Errorf("error opening index: %v", err)
@ -771,15 +783,15 @@ func TestIndexFieldsMisc(t *testing.T) {
t.Errorf("Error updating index: %v", err)
}
fieldName1 := idx.fieldIndexToName(1)
fieldName1 := idx.fieldIndexCache.FieldName(1)
if fieldName1 != "name" {
t.Errorf("expected field named 'name', got '%s'", fieldName1)
}
fieldName2 := idx.fieldIndexToName(2)
fieldName2 := idx.fieldIndexCache.FieldName(2)
if fieldName2 != "title" {
t.Errorf("expected field named 'title', got '%s'", fieldName2)
}
fieldName3 := idx.fieldIndexToName(3)
fieldName3 := idx.fieldIndexCache.FieldName(3)
if fieldName3 != "" {
t.Errorf("expected field named '', got '%s'", fieldName3)
}
@ -793,7 +805,8 @@ func TestIndexTermReaderCompositeFields(t *testing.T) {
if err != nil {
t.Error(err)
}
idx := NewUpsideDownCouch(store)
analysisQueue := NewAnalysisQueue(1)
idx := NewUpsideDownCouch(store, analysisQueue)
err = idx.Open()
if err != nil {
t.Errorf("error opening index: %v", err)
@ -836,7 +849,8 @@ func TestIndexDocumentFieldTerms(t *testing.T) {
if err != nil {
t.Error(err)
}
idx := NewUpsideDownCouch(store)
analysisQueue := NewAnalysisQueue(1)
idx := NewUpsideDownCouch(store, analysisQueue)
err = idx.Open()
if err != nil {
t.Errorf("error opening index: %v", err)

View File

@ -63,7 +63,7 @@ func newMemIndex(mapping *IndexMapping) (*indexImpl, error) {
}
// open open the index
rv.i = upside_down.NewUpsideDownCouch(rv.s)
rv.i = upside_down.NewUpsideDownCouch(rv.s, Config.analysisQueue)
err = rv.i.Open()
if err != nil {
return nil, err
@ -124,7 +124,7 @@ func newIndex(path string, mapping *IndexMapping) (*indexImpl, error) {
}
// open open the index
rv.i = upside_down.NewUpsideDownCouch(rv.s)
rv.i = upside_down.NewUpsideDownCouch(rv.s, Config.analysisQueue)
err = rv.i.Open()
if err != nil {
return nil, err
@ -176,7 +176,7 @@ func openIndex(path string) (*indexImpl, error) {
}
// open open the index
rv.i = upside_down.NewUpsideDownCouch(rv.s)
rv.i = upside_down.NewUpsideDownCouch(rv.s, Config.analysisQueue)
err = rv.i.Open()
if err != nil {
return nil, err
@ -223,8 +223,8 @@ func (i *indexImpl) Mapping() *IndexMapping {
// The IndexMapping for this index will determine
// how the object is indexed.
func (i *indexImpl) Index(id string, data interface{}) error {
i.mutex.Lock()
defer i.mutex.Unlock()
i.mutex.RLock()
defer i.mutex.RUnlock()
if !i.open {
return ErrorIndexClosed
@ -245,8 +245,8 @@ func (i *indexImpl) Index(id string, data interface{}) error {
// Delete entries for the specified identifier from
// the index.
func (i *indexImpl) Delete(id string) error {
i.mutex.Lock()
defer i.mutex.Unlock()
i.mutex.RLock()
defer i.mutex.RUnlock()
if !i.open {
return ErrorIndexClosed
@ -264,8 +264,8 @@ func (i *indexImpl) Delete(id string) error {
// significant performance benefits when performing
// operations in a batch.
func (i *indexImpl) Batch(b Batch) error {
i.mutex.Lock()
defer i.mutex.Unlock()
i.mutex.RLock()
defer i.mutex.RUnlock()
if !i.open {
return ErrorIndexClosed

View File

@ -25,7 +25,8 @@ var twoDocIndex index.Index //= upside_down.NewUpsideDownCouch(inmem.MustOpen())
func init() {
inMemStore, _ := inmem.Open()
twoDocIndex = upside_down.NewUpsideDownCouch(inMemStore)
analysisQueue := upside_down.NewAnalysisQueue(1)
twoDocIndex = upside_down.NewUpsideDownCouch(inMemStore, analysisQueue)
for _, doc := range twoDocIndexDocs {
twoDocIndex.Update(doc)
}

View File

@ -26,7 +26,8 @@ func TestTermSearcher(t *testing.T) {
var queryExplain = true
inMemStore, _ := inmem.Open()
i := upside_down.NewUpsideDownCouch(inMemStore)
analysisQueue := upside_down.NewAnalysisQueue(1)
i := upside_down.NewUpsideDownCouch(inMemStore, analysisQueue)
i.Update(&document.Document{
ID: "a",
Fields: []document.Field{