0
0
Fork 0

refactored API a bit

more things can return error now
in a couple of places we had to swallow errors because they didn't
fit the existing API.  in these cases, and proactively in a few
others, we now return error as well.

also the batch API has been updated to allow performing
internal set/delete operations within the batch
This commit is contained in:
Marty Schoch 2014-10-31 09:40:23 -04:00
parent 84d1cdf216
commit c7443fe52b
34 changed files with 408 additions and 146 deletions

View File

@ -38,7 +38,11 @@ func (h *DocCountHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
return
}
docCount := index.DocCount()
docCount, err := index.DocCount()
if err != nil {
showError(w, req, fmt.Sprintf("error counting docs: %v", err), 500)
return
}
rv := struct {
Status string `json:"status"`
Count uint64 `json:"count"`

View File

@ -16,25 +16,45 @@ import (
// A Batch groups together multiple Index and Delete
// operations you would like performed at the same
// time.
type Batch map[string]interface{}
type Batch struct {
IndexOps map[string]interface{}
InternalOps map[string][]byte
}
// NewBatch creates a new empty batch.
func NewBatch() Batch {
return make(Batch, 0)
func NewBatch() *Batch {
return &Batch{
IndexOps: make(map[string]interface{}),
InternalOps: make(map[string][]byte),
}
}
// Index adds the specified index operation to the
// batch. NOTE: the bleve Index is not updated
// until the batch is executed.
func (b Batch) Index(id string, data interface{}) {
b[id] = data
b.IndexOps[id] = data
}
// Delete adds the specified delete operation to the
// batch. NOTE: the bleve Index is not updated until
// the batch is executed.
func (b Batch) Delete(id string) {
b[id] = nil
b.IndexOps[id] = nil
}
// SetInternal adds the specified set internal
// operation to the batch. NOTE: the bleve Index is
// not updated until the batch is executed.
func (b Batch) SetInternal(key, val []byte) {
b.InternalOps[string(key)] = val
}
// DeleteInternal adds the specified delete internal
// operation to the batch. NOTE: the bleve Index is
// not updated until the batch is executed.
func (b Batch) DeleteInternal(key []byte) {
b.InternalOps[string(key)] = nil
}
// An Index implements all the indexing and searching
@ -44,10 +64,10 @@ type Index interface {
Index(id string, data interface{}) error
Delete(id string) error
Batch(b Batch) error
Batch(b *Batch) error
Document(id string) (*document.Document, error)
DocCount() uint64
DocCount() (uint64, error)
Search(req *SearchRequest) (*SearchResult, error)
@ -57,7 +77,7 @@ type Index interface {
DumpDoc(id string) chan interface{}
DumpFields() chan interface{}
Close()
Close() error
Mapping() *IndexMapping

View File

@ -17,13 +17,13 @@ import (
type Index interface {
Open() error
Close()
Close() error
DocCount() uint64
DocCount() (uint64, error)
Update(doc *document.Document) error
Delete(id string) error
Batch(batch Batch) error
Batch(batch *Batch) error
SetInternal(key, val []byte) error
DeleteInternal(key []byte) error
@ -32,7 +32,7 @@ type Index interface {
DumpDoc(id string) chan interface{}
DumpFields() chan interface{}
Reader() IndexReader
Reader() (IndexReader, error)
Stats() json.Marshaler
}
@ -90,12 +90,30 @@ type DocIDReader interface {
Close()
}
type Batch map[string]*document.Document
type Batch struct {
IndexOps map[string]*document.Document
InternalOps map[string][]byte
}
func (b Batch) Index(id string, doc *document.Document) {
b[id] = doc
func NewBatch() *Batch {
return &Batch{
IndexOps: make(map[string]*document.Document),
InternalOps: make(map[string][]byte),
}
}
func (b Batch) Update(doc *document.Document) {
b.IndexOps[doc.ID] = doc
}
func (b Batch) Delete(id string) {
b[id] = nil
b.IndexOps[id] = nil
}
func (b Batch) SetInternal(key, val []byte) {
b.InternalOps[string(key)] = val
}
func (b Batch) DeleteInternal(key []byte) {
b.InternalOps[string(key)] = nil
}

View File

@ -81,9 +81,10 @@ func (i *Iterator) Valid() bool {
return i.valid
}
func (i *Iterator) Close() {
func (i *Iterator) Close() error {
// only close the transaction if we opened it
if i.ownTx {
i.tx.Rollback()
return i.tx.Rollback()
}
return nil
}

View File

@ -19,12 +19,15 @@ type Reader struct {
tx *bolt.Tx
}
func newReader(store *Store) *Reader {
tx, _ := store.db.Begin(false)
func newReader(store *Store) (*Reader, error) {
tx, err := store.db.Begin(false)
if err != nil {
return nil, err
}
return &Reader{
store: store,
tx: tx,
}
}, nil
}
func (r *Reader) Get(key []byte) ([]byte, error) {

View File

@ -97,11 +97,11 @@ func (bs *Store) iterator(key []byte) store.KVIterator {
return rv
}
func (bs *Store) Reader() store.KVReader {
func (bs *Store) Reader() (store.KVReader, error) {
return newReader(bs)
}
func (bs *Store) Writer() store.KVWriter {
func (bs *Store) Writer() (store.KVWriter, error) {
return newWriter(bs)
}

View File

@ -17,11 +17,11 @@ type Writer struct {
store *Store
}
func newWriter(store *Store) *Writer {
func newWriter(store *Store) (*Writer, error) {
store.writer.Lock()
return &Writer{
store: store,
}
}, nil
}
func (w *Writer) Set(key, val []byte) error {

View File

@ -64,6 +64,7 @@ func (i *Iterator) Valid() bool {
return i.valid
}
func (i *Iterator) Close() {
func (i *Iterator) Close() error {
i.iterator.Close()
return nil
}

View File

@ -17,10 +17,10 @@ type Reader struct {
store *Store
}
func newReader(store *Store) *Reader {
func newReader(store *Store) (*Reader, error) {
return &Reader{
store: store,
}
}, nil
}
func (r *Reader) Get(key []byte) ([]byte, error) {

View File

@ -80,11 +80,11 @@ func (i *Store) iterator(key []byte) store.KVIterator {
return rv
}
func (i *Store) Reader() store.KVReader {
func (i *Store) Reader() (store.KVReader, error) {
return newReader(i)
}
func (i *Store) Writer() store.KVWriter {
func (i *Store) Writer() (store.KVWriter, error) {
return newWriter(i)
}

View File

@ -17,11 +17,11 @@ type Writer struct {
store *Store
}
func newWriter(store *Store) *Writer {
func newWriter(store *Store) (*Writer, error) {
store.writer.Lock()
return &Writer{
store: store,
}
}, nil
}
func (w *Writer) Set(key, val []byte) error {

View File

@ -29,12 +29,12 @@ type KVIterator interface {
Value() []byte
Valid() bool
Close()
Close() error
}
type KVStore interface {
Writer() KVWriter
Reader() KVReader
Writer() (KVWriter, error)
Reader() (KVReader, error)
Close() error
}

View File

@ -69,6 +69,7 @@ func (ldi *Iterator) Valid() bool {
return ldi.iterator.Valid()
}
func (ldi *Iterator) Close() {
func (ldi *Iterator) Close() error {
ldi.iterator.Close()
return nil
}

View File

@ -21,11 +21,11 @@ type Reader struct {
snapshot *levigo.Snapshot
}
func newReader(store *Store) *Reader {
func newReader(store *Store) (*Reader, error) {
return &Reader{
store: store,
snapshot: store.db.NewSnapshot(),
}
}, nil
}
func (r *Reader) Get(key []byte) ([]byte, error) {

View File

@ -89,11 +89,11 @@ func (ldbs *Store) iterator(key []byte) store.KVIterator {
return rv
}
func (ldbs *Store) Reader() store.KVReader {
func (ldbs *Store) Reader() (store.KVReader, error) {
return newReader(ldbs)
}
func (ldbs *Store) Writer() store.KVWriter {
func (ldbs *Store) Writer() (store.KVWriter, error) {
return newWriter(ldbs)
}

View File

@ -19,11 +19,11 @@ type Writer struct {
store *Store
}
func newWriter(store *Store) *Writer {
func newWriter(store *Store) (*Writer, error) {
store.writer.Lock()
return &Writer{
store: store,
}
}, nil
}
func (w *Writer) Set(key, val []byte) error {

View File

@ -29,11 +29,11 @@ func (i *Store) iterator(key []byte) store.KVIterator {
return rv
}
func (i *Store) Reader() store.KVReader {
func (i *Store) Reader() (store.KVReader, error) {
return newReader(i)
}
func (i *Store) Writer() store.KVWriter {
func (i *Store) Writer() (store.KVWriter, error) {
return newWriter(i)
}
@ -45,10 +45,10 @@ type Reader struct {
store *Store
}
func newReader(store *Store) *Reader {
func newReader(store *Store) (*Reader, error) {
return &Reader{
store: store,
}
}, nil
}
func (r *Reader) Get(key []byte) ([]byte, error) {
@ -96,7 +96,8 @@ func (i *Iterator) Valid() bool {
return false
}
func (i *Iterator) Close() {
func (i *Iterator) Close() error {
return nil
}
type Batch struct {
@ -147,10 +148,10 @@ type Writer struct {
store *Store
}
func newWriter(store *Store) *Writer {
func newWriter(store *Store) (*Writer, error) {
return &Writer{
store: store,
}
}, nil
}
func (w *Writer) Set(key, val []byte) error {

View File

@ -18,8 +18,11 @@ import (
func CommonTestKVStore(t *testing.T, s store.KVStore) {
writer := s.Writer()
err := writer.Set([]byte("a"), []byte("val-a"))
writer, err := s.Writer()
if err != nil {
t.Error(err)
}
err = writer.Set([]byte("a"), []byte("val-a"))
if err != nil {
t.Fatal(err)
}
@ -49,7 +52,10 @@ func CommonTestKVStore(t *testing.T, s store.KVStore) {
}
writer.Close()
reader := s.Reader()
reader, err := s.Reader()
if err != nil {
t.Error(err)
}
defer reader.Close()
it := reader.Iterator([]byte("b"))
key, val, valid := it.Current()
@ -92,15 +98,21 @@ func CommonTestKVStore(t *testing.T, s store.KVStore) {
func CommonTestReaderIsolation(t *testing.T, s store.KVStore) {
// insert a kv pair
writer := s.Writer()
err := writer.Set([]byte("a"), []byte("val-a"))
writer, err := s.Writer()
if err != nil {
t.Error(err)
}
err = writer.Set([]byte("a"), []byte("val-a"))
if err != nil {
t.Fatal(err)
}
writer.Close()
// create an isolated reader
reader := s.Reader()
reader, err := s.Reader()
if err != nil {
t.Error(err)
}
defer reader.Close()
// verify we see the value already inserted
@ -125,7 +137,10 @@ func CommonTestReaderIsolation(t *testing.T, s store.KVStore) {
}
// add something after the reader was created
writer = s.Writer()
writer, err = s.Writer()
if err != nil {
t.Error(err)
}
err = writer.Set([]byte("b"), []byte("val-b"))
if err != nil {
t.Fatal(err)
@ -133,7 +148,10 @@ func CommonTestReaderIsolation(t *testing.T, s store.KVStore) {
writer.Close()
// ensure that a newer reader sees it
newReader := s.Reader()
newReader, err := s.Reader()
if err != nil {
t.Error(err)
}
defer newReader.Close()
val, err = newReader.Get([]byte("b"))
if err != nil {

View File

@ -69,24 +69,24 @@ func CommonBenchmarkIndexBatch(b *testing.B, s store.KVStore, analysisWorkers, b
b.ResetTimer()
for i := 0; i < b.N; i++ {
var batch index.Batch
batch := index.NewBatch()
for j := 0; j < 1000; j++ {
if j%batchSize == 0 {
if len(batch) > 0 {
if len(batch.IndexOps) > 0 {
err := idx.Batch(batch)
if err != nil {
b.Fatal(err)
}
}
batch = make(index.Batch)
batch = index.NewBatch()
}
indexDocument := document.NewDocument("").
AddField(document.NewTextFieldWithAnalyzer("body", []uint64{}, []byte(benchmarkDocBodies[j%10]), analyzer))
indexDocument.ID = strconv.Itoa(i) + "-" + strconv.Itoa(j)
batch[indexDocument.ID] = indexDocument
batch.Update(indexDocument)
}
// close last batch
if len(batch) > 0 {
if len(batch.IndexOps) > 0 {
err := idx.Batch(batch)
if err != nil {
b.Fatal(err)

View File

@ -53,7 +53,11 @@ func (udc *UpsideDownCouch) DumpAll() chan interface{} {
defer close(rv)
// start an isolated reader for use during the dump
kvreader := udc.store.Reader()
kvreader, err := udc.store.Reader()
if err != nil {
rv <- err
return
}
defer kvreader.Close()
udc.dumpPrefix(kvreader, rv, nil)
@ -67,7 +71,11 @@ func (udc *UpsideDownCouch) DumpFields() chan interface{} {
defer close(rv)
// start an isolated reader for use during the dump
kvreader := udc.store.Reader()
kvreader, err := udc.store.Reader()
if err != nil {
rv <- err
return
}
defer kvreader.Close()
udc.dumpPrefix(kvreader, rv, []byte{'f'})
@ -89,7 +97,11 @@ func (udc *UpsideDownCouch) DumpDoc(id string) chan interface{} {
defer close(rv)
// start an isolated reader for use during the dump
kvreader := udc.store.Reader()
kvreader, err := udc.store.Reader()
if err != nil {
rv <- err
return
}
defer kvreader.Close()
back, err := udc.backIndexRowForDoc(kvreader, id)

View File

@ -34,7 +34,10 @@ func TestDump(t *testing.T) {
defer idx.Close()
var expectedCount uint64
docCount := idx.DocCount()
docCount, err := idx.DocCount()
if err != nil {
t.Error(err)
}
if docCount != expectedCount {
t.Errorf("Expected document count to be %d got %d", expectedCount, docCount)
}

View File

@ -49,7 +49,10 @@ func TestIndexFieldReader(t *testing.T) {
}
expectedCount++
indexReader := idx.Reader()
indexReader, err := idx.Reader()
if err != nil {
t.Error(err)
}
defer indexReader.Close()
reader, err := indexReader.FieldReader("name", nil, nil)
if err != nil {

View File

@ -49,7 +49,10 @@ func TestIndexReader(t *testing.T) {
}
expectedCount++
indexReader := idx.Reader()
indexReader, err := idx.Reader()
if err != nil {
t.Error(err)
}
defer indexReader.Close()
// first look for a term that doesnt exist
@ -195,7 +198,10 @@ func TestIndexDocIdReader(t *testing.T) {
}
expectedCount++
indexReader := idx.Reader()
indexReader, err := idx.Reader()
if err != nil {
t.Error(err)
}
defer indexReader.Close()
// first get all doc ids

View File

@ -125,13 +125,16 @@ func (udc *UpsideDownCouch) batchRows(writer store.KVWriter, addRows []UpsideDow
return
}
func (udc *UpsideDownCouch) DocCount() uint64 {
return udc.docCount
func (udc *UpsideDownCouch) DocCount() (uint64, error) {
return udc.docCount, nil
}
func (udc *UpsideDownCouch) Open() error {
// start a writer for the open process
kvwriter := udc.store.Writer()
kvwriter, err := udc.store.Writer()
if err != nil {
return err
}
defer kvwriter.Close()
value, err := kvwriter.Get(VersionKey)
@ -176,7 +179,10 @@ func (udc *UpsideDownCouch) countDocs(kvreader store.KVReader) uint64 {
func (udc *UpsideDownCouch) rowCount() uint64 {
// start an isolated reader for use during the rowcount
kvreader := udc.store.Reader()
kvreader, err := udc.store.Reader()
if err != nil {
return 0
}
defer kvreader.Close()
it := kvreader.Iterator([]byte{0})
defer it.Close()
@ -192,8 +198,8 @@ func (udc *UpsideDownCouch) rowCount() uint64 {
return rv
}
func (udc *UpsideDownCouch) Close() {
udc.store.Close()
func (udc *UpsideDownCouch) Close() error {
return udc.store.Close()
}
func (udc *UpsideDownCouch) Update(doc *document.Document) error {
@ -217,7 +223,10 @@ func (udc *UpsideDownCouch) Update(doc *document.Document) error {
// start a writer for this update
indexStart := time.Now()
kvwriter := udc.store.Writer()
kvwriter, err := udc.store.Writer()
if err != nil {
return err
}
defer kvwriter.Close()
// first we lookup the backindex row for the doc id if it exists
@ -360,7 +369,10 @@ func (udc *UpsideDownCouch) indexField(docID string, field document.Field, field
func (udc *UpsideDownCouch) Delete(id string) error {
indexStart := time.Now()
// start a writer for this delete
kvwriter := udc.store.Writer()
kvwriter, err := udc.store.Writer()
if err != nil {
return err
}
defer kvwriter.Close()
// lookup the back index row
@ -426,11 +438,11 @@ func (udc *UpsideDownCouch) backIndexRowForDoc(kvreader store.KVReader, docID st
return backIndexRow, nil
}
func (udc *UpsideDownCouch) backIndexRowsForBatch(kvreader store.KVReader, batch index.Batch) (map[string]*BackIndexRow, error) {
func (udc *UpsideDownCouch) backIndexRowsForBatch(kvreader store.KVReader, batch *index.Batch) (map[string]*BackIndexRow, error) {
// FIXME faster to order the ids and scan sequentially
// for now just get it working
rv := make(map[string]*BackIndexRow, 0)
for docID := range batch {
for docID := range batch.IndexOps {
backIndexRow, err := udc.backIndexRowForDoc(kvreader, docID)
if err != nil {
return nil, err
@ -498,19 +510,19 @@ func (udc *UpsideDownCouch) termFieldVectorsFromTermVectors(in []*TermVector) []
return rv
}
func (udc *UpsideDownCouch) Batch(batch index.Batch) error {
func (udc *UpsideDownCouch) Batch(batch *index.Batch) error {
analysisStart := time.Now()
resultChan := make(chan *AnalysisResult)
var numUpdates uint64
for _, doc := range batch {
for _, doc := range batch.IndexOps {
if doc != nil {
numUpdates++
}
}
go func() {
for _, doc := range batch {
for _, doc := range batch.IndexOps {
if doc != nil {
aw := AnalysisWork{
udc: udc,
@ -537,7 +549,10 @@ func (udc *UpsideDownCouch) Batch(batch index.Batch) error {
indexStart := time.Now()
// start a writer for this batch
kvwriter := udc.store.Writer()
kvwriter, err := udc.store.Writer()
if err != nil {
return err
}
defer kvwriter.Close()
// first lookup all the back index rows
@ -553,7 +568,7 @@ func (udc *UpsideDownCouch) Batch(batch index.Batch) error {
docsAdded := uint64(0)
docsDeleted := uint64(0)
for docID, doc := range batch {
for docID, doc := range batch.IndexOps {
backIndexRow := backIndexRows[docID]
if doc == nil && backIndexRow != nil {
//delete
@ -567,6 +582,18 @@ func (udc *UpsideDownCouch) Batch(batch index.Batch) error {
}
}
// add the internal ops
for internalKey, internalValue := range batch.InternalOps {
if internalValue == nil {
// delete
deleteInternalRow := NewInternalRow([]byte(internalKey), nil)
deleteRows = append(deleteRows, deleteInternalRow)
} else {
updateInternalRow := NewInternalRow([]byte(internalKey), internalValue)
updateRows = append(updateRows, updateInternalRow)
}
}
err = udc.batchRows(kvwriter, addRows, updateRows, deleteRows)
atomic.AddUint64(&udc.stats.indexTime, uint64(time.Since(indexStart)))
if err == nil {
@ -583,24 +610,34 @@ func (udc *UpsideDownCouch) Batch(batch index.Batch) error {
func (udc *UpsideDownCouch) SetInternal(key, val []byte) error {
internalRow := NewInternalRow(key, val)
writer := udc.store.Writer()
writer, err := udc.store.Writer()
if err != nil {
return err
}
defer writer.Close()
return writer.Set(internalRow.Key(), internalRow.Value())
}
func (udc *UpsideDownCouch) DeleteInternal(key []byte) error {
internalRow := NewInternalRow(key, nil)
writer := udc.store.Writer()
writer, err := udc.store.Writer()
if err != nil {
return err
}
defer writer.Close()
return writer.Delete(internalRow.Key())
}
func (udc *UpsideDownCouch) Reader() index.IndexReader {
func (udc *UpsideDownCouch) Reader() (index.IndexReader, error) {
kvr, err := udc.store.Reader()
if err != nil {
return nil, err
}
return &IndexReader{
index: udc,
kvreader: udc.store.Reader(),
kvreader: kvr,
docCount: udc.docCount,
}
}, nil
}
func (udc *UpsideDownCouch) Stats() json.Marshaler {

View File

@ -39,7 +39,10 @@ func TestIndexOpenReopen(t *testing.T) {
}
var expectedCount uint64
docCount := idx.DocCount()
docCount, err := idx.DocCount()
if err != nil {
t.Error(err)
}
if docCount != expectedCount {
t.Errorf("Expected document count to be %d got %d", expectedCount, docCount)
}
@ -81,7 +84,10 @@ func TestIndexInsert(t *testing.T) {
defer idx.Close()
var expectedCount uint64
docCount := idx.DocCount()
docCount, err := idx.DocCount()
if err != nil {
t.Error(err)
}
if docCount != expectedCount {
t.Errorf("Expected document count to be %d got %d", expectedCount, docCount)
}
@ -94,7 +100,10 @@ func TestIndexInsert(t *testing.T) {
}
expectedCount++
docCount = idx.DocCount()
docCount, err = idx.DocCount()
if err != nil {
t.Error(err)
}
if docCount != expectedCount {
t.Errorf("Expected document count to be %d got %d", expectedCount, docCount)
}
@ -120,7 +129,10 @@ func TestIndexInsertThenDelete(t *testing.T) {
defer idx.Close()
var expectedCount uint64
docCount := idx.DocCount()
docCount, err := idx.DocCount()
if err != nil {
t.Error(err)
}
if docCount != expectedCount {
t.Errorf("Expected document count to be %d got %d", expectedCount, docCount)
}
@ -141,7 +153,10 @@ func TestIndexInsertThenDelete(t *testing.T) {
}
expectedCount++
docCount = idx.DocCount()
docCount, err = idx.DocCount()
if err != nil {
t.Error(err)
}
if docCount != expectedCount {
t.Errorf("Expected document count to be %d got %d", expectedCount, docCount)
}
@ -152,7 +167,10 @@ func TestIndexInsertThenDelete(t *testing.T) {
}
expectedCount--
docCount = idx.DocCount()
docCount, err = idx.DocCount()
if err != nil {
t.Error(err)
}
if docCount != expectedCount {
t.Errorf("Expected document count to be %d got %d", expectedCount, docCount)
}
@ -163,7 +181,10 @@ func TestIndexInsertThenDelete(t *testing.T) {
}
expectedCount--
docCount = idx.DocCount()
docCount, err = idx.DocCount()
if err != nil {
t.Error(err)
}
if docCount != expectedCount {
t.Errorf("Expected document count to be %d got %d", expectedCount, docCount)
}
@ -280,7 +301,10 @@ func TestIndexInsertMultiple(t *testing.T) {
}
expectedCount++
docCount := idx.DocCount()
docCount, err := idx.DocCount()
if err != nil {
t.Error(err)
}
if docCount != expectedCount {
t.Errorf("expected doc count: %d, got %d", expectedCount, docCount)
}
@ -302,7 +326,10 @@ func TestIndexInsertWithStore(t *testing.T) {
defer idx.Close()
var expectedCount uint64
docCount := idx.DocCount()
docCount, err := idx.DocCount()
if err != nil {
t.Error(err)
}
if docCount != expectedCount {
t.Errorf("Expected document count to be %d got %d", expectedCount, docCount)
}
@ -315,7 +342,10 @@ func TestIndexInsertWithStore(t *testing.T) {
}
expectedCount++
docCount = idx.DocCount()
docCount, err = idx.DocCount()
if err != nil {
t.Error(err)
}
if docCount != expectedCount {
t.Errorf("Expected document count to be %d got %d", expectedCount, docCount)
}
@ -327,7 +357,10 @@ func TestIndexInsertWithStore(t *testing.T) {
t.Errorf("expected %d rows, got: %d", expectedLength, rowCount)
}
indexReader := idx.Reader()
indexReader, err := idx.Reader()
if err != nil {
t.Error(err)
}
defer indexReader.Close()
storedDoc, err := indexReader.Document("1")
@ -359,7 +392,10 @@ func TestIndexInternalCRUD(t *testing.T) {
}
defer idx.Close()
indexReader := idx.Reader()
indexReader, err := idx.Reader()
if err != nil {
t.Error(err)
}
defer indexReader.Close()
// get something that doesnt exist yet
@ -377,7 +413,10 @@ func TestIndexInternalCRUD(t *testing.T) {
t.Error(err)
}
indexReader = idx.Reader()
indexReader, err = idx.Reader()
if err != nil {
t.Error(err)
}
defer indexReader.Close()
// get
@ -395,7 +434,10 @@ func TestIndexInternalCRUD(t *testing.T) {
t.Error(err)
}
indexReader = idx.Reader()
indexReader, err = idx.Reader()
if err != nil {
t.Error(err)
}
defer indexReader.Close()
// get again
@ -445,21 +487,24 @@ func TestIndexBatch(t *testing.T) {
// delete existing doc
// net document count change 0
batch := make(index.Batch, 0)
batch := index.NewBatch()
doc = document.NewDocument("3")
doc.AddField(document.NewTextField("name", []uint64{}, []byte("test3")))
batch["3"] = doc
batch.Update(doc)
doc = document.NewDocument("2")
doc.AddField(document.NewTextField("name", []uint64{}, []byte("test2updated")))
batch["2"] = doc
batch["1"] = nil
batch.Update(doc)
batch.Delete("1")
err = idx.Batch(batch)
if err != nil {
t.Error(err)
}
indexReader := idx.Reader()
indexReader, err := idx.Reader()
if err != nil {
t.Error(err)
}
defer indexReader.Close()
docCount := indexReader.DocCount()
@ -502,7 +547,10 @@ func TestIndexInsertUpdateDeleteWithMultipleTypesStored(t *testing.T) {
defer idx.Close()
var expectedCount uint64
docCount := idx.DocCount()
docCount, err := idx.DocCount()
if err != nil {
t.Error(err)
}
if docCount != expectedCount {
t.Errorf("Expected document count to be %d got %d", expectedCount, docCount)
}
@ -521,7 +569,10 @@ func TestIndexInsertUpdateDeleteWithMultipleTypesStored(t *testing.T) {
}
expectedCount++
docCount = idx.DocCount()
docCount, err = idx.DocCount()
if err != nil {
t.Error(err)
}
if docCount != expectedCount {
t.Errorf("Expected document count to be %d got %d", expectedCount, docCount)
}
@ -543,7 +594,10 @@ func TestIndexInsertUpdateDeleteWithMultipleTypesStored(t *testing.T) {
t.Errorf("expected %d rows, got: %d", expectedLength, rowCount)
}
indexReader := idx.Reader()
indexReader, err := idx.Reader()
if err != nil {
t.Error(err)
}
defer indexReader.Close()
storedDoc, err := indexReader.Document("1")
@ -595,7 +649,10 @@ func TestIndexInsertUpdateDeleteWithMultipleTypesStored(t *testing.T) {
t.Errorf("Error updating index: %v", err)
}
indexReader = idx.Reader()
indexReader, err = idx.Reader()
if err != nil {
t.Error(err)
}
defer indexReader.Close()
// expected doc count shouldn't have changed
@ -638,7 +695,10 @@ func TestIndexInsertUpdateDeleteWithMultipleTypesStored(t *testing.T) {
expectedCount--
// expected doc count shouldn't have changed
docCount = idx.DocCount()
docCount, err = idx.DocCount()
if err != nil {
t.Error(err)
}
if docCount != expectedCount {
t.Errorf("Expected document count to be %d got %d", expectedCount, docCount)
}
@ -672,7 +732,10 @@ func TestIndexInsertFields(t *testing.T) {
t.Errorf("Error updating index: %v", err)
}
indexReader := idx.Reader()
indexReader, err := idx.Reader()
if err != nil {
t.Error(err)
}
defer indexReader.Close()
fields, err := indexReader.Fields()
@ -734,7 +797,10 @@ func TestIndexUpdateComposites(t *testing.T) {
t.Errorf("Error updating index: %v", err)
}
indexReader := idx.Reader()
indexReader, err := idx.Reader()
if err != nil {
t.Error(err)
}
defer indexReader.Close()
// make sure new values are in index
@ -822,7 +888,10 @@ func TestIndexTermReaderCompositeFields(t *testing.T) {
t.Errorf("Error updating index: %v", err)
}
indexReader := idx.Reader()
indexReader, err := idx.Reader()
if err != nil {
t.Error(err)
}
defer indexReader.Close()
termFieldReader, err := indexReader.TermFieldReader([]byte("mister"), "_all")
@ -865,7 +934,10 @@ func TestIndexDocumentFieldTerms(t *testing.T) {
t.Errorf("Error updating index: %v", err)
}
indexReader := idx.Reader()
indexReader, err := idx.Reader()
if err != nil {
t.Error(err)
}
defer indexReader.Close()
fieldTerms, err := indexReader.DocumentFieldTerms("1")

View File

@ -73,7 +73,7 @@ func (i *indexAliasImpl) Delete(id string) error {
return i.indexes[0].Delete(id)
}
func (i *indexAliasImpl) Batch(b Batch) error {
func (i *indexAliasImpl) Batch(b *Batch) error {
i.mutex.RLock()
defer i.mutex.RUnlock()
@ -105,24 +105,35 @@ func (i *indexAliasImpl) Document(id string) (*document.Document, error) {
return i.indexes[0].Document(id)
}
func (i *indexAliasImpl) DocCount() uint64 {
func (i *indexAliasImpl) DocCount() (uint64, error) {
i.mutex.RLock()
defer i.mutex.RUnlock()
rv := uint64(0)
if !i.open {
return 0
return 0, ErrorIndexClosed
}
for _, index := range i.indexes {
rv += index.DocCount()
otherCount, err := index.DocCount()
if err != nil {
return 0, err
}
rv += otherCount
}
return rv
return rv, nil
}
func (i *indexAliasImpl) Search(req *SearchRequest) (*SearchResult, error) {
i.mutex.RLock()
defer i.mutex.RUnlock()
if !i.open {
return nil, ErrorIndexClosed
}
if len(i.indexes) < 1 {
return nil, ErrorAliasEmpty
}
@ -199,11 +210,12 @@ func (i *indexAliasImpl) DumpFields() chan interface{} {
return i.indexes[0].DumpFields()
}
func (i *indexAliasImpl) Close() {
func (i *indexAliasImpl) Close() error {
i.mutex.Lock()
defer i.mutex.Unlock()
i.open = false
return nil
}
func (i *indexAliasImpl) Mapping() *IndexMapping {

View File

@ -191,7 +191,10 @@ func openIndex(path string) (*indexImpl, error) {
rv.stats.indexStat = rv.i.Stats()
// now load the mapping
indexReader := rv.i.Reader()
indexReader, err := rv.i.Reader()
if err != nil {
return nil, err
}
defer indexReader.Close()
mappingBytes, err := indexReader.GetInternal(mappingInternalKey)
if err != nil {
@ -271,7 +274,7 @@ func (i *indexImpl) Delete(id string) error {
// operations at the same time. There are often
// significant performance benefits when performing
// operations in a batch.
func (i *indexImpl) Batch(b Batch) error {
func (i *indexImpl) Batch(b *Batch) error {
i.mutex.RLock()
defer i.mutex.RUnlock()
@ -279,8 +282,8 @@ func (i *indexImpl) Batch(b Batch) error {
return ErrorIndexClosed
}
ib := make(index.Batch, len(b))
for bk, bd := range b {
ib := index.NewBatch()
for bk, bd := range b.IndexOps {
if bd == nil {
ib.Delete(bk)
} else {
@ -289,7 +292,14 @@ func (i *indexImpl) Batch(b Batch) error {
if err != nil {
return err
}
ib.Index(bk, doc)
ib.Update(doc)
}
}
for ik, iv := range b.InternalOps {
if iv == nil {
ib.DeleteInternal([]byte(ik))
} else {
ib.SetInternal([]byte(ik), iv)
}
}
return i.i.Batch(ib)
@ -306,19 +316,22 @@ func (i *indexImpl) Document(id string) (*document.Document, error) {
if !i.open {
return nil, ErrorIndexClosed
}
indexReader := i.i.Reader()
indexReader, err := i.i.Reader()
if err != nil {
return nil, err
}
defer indexReader.Close()
return indexReader.Document(id)
}
// DocCount returns the number of documents in the
// index.
func (i *indexImpl) DocCount() uint64 {
func (i *indexImpl) DocCount() (uint64, error) {
i.mutex.RLock()
defer i.mutex.RUnlock()
if !i.open {
return 0
return 0, ErrorIndexClosed
}
return i.i.DocCount()
@ -339,7 +352,10 @@ func (i *indexImpl) Search(req *SearchRequest) (*SearchResult, error) {
collector := collectors.NewTopScorerSkipCollector(req.Size, req.From)
// open a reader for this search
indexReader := i.i.Reader()
indexReader, err := i.i.Reader()
if err != nil {
return nil, err
}
defer indexReader.Close()
searcher, err := req.Query.Searcher(indexReader, i.m, req.Explain)
@ -475,7 +491,10 @@ func (i *indexImpl) Fields() ([]string, error) {
return nil, ErrorIndexClosed
}
indexReader := i.i.Reader()
indexReader, err := i.i.Reader()
if err != nil {
return nil, err
}
defer indexReader.Close()
return indexReader.Fields()
}
@ -522,12 +541,12 @@ func (i *indexImpl) DumpDoc(id string) chan interface{} {
return i.i.DumpDoc(id)
}
func (i *indexImpl) Close() {
func (i *indexImpl) Close() error {
i.mutex.Lock()
defer i.mutex.Unlock()
i.open = false
i.i.Close()
return i.i.Close()
}
func (i *indexImpl) Stats() *IndexStat {
@ -538,7 +557,10 @@ func (i *indexImpl) GetInternal(key []byte) ([]byte, error) {
i.mutex.RLock()
defer i.mutex.RUnlock()
reader := i.i.Reader()
reader, err := i.i.Reader()
if err != nil {
return nil, err
}
defer reader.Close()
return reader.GetInternal(key)

View File

@ -17,7 +17,10 @@ import (
func TestBooleanSearch(t *testing.T) {
twoDocIndexReader := twoDocIndex.Reader()
twoDocIndexReader, err := twoDocIndex.Reader()
if err != nil {
t.Error(err)
}
defer twoDocIndexReader.Close()
// test 0

View File

@ -17,7 +17,10 @@ import (
func TestConjunctionSearch(t *testing.T) {
twoDocIndexReader := twoDocIndex.Reader()
twoDocIndexReader, err := twoDocIndex.Reader()
if err != nil {
t.Error(err)
}
defer twoDocIndexReader.Close()
// test 0

View File

@ -17,7 +17,10 @@ import (
func TestDisjunctionSearch(t *testing.T) {
twoDocIndexReader := twoDocIndex.Reader()
twoDocIndexReader, err := twoDocIndex.Reader()
if err != nil {
t.Error(err)
}
defer twoDocIndexReader.Close()
martyTermSearcher, err := NewTermSearcher(twoDocIndexReader, "marty", "name", 1.0, true)
@ -122,7 +125,10 @@ func TestDisjunctionSearch(t *testing.T) {
func TestDisjunctionAdvance(t *testing.T) {
twoDocIndexReader := twoDocIndex.Reader()
twoDocIndexReader, err := twoDocIndex.Reader()
if err != nil {
t.Error(err)
}
defer twoDocIndexReader.Close()
martyTermSearcher, err := NewTermSearcher(twoDocIndexReader, "marty", "name", 1.0, true)

View File

@ -17,7 +17,10 @@ import (
func TestMatchAllSearch(t *testing.T) {
twoDocIndexReader := twoDocIndex.Reader()
twoDocIndexReader, err := twoDocIndex.Reader()
if err != nil {
t.Error(err)
}
defer twoDocIndexReader.Close()
allSearcher, err := NewMatchAllSearcher(twoDocIndexReader, 1.0, true)

View File

@ -17,7 +17,10 @@ import (
func TestMatchNoneSearch(t *testing.T) {
twoDocIndexReader := twoDocIndex.Reader()
twoDocIndexReader, err := twoDocIndex.Reader()
if err != nil {
t.Error(err)
}
defer twoDocIndexReader.Close()
noneSearcher, err := NewMatchNoneSearcher(twoDocIndexReader)

View File

@ -17,7 +17,10 @@ import (
func TestPhraseSearch(t *testing.T) {
twoDocIndexReader := twoDocIndex.Reader()
twoDocIndexReader, err := twoDocIndex.Reader()
if err != nil {
t.Error(err)
}
defer twoDocIndexReader.Close()
angstTermSearcher, err := NewTermSearcher(twoDocIndexReader, "angst", "desc", 1.0, true)

View File

@ -89,7 +89,10 @@ func TestTermSearcher(t *testing.T) {
},
})
indexReader := i.Reader()
indexReader, err := i.Reader()
if err != nil {
t.Error(err)
}
defer indexReader.Close()
searcher, err := NewTermSearcher(indexReader, queryTerm, queryField, queryBoost, queryExplain)
@ -99,7 +102,11 @@ func TestTermSearcher(t *testing.T) {
defer searcher.Close()
searcher.SetQueryNorm(2.0)
idf := 1.0 + math.Log(float64(i.DocCount())/float64(searcher.Count()+1.0))
docCount, err := i.DocCount()
if err != nil {
t.Fatal(err)
}
idf := 1.0 + math.Log(float64(docCount)/float64(searcher.Count()+1.0))
expectedQueryWeight := 3 * idf * 3 * idf
if expectedQueryWeight != searcher.Weight() {
t.Errorf("expected weight %v got %v", expectedQueryWeight, searcher.Weight())