From 78453dab7d7338dd72e5aa157af609a73a58ddab Mon Sep 17 00:00:00 2001 From: Steve Yen Date: Thu, 19 Mar 2015 18:40:41 -0700 Subject: [PATCH] metrics KVStore now tracks last 100 errors --- index/store/metrics/metrics.go | 98 +++++++++++++++++++++++++++-- index/store/metrics/metrics_test.go | 65 ++++++++++++++++++- 2 files changed, 157 insertions(+), 6 deletions(-) diff --git a/index/store/metrics/metrics.go b/index/store/metrics/metrics.go index 02bf418f..01e3d48e 100644 --- a/index/store/metrics/metrics.go +++ b/index/store/metrics/metrics.go @@ -17,8 +17,12 @@ package metrics import ( + "container/list" + "encoding/json" "fmt" "io" + "sync" + "time" "github.com/blevesearch/bleve/index/store" "github.com/blevesearch/bleve/registry" @@ -27,6 +31,7 @@ import ( ) const Name = "metrics" +const MaxErrors = 100 func init() { registry.RegisterKVStore(Name, StoreConstructor) @@ -72,6 +77,8 @@ func NewBleveMetricsStore(o store.KVStore) *Store { TimerIteratorNext: metrics.NewTimer(), TimerBatchMerge: metrics.NewTimer(), TimerBatchExecute: metrics.NewTimer(), + + errors: list.New(), } } @@ -92,6 +99,16 @@ type Store struct { TimerIteratorNext metrics.Timer TimerBatchMerge metrics.Timer TimerBatchExecute metrics.Timer + + m sync.Mutex // Protects the fields that follow. + errors *list.List // Capped list of StoreError's. +} + +type StoreError struct { + Time string + Op string + Err string + Key string } type Reader struct { @@ -121,6 +138,7 @@ func (s *Store) Close() error { func (s *Store) Reader() (store.KVReader, error) { o, err := s.o.Reader() if err != nil { + s.AddError("Reader", err, nil) return nil, err } return &Reader{s: s, o: o}, nil @@ -129,6 +147,7 @@ func (s *Store) Reader() (store.KVReader, error) { func (s *Store) Writer() (store.KVWriter, error) { o, err := s.o.Writer() if err != nil { + s.AddError("Writer", err, nil) return nil, err } return &Writer{s: s, o: o}, nil @@ -141,6 +160,9 @@ func (s *Store) Actual() store.KVStore { func (w *Reader) Get(key []byte) (v []byte, err error) { w.s.TimerReaderGet.Time(func() { v, err = w.o.Get(key) + if err != nil { + w.s.AddError("Reader.Get", err, key) + } }) return } @@ -153,12 +175,19 @@ func (w *Reader) Iterator(key []byte) (i store.KVIterator) { } func (w *Reader) Close() error { - return w.o.Close() + err := w.o.Close() + if err != nil { + w.s.AddError("Reader.Close", err, nil) + } + return err } func (w *Writer) Get(key []byte) (v []byte, err error) { w.s.TimerWriterGet.Time(func() { v, err = w.o.Get(key) + if err != nil { + w.s.AddError("Writer.Get", err, key) + } }) return } @@ -171,12 +200,19 @@ func (w *Writer) Iterator(key []byte) (i store.KVIterator) { } func (w *Writer) Close() error { - return w.o.Close() + err := w.o.Close() + if err != nil { + w.s.AddError("Writer.Close", err, nil) + } + return err } func (w *Writer) Set(key, val []byte) (err error) { w.s.TimerWriterSet.Time(func() { err = w.o.Set(key, val) + if err != nil { + w.s.AddError("Writer.Set", err, key) + } }) return } @@ -184,6 +220,9 @@ func (w *Writer) Set(key, val []byte) (err error) { func (w *Writer) Delete(key []byte) (err error) { w.s.TimerWriterDelete.Time(func() { err = w.o.Delete(key) + if err != nil { + w.s.AddError("Writer.Delete", err, key) + } }) return } @@ -227,7 +266,11 @@ func (w *Iterator) Valid() bool { } func (w *Iterator) Close() error { - return w.o.Close() + err := w.o.Close() + if err != nil { + w.s.AddError("Iterator.Close", err, nil) + } + return err } func (w *Batch) Set(key, val []byte) { @@ -247,12 +290,37 @@ func (w *Batch) Merge(key []byte, oper store.AssociativeMerge) { func (w *Batch) Execute() (err error) { w.s.TimerBatchExecute.Time(func() { err = w.o.Execute() + if err != nil { + w.s.AddError("Batch.Execute", err, nil) + } }) return } func (w *Batch) Close() error { - return w.o.Close() + err := w.o.Close() + if err != nil { + w.s.AddError("Batch.Close", err, nil) + } + return err +} + +// -------------------------------------------------------- + +func (s *Store) AddError(op string, err error, key []byte) { + e := &StoreError{ + Time: time.Now().Format(time.RFC3339Nano), + Op: op, + Err: fmt.Sprintf("%v", err), + Key: string(key), + } + + s.m.Lock() + for s.errors.Len() >= MaxErrors { + s.errors.Remove(s.errors.Front()) + } + s.errors.PushBack(e) + s.m.Unlock() } // -------------------------------------------------------- @@ -280,6 +348,28 @@ func (s *Store) WriteJSON(w io.Writer) { WriteTimerJSON(w, s.TimerBatchMerge) w.Write([]byte(`,"TimerBatchExecute":`)) WriteTimerJSON(w, s.TimerBatchExecute) + + w.Write([]byte(`,"Errors":[`)) + s.m.Lock() + e := s.errors.Front() + i := 0 + for e != nil { + se, ok := e.Value.(*StoreError) + if ok && se != nil { + if i > 0 { + w.Write([]byte(",")) + } + buf, err := json.Marshal(se) + if err == nil { + w.Write(buf) + } + } + e = e.Next() + i = i + 1 + } + s.m.Unlock() + w.Write([]byte(`]`)) + w.Write([]byte(`}`)) } diff --git a/index/store/metrics/metrics_test.go b/index/store/metrics/metrics_test.go index 646b2c49..07dc09c8 100644 --- a/index/store/metrics/metrics_test.go +++ b/index/store/metrics/metrics_test.go @@ -17,6 +17,7 @@ package metrics import ( "bytes" "encoding/json" + "fmt" "reflect" "testing" @@ -85,7 +86,6 @@ func TestReaderIsolation(t *testing.T) { } func CommonTestKVStore(t *testing.T, s store.KVStore) { - writer, err := s.Writer() if err != nil { t.Error(err) @@ -261,5 +261,66 @@ func CommonTestReaderIsolation(t *testing.T, s store.KVStore) { if count != 1 { t.Errorf("expected iterator to see 1, saw %d", count) } - +} + +func TestErrors(t *testing.T) { + s, err := StoreConstructor(map[string]interface{}{ + "kvStoreName_actual": "gtreap", + }) + if err != nil { + t.Fatal(err) + } + + x, ok := s.(*Store) + if !ok { + t.Errorf("expecting a Store") + } + + x.AddError("foo", fmt.Errorf("Foo"), []byte("fooKey")) + x.AddError("bar", fmt.Errorf("Bar"), nil) + x.AddError("baz", fmt.Errorf("Baz"), []byte("bazKey")) + + b := bytes.NewBuffer(nil) + x.WriteJSON(b) + + var m map[string]interface{} + err = json.Unmarshal(b.Bytes(), &m) + if err != nil { + t.Errorf("expected unmarshallable writeJSON, err: %v, b: %s", + err, b.Bytes()) + } + + errorsi, ok := m["Errors"] + if !ok || errorsi == nil { + t.Errorf("expected errorsi") + } + errors, ok := errorsi.([]interface{}) + if !ok || errors == nil { + t.Errorf("expected errorsi is array") + } + if len(errors) != 3 { + t.Errorf("expected errors len 3") + } + + e := errors[0].(map[string]interface{}) + if e["Op"].(string) != "foo" || + e["Err"].(string) != "Foo" || + len(e["Time"].(string)) < 10 || + e["Key"].(string) != "fooKey" { + t.Errorf("expected foo, %#v", e) + } + e = errors[1].(map[string]interface{}) + if e["Op"].(string) != "bar" || + e["Err"].(string) != "Bar" || + len(e["Time"].(string)) < 10 || + e["Key"].(string) != "" { + t.Errorf("expected bar, %#v", e) + } + e = errors[2].(map[string]interface{}) + if e["Op"].(string) != "baz" || + e["Err"].(string) != "Baz" || + len(e["Time"].(string)) < 10 || + e["Key"].(string) != "bazKey" { + t.Errorf("expected baz, %#v", e) + } }