diff --git a/index/firestorm/analysis_test.go b/index/firestorm/analysis_test.go index 9aab01f4..6764be23 100644 --- a/index/firestorm/analysis_test.go +++ b/index/firestorm/analysis_test.go @@ -62,7 +62,10 @@ func TestAnalysis(t *testing.T) { t.Fatal(err) } // warmup to load field cache and set maxRead correctly - f.(*Firestorm).warmup(kvreader) + err = f.(*Firestorm).warmup(kvreader) + if err != nil { + t.Fatal(err) + } tests := []struct { d *document.Document diff --git a/index/firestorm/dict_updater.go b/index/firestorm/dict_updater.go index 25cb6df5..38a5e59a 100644 --- a/index/firestorm/dict_updater.go +++ b/index/firestorm/dict_updater.go @@ -94,7 +94,7 @@ func (d *DictUpdater) update() { // open a writer writer, err := d.f.store.Writer() if err != nil { - writer.Close() + _ = writer.Close() logger.Printf("dict updater fatal: %v", err) return } @@ -110,7 +110,7 @@ func (d *DictUpdater) update() { err = writer.ExecuteBatch(wb) if err != nil { - writer.Close() + _ = writer.Close() logger.Printf("dict updater fatal: %v", err) return } @@ -123,18 +123,19 @@ func (d *DictUpdater) update() { // this is not intended to be used publicly, only for unit tests // which depend on consistency we no longer provide func (d *DictUpdater) waitTasksDone(dur time.Duration) error { + initial := atomic.LoadUint64(&d.batchesStarted) timeout := time.After(dur) tick := time.Tick(100 * time.Millisecond) for { select { // Got a timeout! fail with a timeout error case <-timeout: - return fmt.Errorf("timeout") + flushed := atomic.LoadUint64(&d.batchesFlushed) + return fmt.Errorf("timeout, %d/%d", initial, flushed) // Got a tick, we should check on doSomething() case <-tick: - started := atomic.LoadUint64(&d.batchesStarted) flushed := atomic.LoadUint64(&d.batchesFlushed) - if started == flushed { + if flushed > initial { return nil } } diff --git a/index/firestorm/dump.go b/index/firestorm/dump.go index c4126d30..b7c222db 100644 --- a/index/firestorm/dump.go +++ b/index/firestorm/dump.go @@ -22,8 +22,8 @@ import ( // if your application relies on them, you're doing something wrong // they may change or be removed at any time -func (f *Firestorm) dumpPrefix(kvreader store.KVReader, rv chan interface{}, prefix []byte) { - visitPrefix(kvreader, prefix, func(key, val []byte) (bool, error) { +func (f *Firestorm) dumpPrefix(kvreader store.KVReader, rv chan interface{}, prefix []byte) error { + return visitPrefix(kvreader, prefix, func(key, val []byte) (bool, error) { row, err := parseFromKeyValue(key, val) if err != nil { rv <- err @@ -34,11 +34,11 @@ func (f *Firestorm) dumpPrefix(kvreader store.KVReader, rv chan interface{}, pre }) } -func (f *Firestorm) dumpDoc(kvreader store.KVReader, rv chan interface{}, docID []byte) { +func (f *Firestorm) dumpDoc(kvreader store.KVReader, rv chan interface{}, docID []byte) error { // without a back index we have no choice but to walk the term freq and stored rows // walk the term freqs - visitPrefix(kvreader, TermFreqKeyPrefix, func(key, val []byte) (bool, error) { + err := visitPrefix(kvreader, TermFreqKeyPrefix, func(key, val []byte) (bool, error) { tfr, err := NewTermFreqRowKV(key, val) if err != nil { rv <- err @@ -50,8 +50,12 @@ func (f *Firestorm) dumpDoc(kvreader store.KVReader, rv chan interface{}, docID return true, nil }) + if err != nil { + return err + } + // now walk the stored - visitPrefix(kvreader, StoredKeyPrefix, func(key, val []byte) (bool, error) { + err = visitPrefix(kvreader, StoredKeyPrefix, func(key, val []byte) (bool, error) { sr, err := NewStoredRowKV(key, val) if err != nil { rv <- err @@ -62,6 +66,8 @@ func (f *Firestorm) dumpDoc(kvreader store.KVReader, rv chan interface{}, docID } return true, nil }) + + return err } func parseFromKeyValue(key, value []byte) (index.IndexRow, error) { diff --git a/index/firestorm/dump_test.go b/index/firestorm/dump_test.go index b75ec38f..8fb32f3c 100644 --- a/index/firestorm/dump_test.go +++ b/index/firestorm/dump_test.go @@ -105,7 +105,10 @@ func TestDump(t *testing.T) { t.Errorf("expected %d rows for document, got %d", expectedDocRowCount, docRowCount) } - idx.(*Firestorm).dictUpdater.waitTasksDone(dictWaitDuration) + err = idx.(*Firestorm).dictUpdater.waitTasksDone(dictWaitDuration) + if err != nil { + t.Fatal(err) + } // 1 version // fieldsCount field rows diff --git a/index/firestorm/firestorm.go b/index/firestorm/firestorm.go index 30dd53e6..5dfdec13 100644 --- a/index/firestorm/firestorm.go +++ b/index/firestorm/firestorm.go @@ -90,7 +90,10 @@ func (f *Firestorm) Open() (err error) { if !newIndex { // process existing index before opening - f.warmup(kvreader) + err = f.warmup(kvreader) + if err != nil { + return + } } err = kvreader.Close() @@ -100,7 +103,10 @@ func (f *Firestorm) Open() (err error) { if newIndex { // prepare a new index - f.bootstrap() + err = f.bootstrap() + if err != nil { + return + } } // start the garbage collector @@ -384,7 +390,11 @@ func (f *Firestorm) DumpAll() chan interface{} { } }() - f.dumpPrefix(kvreader, rv, nil) + err = f.dumpPrefix(kvreader, rv, nil) + if err != nil { + rv <- err + return + } }() return rv } @@ -407,7 +417,11 @@ func (f *Firestorm) DumpDoc(docID string) chan interface{} { } }() - f.dumpDoc(kvreader, rv, []byte(docID)) + err = f.dumpDoc(kvreader, rv, []byte(docID)) + if err != nil { + rv <- err + return + } }() return rv } @@ -430,7 +444,11 @@ func (f *Firestorm) DumpFields() chan interface{} { } }() - f.dumpPrefix(kvreader, rv, FieldKeyPrefix) + err = f.dumpPrefix(kvreader, rv, FieldKeyPrefix) + if err != nil { + rv <- err + return + } }() return rv } diff --git a/index/firestorm/firestorm_test.go b/index/firestorm/firestorm_test.go index cee49fdc..469f9d25 100644 --- a/index/firestorm/firestorm_test.go +++ b/index/firestorm/firestorm_test.go @@ -115,7 +115,10 @@ func TestIndexInsert(t *testing.T) { } expectedCount++ - idx.(*Firestorm).lookuper.waitTasksDone(lookupWaitDuration) + err = idx.(*Firestorm).lookuper.waitTasksDone(lookupWaitDuration) + if err != nil { + t.Fatal(err) + } docCount, err = idx.DocCount() if err != nil { @@ -168,7 +171,10 @@ func TestIndexInsertThenDelete(t *testing.T) { } expectedCount++ - idx.(*Firestorm).lookuper.waitTasksDone(lookupWaitDuration) + err = idx.(*Firestorm).lookuper.waitTasksDone(lookupWaitDuration) + if err != nil { + t.Fatal(err) + } doc2 := document.NewDocument("2") doc2.AddField(document.NewTextField("name", []uint64{}, []byte("test"))) @@ -178,7 +184,10 @@ func TestIndexInsertThenDelete(t *testing.T) { } expectedCount++ - idx.(*Firestorm).lookuper.waitTasksDone(lookupWaitDuration) + err = idx.(*Firestorm).lookuper.waitTasksDone(lookupWaitDuration) + if err != nil { + t.Fatal(err) + } docCount, err = idx.DocCount() if err != nil { @@ -194,7 +203,10 @@ func TestIndexInsertThenDelete(t *testing.T) { } expectedCount-- - idx.(*Firestorm).lookuper.waitTasksDone(lookupWaitDuration) + err = idx.(*Firestorm).lookuper.waitTasksDone(lookupWaitDuration) + if err != nil { + t.Fatal(err) + } docCount, err = idx.DocCount() if err != nil { @@ -210,7 +222,10 @@ func TestIndexInsertThenDelete(t *testing.T) { } expectedCount-- - idx.(*Firestorm).lookuper.waitTasksDone(lookupWaitDuration) + err = idx.(*Firestorm).lookuper.waitTasksDone(lookupWaitDuration) + if err != nil { + t.Fatal(err) + } docCount, err = idx.DocCount() if err != nil { @@ -270,7 +285,10 @@ func TestIndexInsertThenUpdate(t *testing.T) { t.Errorf("Error deleting entry from index: %v", err) } - idx.(*Firestorm).lookuper.waitTasksDone(lookupWaitDuration) + err = idx.(*Firestorm).lookuper.waitTasksDone(lookupWaitDuration) + if err != nil { + t.Fatal(err) + } docCount, err := idx.DocCount() if err != nil { @@ -347,7 +365,10 @@ func TestIndexInsertMultiple(t *testing.T) { } expectedCount++ - idx.(*Firestorm).lookuper.waitTasksDone(lookupWaitDuration) + err = idx.(*Firestorm).lookuper.waitTasksDone(lookupWaitDuration) + if err != nil { + t.Fatal(err) + } docCount, err := idx.DocCount() if err != nil { @@ -399,7 +420,10 @@ func TestIndexInsertWithStore(t *testing.T) { } expectedCount++ - idx.(*Firestorm).lookuper.waitTasksDone(lookupWaitDuration) + err = idx.(*Firestorm).lookuper.waitTasksDone(lookupWaitDuration) + if err != nil { + t.Fatal(err) + } docCount, err = idx.DocCount() if err != nil { @@ -605,7 +629,10 @@ func TestIndexBatch(t *testing.T) { } }() - idx.(*Firestorm).lookuper.waitTasksDone(lookupWaitDuration) + err = idx.(*Firestorm).lookuper.waitTasksDone(lookupWaitDuration) + if err != nil { + t.Fatal(err) + } docCount := indexReader.DocCount() if docCount != expectedCount { @@ -683,7 +710,10 @@ func TestIndexInsertUpdateDeleteWithMultipleTypesStored(t *testing.T) { } expectedCount++ - idx.(*Firestorm).lookuper.waitTasksDone(lookupWaitDuration) + err = idx.(*Firestorm).lookuper.waitTasksDone(lookupWaitDuration) + if err != nil { + t.Fatal(err) + } docCount, err = idx.DocCount() if err != nil { @@ -801,7 +831,10 @@ func TestIndexInsertUpdateDeleteWithMultipleTypesStored(t *testing.T) { err = idx.Delete("1") expectedCount-- - idx.(*Firestorm).lookuper.waitTasksDone(lookupWaitDuration) + err = idx.(*Firestorm).lookuper.waitTasksDone(lookupWaitDuration) + if err != nil { + t.Fatal(err) + } // expected doc count shouldn't have changed docCount, err = idx.DocCount() diff --git a/index/firestorm/garbage.go b/index/firestorm/garbage.go index 0f08dfa6..42a6b999 100644 --- a/index/firestorm/garbage.go +++ b/index/firestorm/garbage.go @@ -122,7 +122,11 @@ func (gc *GarbageCollector) cleanup() { logger.Printf("garbage collector fatal: %v", err) return } - defer reader.Close() + defer func() { + if cerr := reader.Close(); err == nil && cerr != nil { + err = cerr + } + }() // walk all the term freq rows (where field > 0) termFreqStart := TermFreqIteratorStart(0, []byte{ByteSeparator}) @@ -176,7 +180,7 @@ func (gc *GarbageCollector) cleanup() { // open a writer writer, err := gc.f.store.Writer() if err != nil { - writer.Close() + _ = writer.Close() logger.Printf("garbage collector fatal: %v", err) return } @@ -190,7 +194,7 @@ func (gc *GarbageCollector) cleanup() { err = writer.ExecuteBatch(wb) if err != nil { - writer.Close() + _ = writer.Close() logger.Printf("garbage collector fatal: %v", err) return } @@ -212,7 +216,11 @@ func (gc *GarbageCollector) cleanup() { logger.Printf("garbage collector fatal: %v", err) return } - writer.Close() + err = writer.Close() + if err != nil { + logger.Printf("garbage collector fatal: %v", err) + return + } } // updating dictionary in one batch diff --git a/index/firestorm/lookup.go b/index/firestorm/lookup.go index 219a2559..c07ea525 100644 --- a/index/firestorm/lookup.go +++ b/index/firestorm/lookup.go @@ -81,7 +81,11 @@ func (l *Lookuper) lookup(task *lookupTask) { logger.Printf("lookuper fatal: %v", err) return } - defer reader.Close() + defer func() { + if cerr := reader.Close(); err == nil && cerr != nil { + err = cerr + } + }() prefix := TermFreqPrefixFieldTermDocId(0, nil, task.docID) logger.Printf("lookuper prefix - % x", prefix) diff --git a/index/firestorm/reader.go b/index/firestorm/reader.go index 4178d53d..553ada99 100644 --- a/index/firestorm/reader.go +++ b/index/firestorm/reader.go @@ -150,7 +150,7 @@ func (r *firestormReader) DocumentFieldTerms(id string) (index.FieldTerms, error rv := make(index.FieldTerms, 0) // walk the term freqs - visitPrefix(r.r, TermFreqKeyPrefix, func(key, val []byte) (bool, error) { + err = visitPrefix(r.r, TermFreqKeyPrefix, func(key, val []byte) (bool, error) { tfr, err := NewTermFreqRowKV(key, val) if err != nil { return false, err @@ -166,6 +166,9 @@ func (r *firestormReader) DocumentFieldTerms(id string) (index.FieldTerms, error } return true, nil }) + if err != nil { + return nil, err + } return rv, nil } diff --git a/index/firestorm/reader_dict_test.go b/index/firestorm/reader_dict_test.go index afe2aebf..846120df 100644 --- a/index/firestorm/reader_dict_test.go +++ b/index/firestorm/reader_dict_test.go @@ -67,7 +67,10 @@ func TestDictionaryReader(t *testing.T) { t.Fatal(err) } - f.(*Firestorm).warmup(kvreader) + err = f.(*Firestorm).warmup(kvreader) + if err != nil { + t.Fatal(err) + } err = kvreader.Close() if err != nil { diff --git a/index/firestorm/reader_docs_test.go b/index/firestorm/reader_docs_test.go index 3f306e7a..bec40437 100644 --- a/index/firestorm/reader_docs_test.go +++ b/index/firestorm/reader_docs_test.go @@ -83,7 +83,10 @@ func TestDocIDReaderSomeGarbage(t *testing.T) { } // warmup to load field cache and set maxRead correctly - f.(*Firestorm).warmup(kvreader) + err = f.(*Firestorm).warmup(kvreader) + if err != nil { + t.Fatal(err) + } err = kvreader.Close() if err != nil { diff --git a/index/firestorm/reader_terms_test.go b/index/firestorm/reader_terms_test.go index 2ed1031e..4007ec39 100644 --- a/index/firestorm/reader_terms_test.go +++ b/index/firestorm/reader_terms_test.go @@ -65,7 +65,10 @@ func TestTermReaderNoGarbage(t *testing.T) { } // warmup to load field cache and set maxRead correctly - f.(*Firestorm).warmup(kvreader) + err = f.(*Firestorm).warmup(kvreader) + if err != nil { + t.Fatal(err) + } err = kvreader.Close() if err != nil { @@ -173,7 +176,10 @@ func TestTermReaderSomeGarbage(t *testing.T) { } // warmup to load field cache and set maxRead correctly - f.(*Firestorm).warmup(kvreader) + err = f.(*Firestorm).warmup(kvreader) + if err != nil { + t.Fatal(err) + } err = kvreader.Close() if err != nil {