From c7443fe52bb07ed46f9c6dc53bc3773d34e86f84 Mon Sep 17 00:00:00 2001 From: Marty Schoch Date: Fri, 31 Oct 2014 09:40:23 -0400 Subject: [PATCH] refactored API a bit more things can return error now in a couple of places we had to swallow errors because they didn't fit the existing API. in these cases and proactively in a few others we now return error as well. also the batch API has been updated to allow performing set/delete internal within the batch --- http/doc_count.go | 6 +- index.go | 36 ++++-- index/index.go | 34 ++++-- index/store/boltdb/iterator.go | 5 +- index/store/boltdb/reader.go | 9 +- index/store/boltdb/store.go | 4 +- index/store/boltdb/writer.go | 4 +- index/store/inmem/iterator.go | 3 +- index/store/inmem/reader.go | 4 +- index/store/inmem/store.go | 4 +- index/store/inmem/writer.go | 4 +- index/store/kvstore.go | 6 +- index/store/leveldb/iterator.go | 3 +- index/store/leveldb/reader.go | 4 +- index/store/leveldb/store.go | 4 +- index/store/leveldb/writer.go | 4 +- index/store/null/null.go | 15 +-- index/store/test/common.go | 34 ++++-- index/upside_down/benchmark_common_test.go | 10 +- index/upside_down/dump.go | 18 ++- index/upside_down/dump_test.go | 5 +- index/upside_down/field_reader_test.go | 5 +- index/upside_down/reader_test.go | 10 +- index/upside_down/upside_down.go | 77 +++++++++--- index/upside_down/upside_down_test.go | 128 +++++++++++++++----- index_alias_impl.go | 24 +++- index_impl.go | 48 ++++++-- search/searchers/search_boolean_test.go | 5 +- search/searchers/search_conjunction_test.go | 5 +- search/searchers/search_disjunction_test.go | 10 +- search/searchers/search_match_all_test.go | 5 +- search/searchers/search_match_none_test.go | 5 +- search/searchers/search_phrase_test.go | 5 +- search/searchers/search_term_test.go | 11 +- 34 files changed, 408 insertions(+), 146 deletions(-) diff --git a/http/doc_count.go b/http/doc_count.go index 761e6153..568782e9 100644 --- a/http/doc_count.go +++ b/http/doc_count.go @@ -38,7 +38,11 @@ 
func (h *DocCountHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { return } - docCount := index.DocCount() + docCount, err := index.DocCount() + if err != nil { + showError(w, req, fmt.Sprintf("error counting docs: %v", err), 500) + return + } rv := struct { Status string `json:"status"` Count uint64 `json:"count"` diff --git a/index.go b/index.go index 04e6f537..9102c753 100644 --- a/index.go +++ b/index.go @@ -16,25 +16,45 @@ import ( // A Batch groups together multiple Index and Delete // operations you would like performed at the same // time. -type Batch map[string]interface{} +type Batch struct { + IndexOps map[string]interface{} + InternalOps map[string][]byte +} // NewBatch creates a new empty batch. -func NewBatch() Batch { - return make(Batch, 0) +func NewBatch() *Batch { + return &Batch{ + IndexOps: make(map[string]interface{}), + InternalOps: make(map[string][]byte), + } } // Index adds the specified index operation to the // batch. NOTE: the bleve Index is not updated // until the batch is executed. func (b Batch) Index(id string, data interface{}) { - b[id] = data + b.IndexOps[id] = data } // Delete adds the specified delete operation to the // batch. NOTE: the bleve Index is not updated until // the batch is executed. func (b Batch) Delete(id string) { - b[id] = nil + b.IndexOps[id] = nil +} + +// SetInternal adds the specified set internal +// operation to the batch. NOTE: the bleve Index is +// not updated until the batch is executed. +func (b Batch) SetInternal(key, val []byte) { + b.InternalOps[string(key)] = val +} + +// DeleteInternal adds the specified delete internal +// operation to the batch. NOTE: the bleve Index is +// not updated until the batch is executed. 
+func (b Batch) DeleteInternal(key []byte) { + b.InternalOps[string(key)] = nil } // An Index implements all the indexing and searching @@ -44,10 +64,10 @@ type Index interface { Index(id string, data interface{}) error Delete(id string) error - Batch(b Batch) error + Batch(b *Batch) error Document(id string) (*document.Document, error) - DocCount() uint64 + DocCount() (uint64, error) Search(req *SearchRequest) (*SearchResult, error) @@ -57,7 +77,7 @@ type Index interface { DumpDoc(id string) chan interface{} DumpFields() chan interface{} - Close() + Close() error Mapping() *IndexMapping diff --git a/index/index.go b/index/index.go index f5a6fed7..b4dbdc84 100644 --- a/index/index.go +++ b/index/index.go @@ -17,13 +17,13 @@ import ( type Index interface { Open() error - Close() + Close() error - DocCount() uint64 + DocCount() (uint64, error) Update(doc *document.Document) error Delete(id string) error - Batch(batch Batch) error + Batch(batch *Batch) error SetInternal(key, val []byte) error DeleteInternal(key []byte) error @@ -32,7 +32,7 @@ type Index interface { DumpDoc(id string) chan interface{} DumpFields() chan interface{} - Reader() IndexReader + Reader() (IndexReader, error) Stats() json.Marshaler } @@ -90,12 +90,30 @@ type DocIDReader interface { Close() } -type Batch map[string]*document.Document +type Batch struct { + IndexOps map[string]*document.Document + InternalOps map[string][]byte +} -func (b Batch) Index(id string, doc *document.Document) { - b[id] = doc +func NewBatch() *Batch { + return &Batch{ + IndexOps: make(map[string]*document.Document), + InternalOps: make(map[string][]byte), + } +} + +func (b Batch) Update(doc *document.Document) { + b.IndexOps[doc.ID] = doc } func (b Batch) Delete(id string) { - b[id] = nil + b.IndexOps[id] = nil +} + +func (b Batch) SetInternal(key, val []byte) { + b.InternalOps[string(key)] = val +} + +func (b Batch) DeleteInternal(key []byte) { + b.InternalOps[string(key)] = nil } diff --git 
a/index/store/boltdb/iterator.go b/index/store/boltdb/iterator.go index 40ff6f1a..c5d870a2 100644 --- a/index/store/boltdb/iterator.go +++ b/index/store/boltdb/iterator.go @@ -81,9 +81,10 @@ func (i *Iterator) Valid() bool { return i.valid } -func (i *Iterator) Close() { +func (i *Iterator) Close() error { // only close the transaction if we opened it if i.ownTx { - i.tx.Rollback() + return i.tx.Rollback() } + return nil } diff --git a/index/store/boltdb/reader.go b/index/store/boltdb/reader.go index 7295a2fa..d15031d3 100644 --- a/index/store/boltdb/reader.go +++ b/index/store/boltdb/reader.go @@ -19,12 +19,15 @@ type Reader struct { tx *bolt.Tx } -func newReader(store *Store) *Reader { - tx, _ := store.db.Begin(false) +func newReader(store *Store) (*Reader, error) { + tx, err := store.db.Begin(false) + if err != nil { + return nil, err + } return &Reader{ store: store, tx: tx, - } + }, nil } func (r *Reader) Get(key []byte) ([]byte, error) { diff --git a/index/store/boltdb/store.go b/index/store/boltdb/store.go index 4e978316..89f65054 100644 --- a/index/store/boltdb/store.go +++ b/index/store/boltdb/store.go @@ -97,11 +97,11 @@ func (bs *Store) iterator(key []byte) store.KVIterator { return rv } -func (bs *Store) Reader() store.KVReader { +func (bs *Store) Reader() (store.KVReader, error) { return newReader(bs) } -func (bs *Store) Writer() store.KVWriter { +func (bs *Store) Writer() (store.KVWriter, error) { return newWriter(bs) } diff --git a/index/store/boltdb/writer.go b/index/store/boltdb/writer.go index 208f5fb6..7ab6a40e 100644 --- a/index/store/boltdb/writer.go +++ b/index/store/boltdb/writer.go @@ -17,11 +17,11 @@ type Writer struct { store *Store } -func newWriter(store *Store) *Writer { +func newWriter(store *Store) (*Writer, error) { store.writer.Lock() return &Writer{ store: store, - } + }, nil } func (w *Writer) Set(key, val []byte) error { diff --git a/index/store/inmem/iterator.go b/index/store/inmem/iterator.go index 691b062b..042bd890 100644 --- 
a/index/store/inmem/iterator.go +++ b/index/store/inmem/iterator.go @@ -64,6 +64,7 @@ func (i *Iterator) Valid() bool { return i.valid } -func (i *Iterator) Close() { +func (i *Iterator) Close() error { i.iterator.Close() + return nil } diff --git a/index/store/inmem/reader.go b/index/store/inmem/reader.go index 495867ce..5f8ab7b6 100644 --- a/index/store/inmem/reader.go +++ b/index/store/inmem/reader.go @@ -17,10 +17,10 @@ type Reader struct { store *Store } -func newReader(store *Store) *Reader { +func newReader(store *Store) (*Reader, error) { return &Reader{ store: store, - } + }, nil } func (r *Reader) Get(key []byte) ([]byte, error) { diff --git a/index/store/inmem/store.go b/index/store/inmem/store.go index 1ff573b8..453838e4 100644 --- a/index/store/inmem/store.go +++ b/index/store/inmem/store.go @@ -80,11 +80,11 @@ func (i *Store) iterator(key []byte) store.KVIterator { return rv } -func (i *Store) Reader() store.KVReader { +func (i *Store) Reader() (store.KVReader, error) { return newReader(i) } -func (i *Store) Writer() store.KVWriter { +func (i *Store) Writer() (store.KVWriter, error) { return newWriter(i) } diff --git a/index/store/inmem/writer.go b/index/store/inmem/writer.go index 76ca2673..dc1dd2ff 100644 --- a/index/store/inmem/writer.go +++ b/index/store/inmem/writer.go @@ -17,11 +17,11 @@ type Writer struct { store *Store } -func newWriter(store *Store) *Writer { +func newWriter(store *Store) (*Writer, error) { store.writer.Lock() return &Writer{ store: store, - } + }, nil } func (w *Writer) Set(key, val []byte) error { diff --git a/index/store/kvstore.go b/index/store/kvstore.go index d385df8f..078e2a93 100644 --- a/index/store/kvstore.go +++ b/index/store/kvstore.go @@ -29,12 +29,12 @@ type KVIterator interface { Value() []byte Valid() bool - Close() + Close() error } type KVStore interface { - Writer() KVWriter - Reader() KVReader + Writer() (KVWriter, error) + Reader() (KVReader, error) Close() error } diff --git 
a/index/store/leveldb/iterator.go b/index/store/leveldb/iterator.go index 6feda6cd..dd44e636 100644 --- a/index/store/leveldb/iterator.go +++ b/index/store/leveldb/iterator.go @@ -69,6 +69,7 @@ func (ldi *Iterator) Valid() bool { return ldi.iterator.Valid() } -func (ldi *Iterator) Close() { +func (ldi *Iterator) Close() error { ldi.iterator.Close() + return nil } diff --git a/index/store/leveldb/reader.go b/index/store/leveldb/reader.go index c3016c75..25c53991 100644 --- a/index/store/leveldb/reader.go +++ b/index/store/leveldb/reader.go @@ -21,11 +21,11 @@ type Reader struct { snapshot *levigo.Snapshot } -func newReader(store *Store) *Reader { +func newReader(store *Store) (*Reader, error) { return &Reader{ store: store, snapshot: store.db.NewSnapshot(), - } + }, nil } func (r *Reader) Get(key []byte) ([]byte, error) { diff --git a/index/store/leveldb/store.go b/index/store/leveldb/store.go index dfea3f7c..23d8661a 100644 --- a/index/store/leveldb/store.go +++ b/index/store/leveldb/store.go @@ -89,11 +89,11 @@ func (ldbs *Store) iterator(key []byte) store.KVIterator { return rv } -func (ldbs *Store) Reader() store.KVReader { +func (ldbs *Store) Reader() (store.KVReader, error) { return newReader(ldbs) } -func (ldbs *Store) Writer() store.KVWriter { +func (ldbs *Store) Writer() (store.KVWriter, error) { return newWriter(ldbs) } diff --git a/index/store/leveldb/writer.go b/index/store/leveldb/writer.go index dfcfed7e..aba035d3 100644 --- a/index/store/leveldb/writer.go +++ b/index/store/leveldb/writer.go @@ -19,11 +19,11 @@ type Writer struct { store *Store } -func newWriter(store *Store) *Writer { +func newWriter(store *Store) (*Writer, error) { store.writer.Lock() return &Writer{ store: store, - } + }, nil } func (w *Writer) Set(key, val []byte) error { diff --git a/index/store/null/null.go b/index/store/null/null.go index 025ed9f3..3138bcdd 100644 --- a/index/store/null/null.go +++ b/index/store/null/null.go @@ -29,11 +29,11 @@ func (i *Store) iterator(key 
[]byte) store.KVIterator { return rv } -func (i *Store) Reader() store.KVReader { +func (i *Store) Reader() (store.KVReader, error) { return newReader(i) } -func (i *Store) Writer() store.KVWriter { +func (i *Store) Writer() (store.KVWriter, error) { return newWriter(i) } @@ -45,10 +45,10 @@ type Reader struct { store *Store } -func newReader(store *Store) *Reader { +func newReader(store *Store) (*Reader, error) { return &Reader{ store: store, - } + }, nil } func (r *Reader) Get(key []byte) ([]byte, error) { @@ -96,7 +96,8 @@ func (i *Iterator) Valid() bool { return false } -func (i *Iterator) Close() { +func (i *Iterator) Close() error { + return nil } type Batch struct { @@ -147,10 +148,10 @@ type Writer struct { store *Store } -func newWriter(store *Store) *Writer { +func newWriter(store *Store) (*Writer, error) { return &Writer{ store: store, - } + }, nil } func (w *Writer) Set(key, val []byte) error { diff --git a/index/store/test/common.go b/index/store/test/common.go index 6f44e191..372daec5 100644 --- a/index/store/test/common.go +++ b/index/store/test/common.go @@ -18,8 +18,11 @@ import ( func CommonTestKVStore(t *testing.T, s store.KVStore) { - writer := s.Writer() - err := writer.Set([]byte("a"), []byte("val-a")) + writer, err := s.Writer() + if err != nil { + t.Error(err) + } + err = writer.Set([]byte("a"), []byte("val-a")) if err != nil { t.Fatal(err) } @@ -49,7 +52,10 @@ func CommonTestKVStore(t *testing.T, s store.KVStore) { } writer.Close() - reader := s.Reader() + reader, err := s.Reader() + if err != nil { + t.Error(err) + } defer reader.Close() it := reader.Iterator([]byte("b")) key, val, valid := it.Current() @@ -92,15 +98,21 @@ func CommonTestKVStore(t *testing.T, s store.KVStore) { func CommonTestReaderIsolation(t *testing.T, s store.KVStore) { // insert a kv pair - writer := s.Writer() - err := writer.Set([]byte("a"), []byte("val-a")) + writer, err := s.Writer() + if err != nil { + t.Error(err) + } + err = writer.Set([]byte("a"), 
[]byte("val-a")) if err != nil { t.Fatal(err) } writer.Close() // create an isoalted reader - reader := s.Reader() + reader, err := s.Reader() + if err != nil { + t.Error(err) + } defer reader.Close() // verify we see the value already inserted @@ -125,7 +137,10 @@ func CommonTestReaderIsolation(t *testing.T, s store.KVStore) { } // add something after the reader was created - writer = s.Writer() + writer, err = s.Writer() + if err != nil { + t.Error(err) + } err = writer.Set([]byte("b"), []byte("val-b")) if err != nil { t.Fatal(err) @@ -133,7 +148,10 @@ func CommonTestReaderIsolation(t *testing.T, s store.KVStore) { writer.Close() // ensure that a newer reader sees it - newReader := s.Reader() + newReader, err := s.Reader() + if err != nil { + t.Error(err) + } defer newReader.Close() val, err = newReader.Get([]byte("b")) if err != nil { diff --git a/index/upside_down/benchmark_common_test.go b/index/upside_down/benchmark_common_test.go index a918a3f7..fad11f63 100644 --- a/index/upside_down/benchmark_common_test.go +++ b/index/upside_down/benchmark_common_test.go @@ -69,24 +69,24 @@ func CommonBenchmarkIndexBatch(b *testing.B, s store.KVStore, analysisWorkers, b b.ResetTimer() for i := 0; i < b.N; i++ { - var batch index.Batch + batch := index.NewBatch() for j := 0; j < 1000; j++ { if j%batchSize == 0 { - if len(batch) > 0 { + if len(batch.IndexOps) > 0 { err := idx.Batch(batch) if err != nil { b.Fatal(err) } } - batch = make(index.Batch) + batch = index.NewBatch() } indexDocument := document.NewDocument(""). 
AddField(document.NewTextFieldWithAnalyzer("body", []uint64{}, []byte(benchmarkDocBodies[j%10]), analyzer)) indexDocument.ID = strconv.Itoa(i) + "-" + strconv.Itoa(j) - batch[indexDocument.ID] = indexDocument + batch.Update(indexDocument) } // close last batch - if len(batch) > 0 { + if len(batch.IndexOps) > 0 { err := idx.Batch(batch) if err != nil { b.Fatal(err) diff --git a/index/upside_down/dump.go b/index/upside_down/dump.go index a91fcfe3..ff4eab20 100644 --- a/index/upside_down/dump.go +++ b/index/upside_down/dump.go @@ -53,7 +53,11 @@ func (udc *UpsideDownCouch) DumpAll() chan interface{} { defer close(rv) // start an isolated reader for use during the dump - kvreader := udc.store.Reader() + kvreader, err := udc.store.Reader() + if err != nil { + rv <- err + return + } defer kvreader.Close() udc.dumpPrefix(kvreader, rv, nil) @@ -67,7 +71,11 @@ func (udc *UpsideDownCouch) DumpFields() chan interface{} { defer close(rv) // start an isolated reader for use during the dump - kvreader := udc.store.Reader() + kvreader, err := udc.store.Reader() + if err != nil { + rv <- err + return + } defer kvreader.Close() udc.dumpPrefix(kvreader, rv, []byte{'f'}) @@ -89,7 +97,11 @@ func (udc *UpsideDownCouch) DumpDoc(id string) chan interface{} { defer close(rv) // start an isolated reader for use during the dump - kvreader := udc.store.Reader() + kvreader, err := udc.store.Reader() + if err != nil { + rv <- err + return + } defer kvreader.Close() back, err := udc.backIndexRowForDoc(kvreader, id) diff --git a/index/upside_down/dump_test.go b/index/upside_down/dump_test.go index 35252701..3bcdcc07 100644 --- a/index/upside_down/dump_test.go +++ b/index/upside_down/dump_test.go @@ -34,7 +34,10 @@ func TestDump(t *testing.T) { defer idx.Close() var expectedCount uint64 - docCount := idx.DocCount() + docCount, err := idx.DocCount() + if err != nil { + t.Error(err) + } if docCount != expectedCount { t.Errorf("Expected document count to be %d got %d", expectedCount, docCount) } 
diff --git a/index/upside_down/field_reader_test.go b/index/upside_down/field_reader_test.go index 19f9aa46..e39e78eb 100644 --- a/index/upside_down/field_reader_test.go +++ b/index/upside_down/field_reader_test.go @@ -49,7 +49,10 @@ func TestIndexFieldReader(t *testing.T) { } expectedCount++ - indexReader := idx.Reader() + indexReader, err := idx.Reader() + if err != nil { + t.Error(err) + } defer indexReader.Close() reader, err := indexReader.FieldReader("name", nil, nil) if err != nil { diff --git a/index/upside_down/reader_test.go b/index/upside_down/reader_test.go index 627369c8..6aac6b54 100644 --- a/index/upside_down/reader_test.go +++ b/index/upside_down/reader_test.go @@ -49,7 +49,10 @@ func TestIndexReader(t *testing.T) { } expectedCount++ - indexReader := idx.Reader() + indexReader, err := idx.Reader() + if err != nil { + t.Error(err) + } defer indexReader.Close() // first look for a term that doesnt exist @@ -195,7 +198,10 @@ func TestIndexDocIdReader(t *testing.T) { } expectedCount++ - indexReader := idx.Reader() + indexReader, err := idx.Reader() + if err != nil { + t.Error(err) + } defer indexReader.Close() // first get all doc ids diff --git a/index/upside_down/upside_down.go b/index/upside_down/upside_down.go index 3cffd71f..1d6cd02e 100644 --- a/index/upside_down/upside_down.go +++ b/index/upside_down/upside_down.go @@ -125,13 +125,16 @@ func (udc *UpsideDownCouch) batchRows(writer store.KVWriter, addRows []UpsideDow return } -func (udc *UpsideDownCouch) DocCount() uint64 { - return udc.docCount +func (udc *UpsideDownCouch) DocCount() (uint64, error) { + return udc.docCount, nil } func (udc *UpsideDownCouch) Open() error { // start a writer for the open process - kvwriter := udc.store.Writer() + kvwriter, err := udc.store.Writer() + if err != nil { + return err + } defer kvwriter.Close() value, err := kvwriter.Get(VersionKey) @@ -176,7 +179,10 @@ func (udc *UpsideDownCouch) countDocs(kvreader store.KVReader) uint64 { func (udc *UpsideDownCouch) 
rowCount() uint64 { // start an isolated reader for use during the rowcount - kvreader := udc.store.Reader() + kvreader, err := udc.store.Reader() + if err != nil { + return 0 + } defer kvreader.Close() it := kvreader.Iterator([]byte{0}) defer it.Close() @@ -192,8 +198,8 @@ func (udc *UpsideDownCouch) rowCount() uint64 { return rv } -func (udc *UpsideDownCouch) Close() { - udc.store.Close() +func (udc *UpsideDownCouch) Close() error { + return udc.store.Close() } func (udc *UpsideDownCouch) Update(doc *document.Document) error { @@ -217,7 +223,10 @@ func (udc *UpsideDownCouch) Update(doc *document.Document) error { // start a writer for this update indexStart := time.Now() - kvwriter := udc.store.Writer() + kvwriter, err := udc.store.Writer() + if err != nil { + return err + } defer kvwriter.Close() // first we lookup the backindex row for the doc id if it exists @@ -360,7 +369,10 @@ func (udc *UpsideDownCouch) indexField(docID string, field document.Field, field func (udc *UpsideDownCouch) Delete(id string) error { indexStart := time.Now() // start a writer for this delete - kvwriter := udc.store.Writer() + kvwriter, err := udc.store.Writer() + if err != nil { + return err + } defer kvwriter.Close() // lookup the back index row @@ -426,11 +438,11 @@ func (udc *UpsideDownCouch) backIndexRowForDoc(kvreader store.KVReader, docID st return backIndexRow, nil } -func (udc *UpsideDownCouch) backIndexRowsForBatch(kvreader store.KVReader, batch index.Batch) (map[string]*BackIndexRow, error) { +func (udc *UpsideDownCouch) backIndexRowsForBatch(kvreader store.KVReader, batch *index.Batch) (map[string]*BackIndexRow, error) { // FIXME faster to order the ids and scan sequentially // for now just get it working rv := make(map[string]*BackIndexRow, 0) - for docID := range batch { + for docID := range batch.IndexOps { backIndexRow, err := udc.backIndexRowForDoc(kvreader, docID) if err != nil { return nil, err @@ -498,19 +510,19 @@ func (udc *UpsideDownCouch) 
termFieldVectorsFromTermVectors(in []*TermVector) [] return rv } -func (udc *UpsideDownCouch) Batch(batch index.Batch) error { +func (udc *UpsideDownCouch) Batch(batch *index.Batch) error { analysisStart := time.Now() resultChan := make(chan *AnalysisResult) var numUpdates uint64 - for _, doc := range batch { + for _, doc := range batch.IndexOps { if doc != nil { numUpdates++ } } go func() { - for _, doc := range batch { + for _, doc := range batch.IndexOps { if doc != nil { aw := AnalysisWork{ udc: udc, @@ -537,7 +549,10 @@ func (udc *UpsideDownCouch) Batch(batch index.Batch) error { indexStart := time.Now() // start a writer for this batch - kvwriter := udc.store.Writer() + kvwriter, err := udc.store.Writer() + if err != nil { + return err + } defer kvwriter.Close() // first lookup all the back index rows @@ -553,7 +568,7 @@ func (udc *UpsideDownCouch) Batch(batch index.Batch) error { docsAdded := uint64(0) docsDeleted := uint64(0) - for docID, doc := range batch { + for docID, doc := range batch.IndexOps { backIndexRow := backIndexRows[docID] if doc == nil && backIndexRow != nil { //delete @@ -567,6 +582,18 @@ func (udc *UpsideDownCouch) Batch(batch index.Batch) error { } } + // add the internal ops + for internalKey, internalValue := range batch.InternalOps { + if internalValue == nil { + // delete + deleteInternalRow := NewInternalRow([]byte(internalKey), nil) + deleteRows = append(deleteRows, deleteInternalRow) + } else { + updateInternalRow := NewInternalRow([]byte(internalKey), internalValue) + updateRows = append(updateRows, updateInternalRow) + } + } + err = udc.batchRows(kvwriter, addRows, updateRows, deleteRows) atomic.AddUint64(&udc.stats.indexTime, uint64(time.Since(indexStart))) if err == nil { @@ -583,24 +610,34 @@ func (udc *UpsideDownCouch) Batch(batch index.Batch) error { func (udc *UpsideDownCouch) SetInternal(key, val []byte) error { internalRow := NewInternalRow(key, val) - writer := udc.store.Writer() + writer, err := udc.store.Writer() + if 
err != nil { + return err + } defer writer.Close() return writer.Set(internalRow.Key(), internalRow.Value()) } func (udc *UpsideDownCouch) DeleteInternal(key []byte) error { internalRow := NewInternalRow(key, nil) - writer := udc.store.Writer() + writer, err := udc.store.Writer() + if err != nil { + return err + } defer writer.Close() return writer.Delete(internalRow.Key()) } -func (udc *UpsideDownCouch) Reader() index.IndexReader { +func (udc *UpsideDownCouch) Reader() (index.IndexReader, error) { + kvr, err := udc.store.Reader() + if err != nil { + return nil, err + } return &IndexReader{ index: udc, - kvreader: udc.store.Reader(), + kvreader: kvr, docCount: udc.docCount, - } + }, nil } func (udc *UpsideDownCouch) Stats() json.Marshaler { diff --git a/index/upside_down/upside_down_test.go b/index/upside_down/upside_down_test.go index 8ad4f894..dcb87017 100644 --- a/index/upside_down/upside_down_test.go +++ b/index/upside_down/upside_down_test.go @@ -39,7 +39,10 @@ func TestIndexOpenReopen(t *testing.T) { } var expectedCount uint64 - docCount := idx.DocCount() + docCount, err := idx.DocCount() + if err != nil { + t.Error(err) + } if docCount != expectedCount { t.Errorf("Expected document count to be %d got %d", expectedCount, docCount) } @@ -81,7 +84,10 @@ func TestIndexInsert(t *testing.T) { defer idx.Close() var expectedCount uint64 - docCount := idx.DocCount() + docCount, err := idx.DocCount() + if err != nil { + t.Error(err) + } if docCount != expectedCount { t.Errorf("Expected document count to be %d got %d", expectedCount, docCount) } @@ -94,7 +100,10 @@ func TestIndexInsert(t *testing.T) { } expectedCount++ - docCount = idx.DocCount() + docCount, err = idx.DocCount() + if err != nil { + t.Error(err) + } if docCount != expectedCount { t.Errorf("Expected document count to be %d got %d", expectedCount, docCount) } @@ -120,7 +129,10 @@ func TestIndexInsertThenDelete(t *testing.T) { defer idx.Close() var expectedCount uint64 - docCount := idx.DocCount() + 
docCount, err := idx.DocCount() + if err != nil { + t.Error(err) + } if docCount != expectedCount { t.Errorf("Expected document count to be %d got %d", expectedCount, docCount) } @@ -141,7 +153,10 @@ func TestIndexInsertThenDelete(t *testing.T) { } expectedCount++ - docCount = idx.DocCount() + docCount, err = idx.DocCount() + if err != nil { + t.Error(err) + } if docCount != expectedCount { t.Errorf("Expected document count to be %d got %d", expectedCount, docCount) } @@ -152,7 +167,10 @@ func TestIndexInsertThenDelete(t *testing.T) { } expectedCount-- - docCount = idx.DocCount() + docCount, err = idx.DocCount() + if err != nil { + t.Error(err) + } if docCount != expectedCount { t.Errorf("Expected document count to be %d got %d", expectedCount, docCount) } @@ -163,7 +181,10 @@ func TestIndexInsertThenDelete(t *testing.T) { } expectedCount-- - docCount = idx.DocCount() + docCount, err = idx.DocCount() + if err != nil { + t.Error(err) + } if docCount != expectedCount { t.Errorf("Expected document count to be %d got %d", expectedCount, docCount) } @@ -280,7 +301,10 @@ func TestIndexInsertMultiple(t *testing.T) { } expectedCount++ - docCount := idx.DocCount() + docCount, err := idx.DocCount() + if err != nil { + t.Error(err) + } if docCount != expectedCount { t.Errorf("expected doc count: %d, got %d", expectedCount, docCount) } @@ -302,7 +326,10 @@ func TestIndexInsertWithStore(t *testing.T) { defer idx.Close() var expectedCount uint64 - docCount := idx.DocCount() + docCount, err := idx.DocCount() + if err != nil { + t.Error(err) + } if docCount != expectedCount { t.Errorf("Expected document count to be %d got %d", expectedCount, docCount) } @@ -315,7 +342,10 @@ func TestIndexInsertWithStore(t *testing.T) { } expectedCount++ - docCount = idx.DocCount() + docCount, err = idx.DocCount() + if err != nil { + t.Error(err) + } if docCount != expectedCount { t.Errorf("Expected document count to be %d got %d", expectedCount, docCount) } @@ -327,7 +357,10 @@ func 
TestIndexInsertWithStore(t *testing.T) { t.Errorf("expected %d rows, got: %d", expectedLength, rowCount) } - indexReader := idx.Reader() + indexReader, err := idx.Reader() + if err != nil { + t.Error(err) + } defer indexReader.Close() storedDoc, err := indexReader.Document("1") @@ -359,7 +392,10 @@ func TestIndexInternalCRUD(t *testing.T) { } defer idx.Close() - indexReader := idx.Reader() + indexReader, err := idx.Reader() + if err != nil { + t.Error(err) + } defer indexReader.Close() // get something that doesnt exist yet @@ -377,7 +413,10 @@ func TestIndexInternalCRUD(t *testing.T) { t.Error(err) } - indexReader = idx.Reader() + indexReader, err = idx.Reader() + if err != nil { + t.Error(err) + } defer indexReader.Close() // get @@ -395,7 +434,10 @@ func TestIndexInternalCRUD(t *testing.T) { t.Error(err) } - indexReader = idx.Reader() + indexReader, err = idx.Reader() + if err != nil { + t.Error(err) + } defer indexReader.Close() // get again @@ -445,21 +487,24 @@ func TestIndexBatch(t *testing.T) { // delete existing doc // net document count change 0 - batch := make(index.Batch, 0) + batch := index.NewBatch() doc = document.NewDocument("3") doc.AddField(document.NewTextField("name", []uint64{}, []byte("test3"))) - batch["3"] = doc + batch.Update(doc) doc = document.NewDocument("2") doc.AddField(document.NewTextField("name", []uint64{}, []byte("test2updated"))) - batch["2"] = doc - batch["1"] = nil + batch.Update(doc) + batch.Delete("1") err = idx.Batch(batch) if err != nil { t.Error(err) } - indexReader := idx.Reader() + indexReader, err := idx.Reader() + if err != nil { + t.Error(err) + } defer indexReader.Close() docCount := indexReader.DocCount() @@ -502,7 +547,10 @@ func TestIndexInsertUpdateDeleteWithMultipleTypesStored(t *testing.T) { defer idx.Close() var expectedCount uint64 - docCount := idx.DocCount() + docCount, err := idx.DocCount() + if err != nil { + t.Error(err) + } if docCount != expectedCount { t.Errorf("Expected document count to be %d got 
%d", expectedCount, docCount) } @@ -521,7 +569,10 @@ func TestIndexInsertUpdateDeleteWithMultipleTypesStored(t *testing.T) { } expectedCount++ - docCount = idx.DocCount() + docCount, err = idx.DocCount() + if err != nil { + t.Error(err) + } if docCount != expectedCount { t.Errorf("Expected document count to be %d got %d", expectedCount, docCount) } @@ -543,7 +594,10 @@ func TestIndexInsertUpdateDeleteWithMultipleTypesStored(t *testing.T) { t.Errorf("expected %d rows, got: %d", expectedLength, rowCount) } - indexReader := idx.Reader() + indexReader, err := idx.Reader() + if err != nil { + t.Error(err) + } defer indexReader.Close() storedDoc, err := indexReader.Document("1") @@ -595,7 +649,10 @@ func TestIndexInsertUpdateDeleteWithMultipleTypesStored(t *testing.T) { t.Errorf("Error updating index: %v", err) } - indexReader = idx.Reader() + indexReader, err = idx.Reader() + if err != nil { + t.Error(err) + } defer indexReader.Close() // expected doc count shouldn't have changed @@ -638,7 +695,10 @@ func TestIndexInsertUpdateDeleteWithMultipleTypesStored(t *testing.T) { expectedCount-- // expected doc count shouldn't have changed - docCount = idx.DocCount() + docCount, err = idx.DocCount() + if err != nil { + t.Error(err) + } if docCount != expectedCount { t.Errorf("Expected document count to be %d got %d", expectedCount, docCount) } @@ -672,7 +732,10 @@ func TestIndexInsertFields(t *testing.T) { t.Errorf("Error updating index: %v", err) } - indexReader := idx.Reader() + indexReader, err := idx.Reader() + if err != nil { + t.Error(err) + } defer indexReader.Close() fields, err := indexReader.Fields() @@ -734,7 +797,10 @@ func TestIndexUpdateComposites(t *testing.T) { t.Errorf("Error updating index: %v", err) } - indexReader := idx.Reader() + indexReader, err := idx.Reader() + if err != nil { + t.Error(err) + } defer indexReader.Close() // make sure new values are in index @@ -822,7 +888,10 @@ func TestIndexTermReaderCompositeFields(t *testing.T) { t.Errorf("Error 
updating index: %v", err) } - indexReader := idx.Reader() + indexReader, err := idx.Reader() + if err != nil { + t.Error(err) + } defer indexReader.Close() termFieldReader, err := indexReader.TermFieldReader([]byte("mister"), "_all") @@ -865,7 +934,10 @@ func TestIndexDocumentFieldTerms(t *testing.T) { t.Errorf("Error updating index: %v", err) } - indexReader := idx.Reader() + indexReader, err := idx.Reader() + if err != nil { + t.Error(err) + } defer indexReader.Close() fieldTerms, err := indexReader.DocumentFieldTerms("1") diff --git a/index_alias_impl.go b/index_alias_impl.go index a3f928e3..17e75d6f 100644 --- a/index_alias_impl.go +++ b/index_alias_impl.go @@ -73,7 +73,7 @@ func (i *indexAliasImpl) Delete(id string) error { return i.indexes[0].Delete(id) } -func (i *indexAliasImpl) Batch(b Batch) error { +func (i *indexAliasImpl) Batch(b *Batch) error { i.mutex.RLock() defer i.mutex.RUnlock() @@ -105,24 +105,35 @@ func (i *indexAliasImpl) Document(id string) (*document.Document, error) { return i.indexes[0].Document(id) } -func (i *indexAliasImpl) DocCount() uint64 { +func (i *indexAliasImpl) DocCount() (uint64, error) { + i.mutex.RLock() + defer i.mutex.RUnlock() + rv := uint64(0) if !i.open { - return 0 + return 0, ErrorIndexClosed } for _, index := range i.indexes { - rv += index.DocCount() + otherCount, err := index.DocCount() + if err != nil { + return 0, err + } + rv += otherCount } - return rv + return rv, nil } func (i *indexAliasImpl) Search(req *SearchRequest) (*SearchResult, error) { i.mutex.RLock() defer i.mutex.RUnlock() + if !i.open { + return nil, ErrorIndexClosed + } + if len(i.indexes) < 1 { return nil, ErrorAliasEmpty } @@ -199,11 +210,12 @@ func (i *indexAliasImpl) DumpFields() chan interface{} { return i.indexes[0].DumpFields() } -func (i *indexAliasImpl) Close() { +func (i *indexAliasImpl) Close() error { i.mutex.Lock() defer i.mutex.Unlock() i.open = false + return nil } func (i *indexAliasImpl) Mapping() *IndexMapping { diff --git 
a/index_impl.go b/index_impl.go index 5e2ae8a7..01996d79 100644 --- a/index_impl.go +++ b/index_impl.go @@ -191,7 +191,10 @@ func openIndex(path string) (*indexImpl, error) { rv.stats.indexStat = rv.i.Stats() // now load the mapping - indexReader := rv.i.Reader() + indexReader, err := rv.i.Reader() + if err != nil { + return nil, err + } defer indexReader.Close() mappingBytes, err := indexReader.GetInternal(mappingInternalKey) if err != nil { @@ -271,7 +274,7 @@ func (i *indexImpl) Delete(id string) error { // operations at the same time. There are often // significant performance benefits when performing // operations in a batch. -func (i *indexImpl) Batch(b Batch) error { +func (i *indexImpl) Batch(b *Batch) error { i.mutex.RLock() defer i.mutex.RUnlock() @@ -279,8 +282,8 @@ func (i *indexImpl) Batch(b Batch) error { return ErrorIndexClosed } - ib := make(index.Batch, len(b)) - for bk, bd := range b { + ib := index.NewBatch() + for bk, bd := range b.IndexOps { if bd == nil { ib.Delete(bk) } else { @@ -289,7 +292,14 @@ func (i *indexImpl) Batch(b Batch) error { if err != nil { return err } - ib.Index(bk, doc) + ib.Update(doc) + } + } + for ik, iv := range b.InternalOps { + if iv == nil { + ib.DeleteInternal([]byte(ik)) + } else { + ib.SetInternal([]byte(ik), iv) } } return i.i.Batch(ib) @@ -306,19 +316,22 @@ func (i *indexImpl) Document(id string) (*document.Document, error) { if !i.open { return nil, ErrorIndexClosed } - indexReader := i.i.Reader() + indexReader, err := i.i.Reader() + if err != nil { + return nil, err + } defer indexReader.Close() return indexReader.Document(id) } // DocCount returns the number of documents in the // index. 
-func (i *indexImpl) DocCount() uint64 { +func (i *indexImpl) DocCount() (uint64, error) { i.mutex.RLock() defer i.mutex.RUnlock() if !i.open { - return 0 + return 0, ErrorIndexClosed } return i.i.DocCount() @@ -339,7 +352,10 @@ func (i *indexImpl) Search(req *SearchRequest) (*SearchResult, error) { collector := collectors.NewTopScorerSkipCollector(req.Size, req.From) // open a reader for this search - indexReader := i.i.Reader() + indexReader, err := i.i.Reader() + if err != nil { + return nil, err + } defer indexReader.Close() searcher, err := req.Query.Searcher(indexReader, i.m, req.Explain) @@ -475,7 +491,10 @@ func (i *indexImpl) Fields() ([]string, error) { return nil, ErrorIndexClosed } - indexReader := i.i.Reader() + indexReader, err := i.i.Reader() + if err != nil { + return nil, err + } defer indexReader.Close() return indexReader.Fields() } @@ -522,12 +541,12 @@ func (i *indexImpl) DumpDoc(id string) chan interface{} { return i.i.DumpDoc(id) } -func (i *indexImpl) Close() { +func (i *indexImpl) Close() error { i.mutex.Lock() defer i.mutex.Unlock() i.open = false - i.i.Close() + return i.i.Close() } func (i *indexImpl) Stats() *IndexStat { @@ -538,7 +557,10 @@ func (i *indexImpl) GetInternal(key []byte) ([]byte, error) { i.mutex.RLock() defer i.mutex.RUnlock() - reader := i.i.Reader() + reader, err := i.i.Reader() + if err != nil { + return nil, err + } defer reader.Close() return reader.GetInternal(key) diff --git a/search/searchers/search_boolean_test.go b/search/searchers/search_boolean_test.go index e26af089..40fd56ed 100644 --- a/search/searchers/search_boolean_test.go +++ b/search/searchers/search_boolean_test.go @@ -17,7 +17,10 @@ import ( func TestBooleanSearch(t *testing.T) { - twoDocIndexReader := twoDocIndex.Reader() + twoDocIndexReader, err := twoDocIndex.Reader() + if err != nil { + t.Error(err) + } defer twoDocIndexReader.Close() // test 0 diff --git a/search/searchers/search_conjunction_test.go b/search/searchers/search_conjunction_test.go 
index e2d1d474..24f6518c 100644 --- a/search/searchers/search_conjunction_test.go +++ b/search/searchers/search_conjunction_test.go @@ -17,7 +17,10 @@ import ( func TestConjunctionSearch(t *testing.T) { - twoDocIndexReader := twoDocIndex.Reader() + twoDocIndexReader, err := twoDocIndex.Reader() + if err != nil { + t.Error(err) + } defer twoDocIndexReader.Close() // test 0 diff --git a/search/searchers/search_disjunction_test.go b/search/searchers/search_disjunction_test.go index bcce3a4f..cb306e3a 100644 --- a/search/searchers/search_disjunction_test.go +++ b/search/searchers/search_disjunction_test.go @@ -17,7 +17,10 @@ import ( func TestDisjunctionSearch(t *testing.T) { - twoDocIndexReader := twoDocIndex.Reader() + twoDocIndexReader, err := twoDocIndex.Reader() + if err != nil { + t.Error(err) + } defer twoDocIndexReader.Close() martyTermSearcher, err := NewTermSearcher(twoDocIndexReader, "marty", "name", 1.0, true) @@ -122,7 +125,10 @@ func TestDisjunctionSearch(t *testing.T) { func TestDisjunctionAdvance(t *testing.T) { - twoDocIndexReader := twoDocIndex.Reader() + twoDocIndexReader, err := twoDocIndex.Reader() + if err != nil { + t.Error(err) + } defer twoDocIndexReader.Close() martyTermSearcher, err := NewTermSearcher(twoDocIndexReader, "marty", "name", 1.0, true) diff --git a/search/searchers/search_match_all_test.go b/search/searchers/search_match_all_test.go index 3d1433ac..ad3311e3 100644 --- a/search/searchers/search_match_all_test.go +++ b/search/searchers/search_match_all_test.go @@ -17,7 +17,10 @@ import ( func TestMatchAllSearch(t *testing.T) { - twoDocIndexReader := twoDocIndex.Reader() + twoDocIndexReader, err := twoDocIndex.Reader() + if err != nil { + t.Error(err) + } defer twoDocIndexReader.Close() allSearcher, err := NewMatchAllSearcher(twoDocIndexReader, 1.0, true) diff --git a/search/searchers/search_match_none_test.go b/search/searchers/search_match_none_test.go index 2fcbb85e..07a68dbf 100644 --- a/search/searchers/search_match_none_test.go 
+++ b/search/searchers/search_match_none_test.go @@ -17,7 +17,10 @@ import ( func TestMatchNoneSearch(t *testing.T) { - twoDocIndexReader := twoDocIndex.Reader() + twoDocIndexReader, err := twoDocIndex.Reader() + if err != nil { + t.Error(err) + } defer twoDocIndexReader.Close() noneSearcher, err := NewMatchNoneSearcher(twoDocIndexReader) diff --git a/search/searchers/search_phrase_test.go b/search/searchers/search_phrase_test.go index 95059245..15fb53c7 100644 --- a/search/searchers/search_phrase_test.go +++ b/search/searchers/search_phrase_test.go @@ -17,7 +17,10 @@ import ( func TestPhraseSearch(t *testing.T) { - twoDocIndexReader := twoDocIndex.Reader() + twoDocIndexReader, err := twoDocIndex.Reader() + if err != nil { + t.Error(err) + } defer twoDocIndexReader.Close() angstTermSearcher, err := NewTermSearcher(twoDocIndexReader, "angst", "desc", 1.0, true) diff --git a/search/searchers/search_term_test.go b/search/searchers/search_term_test.go index 6286105f..771ba314 100644 --- a/search/searchers/search_term_test.go +++ b/search/searchers/search_term_test.go @@ -89,7 +89,10 @@ func TestTermSearcher(t *testing.T) { }, }) - indexReader := i.Reader() + indexReader, err := i.Reader() + if err != nil { + t.Error(err) + } defer indexReader.Close() searcher, err := NewTermSearcher(indexReader, queryTerm, queryField, queryBoost, queryExplain) @@ -99,7 +102,11 @@ func TestTermSearcher(t *testing.T) { defer searcher.Close() searcher.SetQueryNorm(2.0) - idf := 1.0 + math.Log(float64(i.DocCount())/float64(searcher.Count()+1.0)) + docCount, err := i.DocCount() + if err != nil { + t.Fatal(err) + } + idf := 1.0 + math.Log(float64(docCount)/float64(searcher.Count()+1.0)) expectedQueryWeight := 3 * idf * 3 * idf if expectedQueryWeight != searcher.Weight() { t.Errorf("expected weight %v got %v", expectedQueryWeight, searcher.Weight())