metrics KVStore now tracks last 100 errors
This commit is contained in:
parent
62645f10a2
commit
78453dab7d
@ -17,8 +17,12 @@
|
|||||||
package metrics
|
package metrics
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"container/list"
|
||||||
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/blevesearch/bleve/index/store"
|
"github.com/blevesearch/bleve/index/store"
|
||||||
"github.com/blevesearch/bleve/registry"
|
"github.com/blevesearch/bleve/registry"
|
||||||
@ -27,6 +31,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
const Name = "metrics"
|
const Name = "metrics"
|
||||||
|
const MaxErrors = 100
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
registry.RegisterKVStore(Name, StoreConstructor)
|
registry.RegisterKVStore(Name, StoreConstructor)
|
||||||
@ -72,6 +77,8 @@ func NewBleveMetricsStore(o store.KVStore) *Store {
|
|||||||
TimerIteratorNext: metrics.NewTimer(),
|
TimerIteratorNext: metrics.NewTimer(),
|
||||||
TimerBatchMerge: metrics.NewTimer(),
|
TimerBatchMerge: metrics.NewTimer(),
|
||||||
TimerBatchExecute: metrics.NewTimer(),
|
TimerBatchExecute: metrics.NewTimer(),
|
||||||
|
|
||||||
|
errors: list.New(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -92,6 +99,16 @@ type Store struct {
|
|||||||
TimerIteratorNext metrics.Timer
|
TimerIteratorNext metrics.Timer
|
||||||
TimerBatchMerge metrics.Timer
|
TimerBatchMerge metrics.Timer
|
||||||
TimerBatchExecute metrics.Timer
|
TimerBatchExecute metrics.Timer
|
||||||
|
|
||||||
|
m sync.Mutex // Protects the fields that follow.
|
||||||
|
errors *list.List // Capped list of StoreError's.
|
||||||
|
}
|
||||||
|
|
||||||
|
type StoreError struct {
|
||||||
|
Time string
|
||||||
|
Op string
|
||||||
|
Err string
|
||||||
|
Key string
|
||||||
}
|
}
|
||||||
|
|
||||||
type Reader struct {
|
type Reader struct {
|
||||||
@ -121,6 +138,7 @@ func (s *Store) Close() error {
|
|||||||
func (s *Store) Reader() (store.KVReader, error) {
|
func (s *Store) Reader() (store.KVReader, error) {
|
||||||
o, err := s.o.Reader()
|
o, err := s.o.Reader()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
s.AddError("Reader", err, nil)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return &Reader{s: s, o: o}, nil
|
return &Reader{s: s, o: o}, nil
|
||||||
@ -129,6 +147,7 @@ func (s *Store) Reader() (store.KVReader, error) {
|
|||||||
func (s *Store) Writer() (store.KVWriter, error) {
|
func (s *Store) Writer() (store.KVWriter, error) {
|
||||||
o, err := s.o.Writer()
|
o, err := s.o.Writer()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
s.AddError("Writer", err, nil)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return &Writer{s: s, o: o}, nil
|
return &Writer{s: s, o: o}, nil
|
||||||
@ -141,6 +160,9 @@ func (s *Store) Actual() store.KVStore {
|
|||||||
func (w *Reader) Get(key []byte) (v []byte, err error) {
|
func (w *Reader) Get(key []byte) (v []byte, err error) {
|
||||||
w.s.TimerReaderGet.Time(func() {
|
w.s.TimerReaderGet.Time(func() {
|
||||||
v, err = w.o.Get(key)
|
v, err = w.o.Get(key)
|
||||||
|
if err != nil {
|
||||||
|
w.s.AddError("Reader.Get", err, key)
|
||||||
|
}
|
||||||
})
|
})
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -153,12 +175,19 @@ func (w *Reader) Iterator(key []byte) (i store.KVIterator) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (w *Reader) Close() error {
|
func (w *Reader) Close() error {
|
||||||
return w.o.Close()
|
err := w.o.Close()
|
||||||
|
if err != nil {
|
||||||
|
w.s.AddError("Reader.Close", err, nil)
|
||||||
|
}
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *Writer) Get(key []byte) (v []byte, err error) {
|
func (w *Writer) Get(key []byte) (v []byte, err error) {
|
||||||
w.s.TimerWriterGet.Time(func() {
|
w.s.TimerWriterGet.Time(func() {
|
||||||
v, err = w.o.Get(key)
|
v, err = w.o.Get(key)
|
||||||
|
if err != nil {
|
||||||
|
w.s.AddError("Writer.Get", err, key)
|
||||||
|
}
|
||||||
})
|
})
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -171,12 +200,19 @@ func (w *Writer) Iterator(key []byte) (i store.KVIterator) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (w *Writer) Close() error {
|
func (w *Writer) Close() error {
|
||||||
return w.o.Close()
|
err := w.o.Close()
|
||||||
|
if err != nil {
|
||||||
|
w.s.AddError("Writer.Close", err, nil)
|
||||||
|
}
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *Writer) Set(key, val []byte) (err error) {
|
func (w *Writer) Set(key, val []byte) (err error) {
|
||||||
w.s.TimerWriterSet.Time(func() {
|
w.s.TimerWriterSet.Time(func() {
|
||||||
err = w.o.Set(key, val)
|
err = w.o.Set(key, val)
|
||||||
|
if err != nil {
|
||||||
|
w.s.AddError("Writer.Set", err, key)
|
||||||
|
}
|
||||||
})
|
})
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -184,6 +220,9 @@ func (w *Writer) Set(key, val []byte) (err error) {
|
|||||||
func (w *Writer) Delete(key []byte) (err error) {
|
func (w *Writer) Delete(key []byte) (err error) {
|
||||||
w.s.TimerWriterDelete.Time(func() {
|
w.s.TimerWriterDelete.Time(func() {
|
||||||
err = w.o.Delete(key)
|
err = w.o.Delete(key)
|
||||||
|
if err != nil {
|
||||||
|
w.s.AddError("Writer.Delete", err, key)
|
||||||
|
}
|
||||||
})
|
})
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -227,7 +266,11 @@ func (w *Iterator) Valid() bool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (w *Iterator) Close() error {
|
func (w *Iterator) Close() error {
|
||||||
return w.o.Close()
|
err := w.o.Close()
|
||||||
|
if err != nil {
|
||||||
|
w.s.AddError("Iterator.Close", err, nil)
|
||||||
|
}
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *Batch) Set(key, val []byte) {
|
func (w *Batch) Set(key, val []byte) {
|
||||||
@ -247,12 +290,37 @@ func (w *Batch) Merge(key []byte, oper store.AssociativeMerge) {
|
|||||||
func (w *Batch) Execute() (err error) {
|
func (w *Batch) Execute() (err error) {
|
||||||
w.s.TimerBatchExecute.Time(func() {
|
w.s.TimerBatchExecute.Time(func() {
|
||||||
err = w.o.Execute()
|
err = w.o.Execute()
|
||||||
|
if err != nil {
|
||||||
|
w.s.AddError("Batch.Execute", err, nil)
|
||||||
|
}
|
||||||
})
|
})
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *Batch) Close() error {
|
func (w *Batch) Close() error {
|
||||||
return w.o.Close()
|
err := w.o.Close()
|
||||||
|
if err != nil {
|
||||||
|
w.s.AddError("Batch.Close", err, nil)
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// --------------------------------------------------------
|
||||||
|
|
||||||
|
func (s *Store) AddError(op string, err error, key []byte) {
|
||||||
|
e := &StoreError{
|
||||||
|
Time: time.Now().Format(time.RFC3339Nano),
|
||||||
|
Op: op,
|
||||||
|
Err: fmt.Sprintf("%v", err),
|
||||||
|
Key: string(key),
|
||||||
|
}
|
||||||
|
|
||||||
|
s.m.Lock()
|
||||||
|
for s.errors.Len() >= MaxErrors {
|
||||||
|
s.errors.Remove(s.errors.Front())
|
||||||
|
}
|
||||||
|
s.errors.PushBack(e)
|
||||||
|
s.m.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
// --------------------------------------------------------
|
// --------------------------------------------------------
|
||||||
@ -280,6 +348,28 @@ func (s *Store) WriteJSON(w io.Writer) {
|
|||||||
WriteTimerJSON(w, s.TimerBatchMerge)
|
WriteTimerJSON(w, s.TimerBatchMerge)
|
||||||
w.Write([]byte(`,"TimerBatchExecute":`))
|
w.Write([]byte(`,"TimerBatchExecute":`))
|
||||||
WriteTimerJSON(w, s.TimerBatchExecute)
|
WriteTimerJSON(w, s.TimerBatchExecute)
|
||||||
|
|
||||||
|
w.Write([]byte(`,"Errors":[`))
|
||||||
|
s.m.Lock()
|
||||||
|
e := s.errors.Front()
|
||||||
|
i := 0
|
||||||
|
for e != nil {
|
||||||
|
se, ok := e.Value.(*StoreError)
|
||||||
|
if ok && se != nil {
|
||||||
|
if i > 0 {
|
||||||
|
w.Write([]byte(","))
|
||||||
|
}
|
||||||
|
buf, err := json.Marshal(se)
|
||||||
|
if err == nil {
|
||||||
|
w.Write(buf)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
e = e.Next()
|
||||||
|
i = i + 1
|
||||||
|
}
|
||||||
|
s.m.Unlock()
|
||||||
|
w.Write([]byte(`]`))
|
||||||
|
|
||||||
w.Write([]byte(`}`))
|
w.Write([]byte(`}`))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -17,6 +17,7 @@ package metrics
|
|||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
"reflect"
|
"reflect"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
@ -85,7 +86,6 @@ func TestReaderIsolation(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func CommonTestKVStore(t *testing.T, s store.KVStore) {
|
func CommonTestKVStore(t *testing.T, s store.KVStore) {
|
||||||
|
|
||||||
writer, err := s.Writer()
|
writer, err := s.Writer()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
@ -261,5 +261,66 @@ func CommonTestReaderIsolation(t *testing.T, s store.KVStore) {
|
|||||||
if count != 1 {
|
if count != 1 {
|
||||||
t.Errorf("expected iterator to see 1, saw %d", count)
|
t.Errorf("expected iterator to see 1, saw %d", count)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestErrors(t *testing.T) {
|
||||||
|
s, err := StoreConstructor(map[string]interface{}{
|
||||||
|
"kvStoreName_actual": "gtreap",
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
x, ok := s.(*Store)
|
||||||
|
if !ok {
|
||||||
|
t.Errorf("expecting a Store")
|
||||||
|
}
|
||||||
|
|
||||||
|
x.AddError("foo", fmt.Errorf("Foo"), []byte("fooKey"))
|
||||||
|
x.AddError("bar", fmt.Errorf("Bar"), nil)
|
||||||
|
x.AddError("baz", fmt.Errorf("Baz"), []byte("bazKey"))
|
||||||
|
|
||||||
|
b := bytes.NewBuffer(nil)
|
||||||
|
x.WriteJSON(b)
|
||||||
|
|
||||||
|
var m map[string]interface{}
|
||||||
|
err = json.Unmarshal(b.Bytes(), &m)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("expected unmarshallable writeJSON, err: %v, b: %s",
|
||||||
|
err, b.Bytes())
|
||||||
|
}
|
||||||
|
|
||||||
|
errorsi, ok := m["Errors"]
|
||||||
|
if !ok || errorsi == nil {
|
||||||
|
t.Errorf("expected errorsi")
|
||||||
|
}
|
||||||
|
errors, ok := errorsi.([]interface{})
|
||||||
|
if !ok || errors == nil {
|
||||||
|
t.Errorf("expected errorsi is array")
|
||||||
|
}
|
||||||
|
if len(errors) != 3 {
|
||||||
|
t.Errorf("expected errors len 3")
|
||||||
|
}
|
||||||
|
|
||||||
|
e := errors[0].(map[string]interface{})
|
||||||
|
if e["Op"].(string) != "foo" ||
|
||||||
|
e["Err"].(string) != "Foo" ||
|
||||||
|
len(e["Time"].(string)) < 10 ||
|
||||||
|
e["Key"].(string) != "fooKey" {
|
||||||
|
t.Errorf("expected foo, %#v", e)
|
||||||
|
}
|
||||||
|
e = errors[1].(map[string]interface{})
|
||||||
|
if e["Op"].(string) != "bar" ||
|
||||||
|
e["Err"].(string) != "Bar" ||
|
||||||
|
len(e["Time"].(string)) < 10 ||
|
||||||
|
e["Key"].(string) != "" {
|
||||||
|
t.Errorf("expected bar, %#v", e)
|
||||||
|
}
|
||||||
|
e = errors[2].(map[string]interface{})
|
||||||
|
if e["Op"].(string) != "baz" ||
|
||||||
|
e["Err"].(string) != "Baz" ||
|
||||||
|
len(e["Time"].(string)) < 10 ||
|
||||||
|
e["Key"].(string) != "bazKey" {
|
||||||
|
t.Errorf("expected baz, %#v", e)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user