0
0
Fork 0

made it safe to use bleve.Index object from multiple threads

an RWMutext ensures that only one write op is done at a time, and
that all other ops have finished prior to closing
This commit is contained in:
Marty Schoch 2014-08-25 09:06:53 -04:00
parent 3309c698f8
commit 34afb0929e
3 changed files with 104 additions and 6 deletions

View File

@ -19,6 +19,7 @@ const (
ERROR_PHRASE_QUERY_NO_TERMS
ERROR_UNKNOWN_QUERY_TYPE
ERROR_UNKNOWN_STORAGE_TYPE
ERROR_INDEX_CLOSED
)
type Error int
@ -38,4 +39,5 @@ var errorMessages = map[int]string{
int(ERROR_PHRASE_QUERY_NO_TERMS): "phrase query must contain at least one term",
int(ERROR_UNKNOWN_QUERY_TYPE): "unknown query type",
int(ERROR_UNKNOWN_STORAGE_TYPE): "unkown storage type",
int(ERROR_INDEX_CLOSED): "index is closed",
}

View File

@ -44,6 +44,8 @@ type Index interface {
DumpFields() chan interface{}
Close()
Mapping() *IndexMapping
}
type Classifier interface {

View File

@ -12,6 +12,7 @@ import (
"encoding/json"
"fmt"
"os"
"sync"
"time"
"github.com/couchbaselabs/bleve/document"
@ -23,11 +24,13 @@ import (
)
type indexImpl struct {
path string
meta *indexMeta
s store.KVStore
i index.Index
m *IndexMapping
path string
meta *indexMeta
s store.KVStore
i index.Index
m *IndexMapping
mutex sync.RWMutex
open bool
}
const storePath = "store"
@ -72,6 +75,11 @@ func newMemIndex(mapping *IndexMapping) (*indexImpl, error) {
if err != nil {
return nil, err
}
// mark the index as open
rv.mutex.Lock()
defer rv.mutex.Unlock()
rv.open = true
return &rv, nil
}
@ -128,6 +136,11 @@ func newIndex(path string, mapping *IndexMapping) (*indexImpl, error) {
if err != nil {
return nil, err
}
// mark the index as open
rv.mutex.Lock()
defer rv.mutex.Unlock()
rv.open = true
return &rv, nil
}
@ -178,17 +191,35 @@ func openIndex(path string) (*indexImpl, error) {
return nil, err
}
// mark the index as open
rv.mutex.Lock()
defer rv.mutex.Unlock()
rv.open = true
// validate the mapping
err = im.Validate()
if err != nil {
return nil, err
// note even if the mapping is invalid
// we still return an open usable index
return &rv, err
}
rv.m = &im
return &rv, nil
}
func (i *indexImpl) Mapping() *IndexMapping {
return i.m
}
func (i *indexImpl) Index(id string, data interface{}) error {
i.mutex.Lock()
defer i.mutex.Unlock()
if !i.open {
return ERROR_INDEX_CLOSED
}
doc := document.NewDocument(id)
err := i.m.MapDocument(doc, data)
if err != nil {
@ -202,6 +233,13 @@ func (i *indexImpl) Index(id string, data interface{}) error {
}
func (i *indexImpl) Delete(id string) error {
i.mutex.Lock()
defer i.mutex.Unlock()
if !i.open {
return ERROR_INDEX_CLOSED
}
err := i.i.Delete(id)
if err != nil {
return err
@ -210,6 +248,13 @@ func (i *indexImpl) Delete(id string) error {
}
func (i *indexImpl) Batch(b Batch) error {
i.mutex.Lock()
defer i.mutex.Unlock()
if !i.open {
return ERROR_INDEX_CLOSED
}
ib := make(index.Batch, len(b))
for bk, bd := range b {
if bd == nil {
@ -227,14 +272,34 @@ func (i *indexImpl) Batch(b Batch) error {
}
func (i *indexImpl) Document(id string) (*document.Document, error) {
i.mutex.RLock()
defer i.mutex.RUnlock()
if !i.open {
return nil, ERROR_INDEX_CLOSED
}
return i.i.Document(id)
}
func (i *indexImpl) DocCount() uint64 {
i.mutex.RLock()
defer i.mutex.RUnlock()
if !i.open {
return 0
}
return i.i.DocCount()
}
func (i *indexImpl) Search(req *SearchRequest) (*SearchResult, error) {
i.mutex.RLock()
defer i.mutex.RUnlock()
if !i.open {
return nil, ERROR_INDEX_CLOSED
}
collector := search.NewTopScorerSkipCollector(req.Size, req.From)
searcher, err := req.Query.Searcher(i, req.Explain)
if err != nil {
@ -350,21 +415,50 @@ func (i *indexImpl) Search(req *SearchRequest) (*SearchResult, error) {
}
func (i *indexImpl) DumpAll() chan interface{} {
i.mutex.RLock()
defer i.mutex.RUnlock()
if !i.open {
return nil
}
return i.i.DumpAll()
}
func (i *indexImpl) Fields() ([]string, error) {
i.mutex.RLock()
defer i.mutex.RUnlock()
if !i.open {
return nil, ERROR_INDEX_CLOSED
}
return i.i.Fields()
}
func (i *indexImpl) DumpFields() chan interface{} {
i.mutex.RLock()
defer i.mutex.RUnlock()
if !i.open {
return nil
}
return i.i.DumpFields()
}
func (i *indexImpl) DumpDoc(id string) chan interface{} {
i.mutex.RLock()
defer i.mutex.RUnlock()
if !i.open {
return nil
}
return i.i.DumpDoc(id)
}
func (i *indexImpl) Close() {
i.mutex.Lock()
defer i.mutex.Unlock()
i.open = false
i.i.Close()
}