0
0
Fork 0

refactoring to allow pluggable index encodings

this lays the foundation for supporting the new firestorm
indexing scheme.  i'm merging these changes ahead of
the rest of the firestorm branch so i can continue
to make changes to the analysis pipeline in parallel
This commit is contained in:
Marty Schoch 2015-09-02 13:12:08 -04:00
parent 4840aaaa5a
commit dbb93b75a4
22 changed files with 364 additions and 238 deletions

View File

@ -15,7 +15,7 @@ import (
"log"
"time"
"github.com/blevesearch/bleve/index/upside_down"
"github.com/blevesearch/bleve/index"
"github.com/blevesearch/bleve/registry"
// token maps
@ -106,6 +106,9 @@ import (
_ "github.com/blevesearch/bleve/index/store/gtreap"
_ "github.com/blevesearch/bleve/index/store/inmem"
// index types
_ "github.com/blevesearch/bleve/index/upside_down"
// byte array converters
_ "github.com/blevesearch/bleve/analysis/byte_array_converters/ignore"
_ "github.com/blevesearch/bleve/analysis/byte_array_converters/json"
@ -118,18 +121,19 @@ type configuration struct {
Cache *registry.Cache
DefaultHighlighter string
DefaultKVStore string
DefaultIndexType string
SlowSearchLogThreshold time.Duration
analysisQueue *upside_down.AnalysisQueue
analysisQueue *index.AnalysisQueue
}
func (c *configuration) SetAnalysisQueueSize(n int) {
c.analysisQueue = upside_down.NewAnalysisQueue(n)
c.analysisQueue = index.NewAnalysisQueue(n)
}
func newConfiguration() *configuration {
return &configuration{
Cache: registry.NewCache(),
analysisQueue: upside_down.NewAnalysisQueue(4),
analysisQueue: index.NewAnalysisQueue(4),
}
}
@ -178,6 +182,9 @@ func init() {
// default kv store
Config.DefaultKVStore = "boltdb"
// default index
Config.DefaultIndexType = "upside_down"
bootDuration := time.Since(bootStart)
bleveExpVar.Add("bootDuration", int64(bootDuration))
}

View File

@ -24,6 +24,7 @@ const (
ErrorIndexClosed
ErrorAliasMulti
ErrorAliasEmpty
ErrorUnknownIndexType
)
// Error represents a more strongly typed bleve error for detecting
@ -48,4 +49,5 @@ var errorMessages = map[int]string{
int(ErrorIndexClosed): "index is closed",
int(ErrorAliasMulti): "cannot perform single index operation on multiple index alias",
int(ErrorAliasEmpty): "cannot perform operation on empty alias",
int(ErrorUnknownIndexType): "unknown index type",
}

View File

@ -126,18 +126,19 @@ type Classifier interface {
// The provided mapping will be used for all
// Index/Search operations.
func New(path string, mapping *IndexMapping) (Index, error) {
return newIndexUsing(path, mapping, Config.DefaultKVStore, nil)
return newIndexUsing(path, mapping, Config.DefaultIndexType, Config.DefaultKVStore, nil)
}
// NewUsing creates index at the specified path,
// which must not already exist.
// The provided mapping will be used for all
// Index/Search operations.
// The specified index type will be used
// The specified kvstore implemenation will be used
// and the provided kvconfig will be passed to its
// constructor.
func NewUsing(path string, mapping *IndexMapping, kvstore string, kvconfig map[string]interface{}) (Index, error) {
return newIndexUsing(path, mapping, kvstore, kvconfig)
func NewUsing(path string, mapping *IndexMapping, indexType string, kvstore string, kvconfig map[string]interface{}) (Index, error) {
return newIndexUsing(path, mapping, indexType, kvstore, kvconfig)
}
// Open index at the specified path, must exist.

73
index/analysis.go Normal file
View File

@ -0,0 +1,73 @@
// Copyright (c) 2015 Couchbase, Inc.
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
// except in compliance with the License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed under the
// License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
// either express or implied. See the License for the specific language governing permissions
// and limitations under the License.
package index
import "github.com/blevesearch/bleve/document"
type IndexRow interface {
Key() []byte
Value() []byte
}
type AnalysisResult struct {
DocID string
Rows []IndexRow
}
type AnalysisWork struct {
i Index
d *document.Document
rc chan *AnalysisResult
}
func NewAnalysisWork(i Index, d *document.Document, rc chan *AnalysisResult) *AnalysisWork {
return &AnalysisWork{
i: i,
d: d,
rc: rc,
}
}
type AnalysisQueue struct {
queue chan *AnalysisWork
done chan struct{}
}
func (q *AnalysisQueue) Queue(work *AnalysisWork) {
q.queue <- work
}
func (q *AnalysisQueue) Close() {
close(q.done)
}
func NewAnalysisQueue(numWorkers int) *AnalysisQueue {
rv := AnalysisQueue{
queue: make(chan *AnalysisWork),
done: make(chan struct{}),
}
for i := 0; i < numWorkers; i++ {
go AnalysisWorker(rv)
}
return &rv
}
func AnalysisWorker(q AnalysisQueue) {
// read work off the queue
for {
select {
case <-q.done:
return
case w := <-q.queue:
r := w.i.Analyze(w.d)
w.rc <- r
}
}
}

View File

@ -1,4 +1,4 @@
// Copyright (c) 2014 Couchbase, Inc.
// Copyright (c) 2015 Couchbase, Inc.
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
// except in compliance with the License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
@ -7,25 +7,26 @@
// either express or implied. See the License for the specific language governing permissions
// and limitations under the License.
package upside_down
package index
import (
"sync"
)
type FieldIndexCache struct {
type FieldCache struct {
fieldIndexes map[string]uint16
lastFieldIndex int
mutex sync.RWMutex
}
func NewFieldIndexCache() *FieldIndexCache {
return &FieldIndexCache{
fieldIndexes: make(map[string]uint16),
func NewFieldCache() *FieldCache {
return &FieldCache{
fieldIndexes: make(map[string]uint16),
lastFieldIndex: -1,
}
}
func (f *FieldIndexCache) AddExisting(field string, index uint16) {
func (f *FieldCache) AddExisting(field string, index uint16) {
f.mutex.Lock()
defer f.mutex.Unlock()
f.fieldIndexes[field] = index
@ -34,30 +35,30 @@ func (f *FieldIndexCache) AddExisting(field string, index uint16) {
}
}
func (f *FieldIndexCache) FieldExists(field string) (uint16, bool) {
// FieldNamed returns the index of the field, and whether or not it existed
// before this call. if createIfMissing is true, and new field index is assigned
// but the second return value will still be false
func (f *FieldCache) FieldNamed(field string, createIfMissing bool) (uint16, bool) {
f.mutex.RLock()
defer f.mutex.RUnlock()
if index, ok := f.fieldIndexes[field]; ok {
f.mutex.RUnlock()
return index, true
} else if !createIfMissing {
f.mutex.RUnlock()
return 0, false
}
return 0, false
}
func (f *FieldIndexCache) FieldIndex(field string) (uint16, *FieldRow) {
// trade read lock for write lock
f.mutex.RUnlock()
f.mutex.Lock()
defer f.mutex.Unlock()
index, exists := f.fieldIndexes[field]
if exists {
return index, nil
}
// assign next field id
index = uint16(f.lastFieldIndex + 1)
index := uint16(f.lastFieldIndex + 1)
f.fieldIndexes[field] = index
f.lastFieldIndex = int(index)
return index, NewFieldRow(uint16(index), field)
f.mutex.Unlock()
return index, false
}
func (f *FieldIndexCache) FieldName(index uint16) string {
func (f *FieldCache) FieldIndexed(index uint16) string {
f.mutex.RLock()
defer f.mutex.RUnlock()
for fieldName, fieldIndex := range f.fieldIndexes {

View File

@ -36,6 +36,8 @@ type Index interface {
Reader() (IndexReader, error)
Stats() json.Marshaler
Analyze(d *document.Document) *AnalysisResult
}
type IndexReader interface {

View File

@ -0,0 +1,76 @@
// Copyright (c) 2015 Couchbase, Inc.
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
// except in compliance with the License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed under the
// License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
// either express or implied. See the License for the specific language governing permissions
// and limitations under the License.
package upside_down
import (
"github.com/blevesearch/bleve/document"
"github.com/blevesearch/bleve/index"
)
func (udc *UpsideDownCouch) Analyze(d *document.Document) *index.AnalysisResult {
rv := &index.AnalysisResult{
DocID: d.ID,
Rows: make([]index.IndexRow, 0, 100),
}
// track our back index entries
backIndexTermEntries := make([]*BackIndexTermEntry, 0)
backIndexStoredEntries := make([]*BackIndexStoreEntry, 0)
for _, field := range d.Fields {
fieldIndex, newFieldRow := udc.fieldIndexOrNewRow(field.Name())
if newFieldRow != nil {
rv.Rows = append(rv.Rows, newFieldRow)
}
if field.Options().IsIndexed() {
fieldLength, tokenFreqs := field.Analyze()
// see if any of the composite fields need this
for _, compositeField := range d.CompositeFields {
compositeField.Compose(field.Name(), fieldLength, tokenFreqs)
}
// encode this field
indexRows, indexBackIndexTermEntries := udc.indexField(d.ID, field, fieldIndex, fieldLength, tokenFreqs)
rv.Rows = append(rv.Rows, indexRows...)
backIndexTermEntries = append(backIndexTermEntries, indexBackIndexTermEntries...)
}
if field.Options().IsStored() {
storeRows, indexBackIndexStoreEntries := udc.storeField(d.ID, field, fieldIndex)
rv.Rows = append(rv.Rows, storeRows...)
backIndexStoredEntries = append(backIndexStoredEntries, indexBackIndexStoreEntries...)
}
}
// now index the composite fields
for _, compositeField := range d.CompositeFields {
fieldIndex, newFieldRow := udc.fieldIndexOrNewRow(compositeField.Name())
if newFieldRow != nil {
rv.Rows = append(rv.Rows, newFieldRow)
}
if compositeField.Options().IsIndexed() {
fieldLength, tokenFreqs := compositeField.Analyze()
// encode this field
indexRows, indexBackIndexTermEntries := udc.indexField(d.ID, compositeField, fieldIndex, fieldLength, tokenFreqs)
rv.Rows = append(rv.Rows, indexRows...)
backIndexTermEntries = append(backIndexTermEntries, indexBackIndexTermEntries...)
}
}
// build the back index row
backIndexRow := NewBackIndexRow(d.ID, backIndexTermEntries, backIndexStoredEntries)
rv.Rows = append(rv.Rows, backIndexRow)
return rv
}

View File

@ -1,119 +0,0 @@
// Copyright (c) 2014 Couchbase, Inc.
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
// except in compliance with the License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed under the
// License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
// either express or implied. See the License for the specific language governing permissions
// and limitations under the License.
package upside_down
import "github.com/blevesearch/bleve/document"
type AnalysisResult struct {
docID string
rows []UpsideDownCouchRow
}
type AnalysisWork struct {
udc *UpsideDownCouch
d *document.Document
rc chan *AnalysisResult
}
type AnalysisQueue struct {
queue chan *AnalysisWork
done chan struct{}
}
func (q *AnalysisQueue) Queue(work *AnalysisWork) {
q.queue <- work
}
func (q *AnalysisQueue) Close() {
close(q.done)
}
func NewAnalysisQueue(numWorkers int) *AnalysisQueue {
rv := AnalysisQueue{
queue: make(chan *AnalysisWork),
done: make(chan struct{}),
}
for i := 0; i < numWorkers; i++ {
go AnalysisWorker(rv)
}
return &rv
}
func AnalysisWorker(q AnalysisQueue) {
// read work off the queue
for {
select {
case <-q.done:
return
case w := <-q.queue:
rv := &AnalysisResult{
docID: w.d.ID,
rows: make([]UpsideDownCouchRow, 0, 100),
}
// track our back index entries
backIndexTermEntries := make([]*BackIndexTermEntry, 0)
backIndexStoredEntries := make([]*BackIndexStoreEntry, 0)
for _, field := range w.d.Fields {
fieldIndex, newFieldRow := w.udc.fieldIndexCache.FieldIndex(field.Name())
if newFieldRow != nil {
rv.rows = append(rv.rows, newFieldRow)
}
if field.Options().IsIndexed() {
fieldLength, tokenFreqs := field.Analyze()
// see if any of the composite fields need this
for _, compositeField := range w.d.CompositeFields {
compositeField.Compose(field.Name(), fieldLength, tokenFreqs)
}
// encode this field
indexRows, indexBackIndexTermEntries := w.udc.indexField(w.d.ID, field, fieldIndex, fieldLength, tokenFreqs)
rv.rows = append(rv.rows, indexRows...)
backIndexTermEntries = append(backIndexTermEntries, indexBackIndexTermEntries...)
}
if field.Options().IsStored() {
storeRows, indexBackIndexStoreEntries := w.udc.storeField(w.d.ID, field, fieldIndex)
rv.rows = append(rv.rows, storeRows...)
backIndexStoredEntries = append(backIndexStoredEntries, indexBackIndexStoreEntries...)
}
}
// now index the composite fields
for _, compositeField := range w.d.CompositeFields {
fieldIndex, newFieldRow := w.udc.fieldIndexCache.FieldIndex(compositeField.Name())
if newFieldRow != nil {
rv.rows = append(rv.rows, newFieldRow)
}
if compositeField.Options().IsIndexed() {
fieldLength, tokenFreqs := compositeField.Analyze()
// encode this field
indexRows, indexBackIndexTermEntries := w.udc.indexField(w.d.ID, compositeField, fieldIndex, fieldLength, tokenFreqs)
rv.rows = append(rv.rows, indexRows...)
backIndexTermEntries = append(backIndexTermEntries, indexBackIndexTermEntries...)
}
}
// build the back index row
backIndexRow := NewBackIndexRow(w.d.ID, backIndexTermEntries, backIndexStoredEntries)
rv.rows = append(rv.rows, backIndexRow)
w.rc <- rv
}
}
}

View File

@ -54,7 +54,7 @@ func CommonBenchmarkIndex(b *testing.B, create KVStoreCreate, destroy KVStoreDes
if err != nil {
b.Fatal(err)
}
analysisQueue := NewAnalysisQueue(analysisWorkers)
analysisQueue := index.NewAnalysisQueue(analysisWorkers)
idx := NewUpsideDownCouch(s, analysisQueue)
err = idx.Open()
@ -97,7 +97,7 @@ func CommonBenchmarkIndexBatch(b *testing.B, create KVStoreCreate, destroy KVSto
if err != nil {
b.Fatal(err)
}
analysisQueue := NewAnalysisQueue(analysisWorkers)
analysisQueue := index.NewAnalysisQueue(analysisWorkers)
idx := NewUpsideDownCouch(s, analysisQueue)
err = idx.Open()

View File

@ -10,11 +10,13 @@
package upside_down
import (
"github.com/blevesearch/bleve/index/store/boltdb"
"os"
"testing"
"time"
"github.com/blevesearch/bleve/index"
"github.com/blevesearch/bleve/index/store/boltdb"
"github.com/blevesearch/bleve/document"
)
@ -28,7 +30,7 @@ func TestDump(t *testing.T) {
s := boltdb.New("test", "bleve")
s.SetMergeOperator(&mergeOperator)
analysisQueue := NewAnalysisQueue(1)
analysisQueue := index.NewAnalysisQueue(1)
idx := NewUpsideDownCouch(s, analysisQueue)
err := idx.Open()
if err != nil {

View File

@ -15,6 +15,7 @@ import (
"testing"
"github.com/blevesearch/bleve/document"
"github.com/blevesearch/bleve/index"
"github.com/blevesearch/bleve/index/store/boltdb"
)
@ -28,7 +29,7 @@ func TestIndexFieldDict(t *testing.T) {
s := boltdb.New("test", "bleve")
s.SetMergeOperator(&mergeOperator)
analysisQueue := NewAnalysisQueue(1)
analysisQueue := index.NewAnalysisQueue(1)
idx := NewUpsideDownCouch(s, analysisQueue)
err := idx.Open()
if err != nil {

View File

@ -24,7 +24,7 @@ type IndexReader struct {
}
func (i *IndexReader) TermFieldReader(term []byte, fieldName string) (index.TermFieldReader, error) {
fieldIndex, fieldExists := i.index.fieldIndexCache.FieldExists(fieldName)
fieldIndex, fieldExists := i.index.fieldCache.FieldNamed(fieldName, false)
if fieldExists {
return newUpsideDownCouchTermFieldReader(i, term, uint16(fieldIndex))
}
@ -36,7 +36,7 @@ func (i *IndexReader) FieldDict(fieldName string) (index.FieldDict, error) {
}
func (i *IndexReader) FieldDictRange(fieldName string, startTerm []byte, endTerm []byte) (index.FieldDict, error) {
fieldIndex, fieldExists := i.index.fieldIndexCache.FieldExists(fieldName)
fieldIndex, fieldExists := i.index.fieldCache.FieldNamed(fieldName, false)
if fieldExists {
return newUpsideDownCouchFieldDict(i, uint16(fieldIndex), startTerm, endTerm)
}
@ -87,7 +87,7 @@ func (i *IndexReader) Document(id string) (doc *document.Document, err error) {
return
}
if row != nil {
fieldName := i.index.fieldIndexCache.FieldName(row.field)
fieldName := i.index.fieldCache.FieldIndexed(row.field)
field := decodeFieldType(row.typ, fieldName, row.arrayPositions, row.value)
if field != nil {
doc.AddField(field)
@ -107,7 +107,7 @@ func (i *IndexReader) DocumentFieldTerms(id string) (index.FieldTerms, error) {
}
rv := make(index.FieldTerms, len(back.termEntries))
for _, entry := range back.termEntries {
fieldName := i.index.fieldIndexCache.FieldName(uint16(*entry.Field))
fieldName := i.index.fieldCache.FieldIndexed(uint16(*entry.Field))
terms, ok := rv[fieldName]
if !ok {
terms = make([]string, 0)

View File

@ -29,7 +29,7 @@ func TestIndexReader(t *testing.T) {
s := boltdb.New("test", "bleve")
s.SetMergeOperator(&mergeOperator)
analysisQueue := NewAnalysisQueue(1)
analysisQueue := index.NewAnalysisQueue(1)
idx := NewUpsideDownCouch(s, analysisQueue)
err := idx.Open()
if err != nil {
@ -203,7 +203,7 @@ func TestIndexDocIdReader(t *testing.T) {
s := boltdb.New("test", "bleve")
s.SetMergeOperator(&mergeOperator)
analysisQueue := NewAnalysisQueue(1)
analysisQueue := index.NewAnalysisQueue(1)
idx := NewUpsideDownCouch(s, analysisQueue)
err := idx.Open()
if err != nil {

View File

@ -22,10 +22,13 @@ import (
"github.com/blevesearch/bleve/document"
"github.com/blevesearch/bleve/index"
"github.com/blevesearch/bleve/index/store"
"github.com/blevesearch/bleve/registry"
"github.com/golang/protobuf/proto"
)
const Name = "upside_down"
var VersionKey = []byte{'v'}
var UnsafeBatchUseDetected = fmt.Errorf("bleve.Batch is NOT thread-safe, modification after execution detected")
@ -35,25 +38,25 @@ const Version uint8 = 5
var IncompatibleVersion = fmt.Errorf("incompatible version, %d is supported", Version)
type UpsideDownCouch struct {
version uint8
path string
store store.KVStore
fieldIndexCache *FieldIndexCache
analysisQueue *AnalysisQueue
stats *indexStat
version uint8
path string
store store.KVStore
fieldCache *index.FieldCache
analysisQueue *index.AnalysisQueue
stats *indexStat
m sync.RWMutex
// fields protected by m
docCount uint64
}
func NewUpsideDownCouch(s store.KVStore, analysisQueue *AnalysisQueue) *UpsideDownCouch {
func NewUpsideDownCouch(s store.KVStore, analysisQueue *index.AnalysisQueue) *UpsideDownCouch {
return &UpsideDownCouch{
version: Version,
fieldIndexCache: NewFieldIndexCache(),
store: s,
analysisQueue: analysisQueue,
stats: &indexStat{},
version: Version,
fieldCache: index.NewFieldCache(),
store: s,
analysisQueue: analysisQueue,
stats: &indexStat{},
}
}
@ -90,7 +93,7 @@ func (udc *UpsideDownCouch) loadSchema(kvreader store.KVReader) (err error) {
if err != nil {
return
}
udc.fieldIndexCache.AddExisting(fieldRow.name, fieldRow.index)
udc.fieldCache.AddExisting(fieldRow.name, fieldRow.index)
it.Next()
key, val, valid = it.Current()
@ -263,15 +266,11 @@ func (udc *UpsideDownCouch) Close() error {
func (udc *UpsideDownCouch) Update(doc *document.Document) (err error) {
// do analysis before acquiring write lock
analysisStart := time.Now()
resultChan := make(chan *AnalysisResult)
aw := AnalysisWork{
udc: udc,
d: doc,
rc: resultChan,
}
resultChan := make(chan *index.AnalysisResult)
aw := index.NewAnalysisWork(udc, doc, resultChan)
// put the work on the queue
go func() {
udc.analysisQueue.Queue(&aw)
udc.analysisQueue.Queue(aw)
}()
// wait for the result
@ -306,7 +305,7 @@ func (udc *UpsideDownCouch) Update(doc *document.Document) (err error) {
updateRows := make([]UpsideDownCouchRow, 0)
deleteRows := make([]UpsideDownCouchRow, 0)
addRows, updateRows, deleteRows = udc.mergeOldAndNew(backIndexRow, result.rows, addRows, updateRows, deleteRows)
addRows, updateRows, deleteRows = udc.mergeOldAndNew(backIndexRow, result.Rows, addRows, updateRows, deleteRows)
err = udc.batchRows(kvwriter, addRows, updateRows, deleteRows)
if err == nil && backIndexRow == nil {
@ -323,7 +322,7 @@ func (udc *UpsideDownCouch) Update(doc *document.Document) (err error) {
return
}
func (udc *UpsideDownCouch) mergeOldAndNew(backIndexRow *BackIndexRow, rows, addRows, updateRows, deleteRows []UpsideDownCouchRow) ([]UpsideDownCouchRow, []UpsideDownCouchRow, []UpsideDownCouchRow) {
func (udc *UpsideDownCouch) mergeOldAndNew(backIndexRow *BackIndexRow, rows []index.IndexRow, addRows, updateRows, deleteRows []UpsideDownCouchRow) ([]UpsideDownCouchRow, []UpsideDownCouchRow, []UpsideDownCouchRow) {
existingTermKeys := make(map[string]bool)
for _, key := range backIndexRow.AllTermKeys() {
existingTermKeys[string(key)] = true
@ -377,8 +376,8 @@ func (udc *UpsideDownCouch) mergeOldAndNew(backIndexRow *BackIndexRow, rows, add
return addRows, updateRows, deleteRows
}
func (udc *UpsideDownCouch) storeField(docID string, field document.Field, fieldIndex uint16) ([]UpsideDownCouchRow, []*BackIndexStoreEntry) {
rows := make([]UpsideDownCouchRow, 0, 100)
func (udc *UpsideDownCouch) storeField(docID string, field document.Field, fieldIndex uint16) ([]index.IndexRow, []*BackIndexStoreEntry) {
rows := make([]index.IndexRow, 0, 100)
backIndexStoredEntries := make([]*BackIndexStoreEntry, 0)
fieldType := encodeFieldType(field)
storedRow := NewStoredRow(docID, fieldIndex, field.ArrayPositions(), fieldType, field.Value())
@ -406,9 +405,9 @@ func encodeFieldType(f document.Field) byte {
return fieldType
}
func (udc *UpsideDownCouch) indexField(docID string, field document.Field, fieldIndex uint16, fieldLength int, tokenFreqs analysis.TokenFrequencies) ([]UpsideDownCouchRow, []*BackIndexTermEntry) {
func (udc *UpsideDownCouch) indexField(docID string, field document.Field, fieldIndex uint16, fieldLength int, tokenFreqs analysis.TokenFrequencies) ([]index.IndexRow, []*BackIndexTermEntry) {
rows := make([]UpsideDownCouchRow, 0, 100)
rows := make([]index.IndexRow, 0, 100)
backIndexTermEntries := make([]*BackIndexTermEntry, 0)
fieldNorm := float32(1.0 / math.Sqrt(float64(fieldLength)))
@ -542,16 +541,16 @@ func frequencyFromTokenFreq(tf *analysis.TokenFreq) int {
return len(tf.Locations)
}
func (udc *UpsideDownCouch) termVectorsFromTokenFreq(field uint16, tf *analysis.TokenFreq) ([]*TermVector, []UpsideDownCouchRow) {
func (udc *UpsideDownCouch) termVectorsFromTokenFreq(field uint16, tf *analysis.TokenFreq) ([]*TermVector, []index.IndexRow) {
rv := make([]*TermVector, len(tf.Locations))
newFieldRows := make([]UpsideDownCouchRow, 0)
newFieldRows := make([]index.IndexRow, 0)
for i, l := range tf.Locations {
var newFieldRow *FieldRow
fieldIndex := field
if l.Field != "" {
// lookup correct field
fieldIndex, newFieldRow = udc.fieldIndexCache.FieldIndex(l.Field)
fieldIndex, newFieldRow = udc.fieldIndexOrNewRow(l.Field)
if newFieldRow != nil {
newFieldRows = append(newFieldRows, newFieldRow)
}
@ -573,7 +572,7 @@ func (udc *UpsideDownCouch) termFieldVectorsFromTermVectors(in []*TermVector) []
rv := make([]*index.TermFieldVector, len(in))
for i, tv := range in {
fieldName := udc.fieldIndexCache.FieldName(tv.field)
fieldName := udc.fieldCache.FieldIndexed(tv.field)
tfv := index.TermFieldVector{
Field: fieldName,
ArrayPositions: tv.arrayPositions,
@ -588,7 +587,7 @@ func (udc *UpsideDownCouch) termFieldVectorsFromTermVectors(in []*TermVector) []
func (udc *UpsideDownCouch) Batch(batch *index.Batch) (err error) {
analysisStart := time.Now()
resultChan := make(chan *AnalysisResult)
resultChan := make(chan *index.AnalysisResult)
var numUpdates uint64
for _, doc := range batch.IndexOps {
@ -611,23 +610,19 @@ func (udc *UpsideDownCouch) Batch(batch *index.Batch) (err error) {
detectedUnsafeMutex.Unlock()
return
}
aw := AnalysisWork{
udc: udc,
d: doc,
rc: resultChan,
}
aw := index.NewAnalysisWork(udc, doc, resultChan)
// put the work on the queue
udc.analysisQueue.Queue(&aw)
udc.analysisQueue.Queue(aw)
}
}
}()
newRowsMap := make(map[string][]UpsideDownCouchRow)
newRowsMap := make(map[string][]index.IndexRow)
// wait for the result
var itemsDeQueued uint64
for itemsDeQueued < numUpdates {
result := <-resultChan
newRowsMap[result.docID] = result.rows
newRowsMap[result.DocID] = result.Rows
itemsDeQueued++
}
close(resultChan)
@ -760,3 +755,19 @@ func (udc *UpsideDownCouch) Reader() (index.IndexReader, error) {
func (udc *UpsideDownCouch) Stats() json.Marshaler {
return udc.stats
}
func (udc *UpsideDownCouch) fieldIndexOrNewRow(name string) (uint16, *FieldRow) {
index, existed := udc.fieldCache.FieldNamed(name, true)
if !existed {
return index, NewFieldRow(uint16(index), name)
}
return index, nil
}
func IndexTypeConstructor(store store.KVStore, analysisQueue *index.AnalysisQueue) (index.Index, error) {
return NewUpsideDownCouch(store, analysisQueue), nil
}
func init() {
registry.RegisterIndexType(Name, IndexTypeConstructor)
}

View File

@ -37,7 +37,7 @@ func TestIndexOpenReopen(t *testing.T) {
store := boltdb.New("test", "bleve")
store.SetMergeOperator(&mergeOperator)
analysisQueue := NewAnalysisQueue(1)
analysisQueue := index.NewAnalysisQueue(1)
idx := NewUpsideDownCouch(store, analysisQueue)
err := idx.Open()
if err != nil {
@ -94,7 +94,7 @@ func TestIndexInsert(t *testing.T) {
store := boltdb.New("test", "bleve")
store.SetMergeOperator(&mergeOperator)
analysisQueue := NewAnalysisQueue(1)
analysisQueue := index.NewAnalysisQueue(1)
idx := NewUpsideDownCouch(store, analysisQueue)
err := idx.Open()
if err != nil {
@ -153,7 +153,7 @@ func TestIndexInsertThenDelete(t *testing.T) {
store := boltdb.New("test", "bleve")
store.SetMergeOperator(&mergeOperator)
analysisQueue := NewAnalysisQueue(1)
analysisQueue := index.NewAnalysisQueue(1)
idx := NewUpsideDownCouch(store, analysisQueue)
err := idx.Open()
if err != nil {
@ -248,7 +248,7 @@ func TestIndexInsertThenUpdate(t *testing.T) {
store := boltdb.New("test", "bleve")
store.SetMergeOperator(&mergeOperator)
analysisQueue := NewAnalysisQueue(1)
analysisQueue := index.NewAnalysisQueue(1)
idx := NewUpsideDownCouch(store, analysisQueue)
err := idx.Open()
if err != nil {
@ -315,7 +315,7 @@ func TestIndexInsertMultiple(t *testing.T) {
store := boltdb.New("test", "bleve")
store.SetMergeOperator(&mergeOperator)
analysisQueue := NewAnalysisQueue(1)
analysisQueue := index.NewAnalysisQueue(1)
idx := NewUpsideDownCouch(store, analysisQueue)
err := idx.Open()
if err != nil {
@ -397,7 +397,7 @@ func TestIndexInsertWithStore(t *testing.T) {
store := boltdb.New("test", "bleve")
store.SetMergeOperator(&mergeOperator)
analysisQueue := NewAnalysisQueue(1)
analysisQueue := index.NewAnalysisQueue(1)
idx := NewUpsideDownCouch(store, analysisQueue)
err := idx.Open()
if err != nil {
@ -483,7 +483,7 @@ func TestIndexInternalCRUD(t *testing.T) {
store := boltdb.New("test", "bleve")
store.SetMergeOperator(&mergeOperator)
analysisQueue := NewAnalysisQueue(1)
analysisQueue := index.NewAnalysisQueue(1)
idx := NewUpsideDownCouch(store, analysisQueue)
err := idx.Open()
if err != nil {
@ -576,7 +576,7 @@ func TestIndexBatch(t *testing.T) {
store := boltdb.New("test", "bleve")
store.SetMergeOperator(&mergeOperator)
analysisQueue := NewAnalysisQueue(1)
analysisQueue := index.NewAnalysisQueue(1)
idx := NewUpsideDownCouch(store, analysisQueue)
err := idx.Open()
if err != nil {
@ -673,7 +673,7 @@ func TestIndexInsertUpdateDeleteWithMultipleTypesStored(t *testing.T) {
store := boltdb.New("test", "bleve")
store.SetMergeOperator(&mergeOperator)
analysisQueue := NewAnalysisQueue(1)
analysisQueue := index.NewAnalysisQueue(1)
idx := NewUpsideDownCouch(store, analysisQueue)
err := idx.Open()
if err != nil {
@ -865,7 +865,7 @@ func TestIndexInsertFields(t *testing.T) {
store := boltdb.New("test", "bleve")
store.SetMergeOperator(&mergeOperator)
analysisQueue := NewAnalysisQueue(1)
analysisQueue := index.NewAnalysisQueue(1)
idx := NewUpsideDownCouch(store, analysisQueue)
err := idx.Open()
if err != nil {
@ -924,7 +924,7 @@ func TestIndexUpdateComposites(t *testing.T) {
store := boltdb.New("test", "bleve")
store.SetMergeOperator(&mergeOperator)
analysisQueue := NewAnalysisQueue(1)
analysisQueue := index.NewAnalysisQueue(1)
idx := NewUpsideDownCouch(store, analysisQueue)
err := idx.Open()
if err != nil {
@ -1020,7 +1020,7 @@ func TestIndexFieldsMisc(t *testing.T) {
store := boltdb.New("test", "bleve")
store.SetMergeOperator(&mergeOperator)
analysisQueue := NewAnalysisQueue(1)
analysisQueue := index.NewAnalysisQueue(1)
idx := NewUpsideDownCouch(store, analysisQueue)
err := idx.Open()
if err != nil {
@ -1041,15 +1041,15 @@ func TestIndexFieldsMisc(t *testing.T) {
t.Errorf("Error updating index: %v", err)
}
fieldName1 := idx.fieldIndexCache.FieldName(1)
fieldName1 := idx.fieldCache.FieldIndexed(0)
if fieldName1 != "name" {
t.Errorf("expected field named 'name', got '%s'", fieldName1)
}
fieldName2 := idx.fieldIndexCache.FieldName(2)
fieldName2 := idx.fieldCache.FieldIndexed(1)
if fieldName2 != "title" {
t.Errorf("expected field named 'title', got '%s'", fieldName2)
}
fieldName3 := idx.fieldIndexCache.FieldName(3)
fieldName3 := idx.fieldCache.FieldIndexed(2)
if fieldName3 != "" {
t.Errorf("expected field named '', got '%s'", fieldName3)
}
@ -1066,7 +1066,7 @@ func TestIndexTermReaderCompositeFields(t *testing.T) {
store := boltdb.New("test", "bleve")
store.SetMergeOperator(&mergeOperator)
analysisQueue := NewAnalysisQueue(1)
analysisQueue := index.NewAnalysisQueue(1)
idx := NewUpsideDownCouch(store, analysisQueue)
err := idx.Open()
if err != nil {
@ -1126,7 +1126,7 @@ func TestIndexDocumentFieldTerms(t *testing.T) {
store := boltdb.New("test", "bleve")
store.SetMergeOperator(&mergeOperator)
analysisQueue := NewAnalysisQueue(1)
analysisQueue := index.NewAnalysisQueue(1)
idx := NewUpsideDownCouch(store, analysisQueue)
err := idx.Open()
if err != nil {

View File

@ -20,7 +20,6 @@ import (
"github.com/blevesearch/bleve/document"
"github.com/blevesearch/bleve/index"
"github.com/blevesearch/bleve/index/store"
"github.com/blevesearch/bleve/index/upside_down"
"github.com/blevesearch/bleve/registry"
"github.com/blevesearch/bleve/search"
"github.com/blevesearch/bleve/search/collectors"
@ -46,11 +45,11 @@ func indexStorePath(path string) string {
return path + string(os.PathSeparator) + storePath
}
func newMemIndex(mapping *IndexMapping) (*indexImpl, error) {
func newMemIndex(indexType string, mapping *IndexMapping) (*indexImpl, error) {
rv := indexImpl{
path: "",
m: mapping,
meta: newIndexMeta("mem", nil),
meta: newIndexMeta(indexType, "mem", nil),
stats: &IndexStat{},
}
@ -66,7 +65,15 @@ func newMemIndex(mapping *IndexMapping) (*indexImpl, error) {
}
// open the index
rv.i = upside_down.NewUpsideDownCouch(rv.s, Config.analysisQueue)
indexTypeConstructor := registry.IndexTypeConstructorByName(rv.meta.IndexType)
if indexTypeConstructor == nil {
return nil, ErrorUnknownIndexType
}
rv.i, err = indexTypeConstructor(rv.s, Config.analysisQueue)
if err != nil {
return nil, err
}
err = rv.i.Open()
if err != nil {
return nil, err
@ -90,7 +97,7 @@ func newMemIndex(mapping *IndexMapping) (*indexImpl, error) {
return &rv, nil
}
func newIndexUsing(path string, mapping *IndexMapping, kvstore string, kvconfig map[string]interface{}) (*indexImpl, error) {
func newIndexUsing(path string, mapping *IndexMapping, indexType string, kvstore string, kvconfig map[string]interface{}) (*indexImpl, error) {
// first validate the mapping
err := mapping.validate()
if err != nil {
@ -98,7 +105,7 @@ func newIndexUsing(path string, mapping *IndexMapping, kvstore string, kvconfig
}
if path == "" {
return newMemIndex(mapping)
return newMemIndex(indexType, mapping)
}
if kvconfig == nil {
@ -108,7 +115,7 @@ func newIndexUsing(path string, mapping *IndexMapping, kvstore string, kvconfig
rv := indexImpl{
path: path,
m: mapping,
meta: newIndexMeta(kvstore, kvconfig),
meta: newIndexMeta(indexType, kvstore, kvconfig),
stats: &IndexStat{},
}
storeConstructor := registry.KVStoreConstructorByName(rv.meta.Storage)
@ -131,7 +138,15 @@ func newIndexUsing(path string, mapping *IndexMapping, kvstore string, kvconfig
}
// open the index
rv.i = upside_down.NewUpsideDownCouch(rv.s, Config.analysisQueue)
indexTypeConstructor := registry.IndexTypeConstructorByName(rv.meta.IndexType)
if indexTypeConstructor == nil {
return nil, ErrorUnknownIndexType
}
rv.i, err = indexTypeConstructor(rv.s, Config.analysisQueue)
if err != nil {
return nil, err
}
err = rv.i.Open()
if err != nil {
return nil, err
@ -191,7 +206,15 @@ func openIndexUsing(path string, runtimeConfig map[string]interface{}) (rv *inde
}
// open the index
rv.i = upside_down.NewUpsideDownCouch(rv.s, Config.analysisQueue)
indexTypeConstructor := registry.IndexTypeConstructorByName(rv.meta.IndexType)
if indexTypeConstructor == nil {
return nil, ErrorUnknownIndexType
}
rv.i, err = indexTypeConstructor(rv.s, Config.analysisQueue)
if err != nil {
return nil, err
}
err = rv.i.Open()
if err != nil {
return nil, err

View File

@ -18,14 +18,16 @@ import (
const metaFilename = "index_meta.json"
type indexMeta struct {
Storage string `json:"storage"`
Config map[string]interface{} `json:"config,omitempty"`
Storage string `json:"storage"`
IndexType string `json:"index_type"`
Config map[string]interface{} `json:"config,omitempty"`
}
func newIndexMeta(storage string, config map[string]interface{}) *indexMeta {
func newIndexMeta(indexType string, storage string, config map[string]interface{}) *indexMeta {
return &indexMeta{
Storage: storage,
Config: config,
IndexType: indexType,
Storage: storage,
Config: config,
}
}

47
registry/index_type.go Normal file
View File

@ -0,0 +1,47 @@
// Copyright (c) 2015 Couchbase, Inc.
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
// except in compliance with the License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed under the
// License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
// either express or implied. See the License for the specific language governing permissions
// and limitations under the License.
package registry
import (
"fmt"
"github.com/blevesearch/bleve/index"
"github.com/blevesearch/bleve/index/store"
)
func RegisterIndexType(name string, constructor IndexTypeConstructor) {
_, exists := index_types[name]
if exists {
panic(fmt.Errorf("attempted to register duplicate index encoding named '%s'", name))
}
index_types[name] = constructor
}
type IndexTypeConstructor func(store.KVStore, *index.AnalysisQueue) (index.Index, error)
type IndexTypeRegistry map[string]IndexTypeConstructor
func IndexTypeConstructorByName(name string) IndexTypeConstructor {
return index_types[name]
}
func IndexTypesAndInstances() ([]string, []string) {
emptyConfig := map[string]interface{}{}
types := make([]string, 0)
instances := make([]string, 0)
for name, cons := range stores {
_, err := cons(emptyConfig)
if err == nil {
instances = append(instances, name)
} else {
types = append(types, name)
}
}
return types, instances
}

View File

@ -17,6 +17,7 @@ import (
)
var stores = make(KVStoreRegistry, 0)
var index_types = make(IndexTypeRegistry, 0)
var byteArrayConverters = make(ByteArrayConverterRegistry, 0)

View File

@ -25,7 +25,7 @@ var twoDocIndex index.Index //= upside_down.NewUpsideDownCouch(inmem.MustOpen())
func init() {
inMemStore, _ := inmem.New()
analysisQueue := upside_down.NewAnalysisQueue(1)
analysisQueue := index.NewAnalysisQueue(1)
twoDocIndex = upside_down.NewUpsideDownCouch(inMemStore, analysisQueue)
err := twoDocIndex.Open()
if err != nil {

View File

@ -14,6 +14,7 @@ import (
"testing"
"github.com/blevesearch/bleve/document"
"github.com/blevesearch/bleve/index"
"github.com/blevesearch/bleve/index/store/inmem"
"github.com/blevesearch/bleve/index/upside_down"
)
@ -26,7 +27,7 @@ func TestTermSearcher(t *testing.T) {
var queryExplain = true
inMemStore, _ := inmem.New()
analysisQueue := upside_down.NewAnalysisQueue(1)
analysisQueue := index.NewAnalysisQueue(1)
i := upside_down.NewUpsideDownCouch(inMemStore, analysisQueue)
err := i.Open()
if err != nil {

View File

@ -20,7 +20,8 @@ import (
var indexPath = flag.String("index", "", "index path")
var mappingFile = flag.String("mapping", "", "mapping file")
var storeType = flag.String("store", "", "store type")
var storeType = flag.String("store", bleve.Config.DefaultKVStore, "store type")
var indexType = flag.String("indexType", bleve.Config.DefaultIndexType, "index type")
func main() {
@ -44,13 +45,7 @@ func main() {
}
// create the index
var index bleve.Index
var err error
if *storeType != "" {
index, err = bleve.NewUsing(*indexPath, mapping, *storeType, nil)
} else {
index, err = bleve.New(*indexPath, mapping)
}
index, err := bleve.NewUsing(*indexPath, mapping, *indexType, *storeType, nil)
if err != nil {
log.Fatal(err)
}