add support for gathering stats via map for easier consumption
This commit is contained in:
parent
e00577f265
commit
d7292ed891
1
index.go
1
index.go
|
@ -199,6 +199,7 @@ type Index interface {
|
|||
Mapping() *IndexMapping
|
||||
|
||||
Stats() *IndexStat
|
||||
StatsMap() map[string]interface{}
|
||||
|
||||
GetInternal(key []byte) ([]byte, error)
|
||||
SetInternal(key, val []byte) error
|
||||
|
|
|
@ -75,10 +75,6 @@ func (f *Firestorm) Open() (err error) {
|
|||
return
|
||||
}
|
||||
|
||||
if ss, ok := f.store.(store.KVStoreStats); ok {
|
||||
f.stats.kvStats = ss.Stats()
|
||||
}
|
||||
|
||||
// start a reader
|
||||
var kvreader store.KVReader
|
||||
kvreader, err = f.store.Reader()
|
||||
|
@ -548,7 +544,10 @@ func (f *Firestorm) Reader() (index.IndexReader, error) {
|
|||
|
||||
func (f *Firestorm) Stats() json.Marshaler {
|
||||
return f.stats
|
||||
}
|
||||
|
||||
func (f *Firestorm) StatsMap() map[string]interface{} {
|
||||
return f.stats.statsMap()
|
||||
}
|
||||
|
||||
func (f *Firestorm) Wait(timeout time.Duration) error {
|
||||
|
|
|
@ -12,6 +12,8 @@ package firestorm
|
|||
import (
|
||||
"encoding/json"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/blevesearch/bleve/index/store"
|
||||
)
|
||||
|
||||
type indexStat struct {
|
||||
|
@ -21,10 +23,9 @@ type indexStat struct {
|
|||
termSearchersStarted uint64
|
||||
termSearchersFinished uint64
|
||||
numPlainTextBytesIndexed uint64
|
||||
kvStats json.Marshaler
|
||||
}
|
||||
|
||||
func (i *indexStat) MarshalJSON() ([]byte, error) {
|
||||
func (i *indexStat) statsMap() map[string]interface{} {
|
||||
m := map[string]interface{}{}
|
||||
m["updates"] = atomic.LoadUint64(&i.updates)
|
||||
m["deletes"] = atomic.LoadUint64(&i.deletes)
|
||||
|
@ -36,8 +37,15 @@ func (i *indexStat) MarshalJSON() ([]byte, error) {
|
|||
m["term_searchers_started"] = atomic.LoadUint64(&i.termSearchersStarted)
|
||||
m["term_searchers_finished"] = atomic.LoadUint64(&i.termSearchersFinished)
|
||||
m["num_plain_text_bytes_indexed"] = atomic.LoadUint64(&i.numPlainTextBytesIndexed)
|
||||
if i.kvStats != nil {
|
||||
m["kv"] = i.kvStats
|
||||
|
||||
if o, ok := i.f.store.(store.KVStoreStats); ok {
|
||||
m["kv"] = o.StatsMap()
|
||||
}
|
||||
|
||||
return m
|
||||
}
|
||||
|
||||
func (i *indexStat) MarshalJSON() ([]byte, error) {
|
||||
m := i.statsMap()
|
||||
return json.Marshal(m)
|
||||
}
|
||||
|
|
|
@ -42,6 +42,7 @@ type Index interface {
|
|||
Reader() (IndexReader, error)
|
||||
|
||||
Stats() json.Marshaler
|
||||
StatsMap() map[string]interface{}
|
||||
|
||||
Analyze(d *document.Document) *AnalysisResult
|
||||
|
||||
|
|
|
@ -162,4 +162,6 @@ type KVBatch interface {
|
|||
type KVStoreStats interface {
|
||||
// Stats returns a JSON serializable object representing stats for this KVStore
|
||||
Stats() json.Marshaler
|
||||
|
||||
StatsMap() map[string]interface{}
|
||||
}
|
||||
|
|
|
@ -9,18 +9,22 @@
|
|||
|
||||
package metrics
|
||||
|
||||
import "encoding/json"
|
||||
import (
|
||||
"encoding/json"
|
||||
|
||||
"github.com/blevesearch/bleve/index/store"
|
||||
)
|
||||
|
||||
type stats struct {
|
||||
s *Store
|
||||
ostats json.Marshaler
|
||||
s *Store
|
||||
}
|
||||
|
||||
func (s *stats) MarshalJSON() ([]byte, error) {
|
||||
func (s *stats) statsMap() map[string]interface{} {
|
||||
ms := map[string]interface{}{}
|
||||
|
||||
ms["metrics"] = map[string]interface{}{
|
||||
"reader_get": TimerMap(s.s.TimerReaderGet),
|
||||
"reader_multi_get": TimerMap(s.s.TimerReaderMultiGet),
|
||||
"reader_prefix_iterator": TimerMap(s.s.TimerReaderPrefixIterator),
|
||||
"reader_range_iterator": TimerMap(s.s.TimerReaderRangeIterator),
|
||||
"writer_execute_batch": TimerMap(s.s.TimerWriterExecuteBatch),
|
||||
|
@ -29,9 +33,14 @@ func (s *stats) MarshalJSON() ([]byte, error) {
|
|||
"batch_merge": TimerMap(s.s.TimerBatchMerge),
|
||||
}
|
||||
|
||||
if s.ostats != nil {
|
||||
ms["kv"] = s.ostats
|
||||
if o, ok := s.s.o.(store.KVStoreStats); ok {
|
||||
ms["kv"] = o.StatsMap()
|
||||
}
|
||||
|
||||
return json.Marshal(ms)
|
||||
return ms
|
||||
}
|
||||
|
||||
func (s *stats) MarshalJSON() ([]byte, error) {
|
||||
m := s.statsMap()
|
||||
return json.Marshal(m)
|
||||
}
|
||||
|
|
|
@ -43,6 +43,8 @@ type Store struct {
|
|||
|
||||
m sync.Mutex // Protects the fields that follow.
|
||||
errors *list.List // Capped list of StoreError's.
|
||||
|
||||
s *stats
|
||||
}
|
||||
|
||||
func New(mo store.MergeOperator, config map[string]interface{}) (store.KVStore, error) {
|
||||
|
@ -68,7 +70,7 @@ func New(mo store.MergeOperator, config map[string]interface{}) (store.KVStore,
|
|||
return nil, err
|
||||
}
|
||||
|
||||
return &Store{
|
||||
rv := &Store{
|
||||
o: kvs,
|
||||
|
||||
TimerReaderGet: metrics.NewTimer(),
|
||||
|
@ -81,7 +83,11 @@ func New(mo store.MergeOperator, config map[string]interface{}) (store.KVStore,
|
|||
TimerBatchMerge: metrics.NewTimer(),
|
||||
|
||||
errors: list.New(),
|
||||
}, nil
|
||||
}
|
||||
|
||||
rv.s = &stats{s: rv}
|
||||
|
||||
return rv, nil
|
||||
}
|
||||
|
||||
func init() {
|
||||
|
@ -256,11 +262,9 @@ func (s *Store) WriteCSV(w io.Writer) {
|
|||
}
|
||||
|
||||
func (s *Store) Stats() json.Marshaler {
|
||||
rv := stats{
|
||||
s: s,
|
||||
}
|
||||
if o, ok := s.o.(store.KVStoreStats); ok {
|
||||
rv.ostats = o.Stats()
|
||||
}
|
||||
return &rv
|
||||
return s.s
|
||||
}
|
||||
|
||||
func (s *Store) StatsMap() map[string]interface{} {
|
||||
return s.s.statsMap()
|
||||
}
|
||||
|
|
|
@ -9,24 +9,35 @@
|
|||
|
||||
package moss
|
||||
|
||||
import "encoding/json"
|
||||
import (
|
||||
"encoding/json"
|
||||
|
||||
"github.com/blevesearch/bleve/index/store"
|
||||
)
|
||||
|
||||
type stats struct {
|
||||
s *Store
|
||||
llstats json.Marshaler
|
||||
s *Store
|
||||
}
|
||||
|
||||
func (s *stats) MarshalJSON() ([]byte, error) {
|
||||
func (s *stats) statsMap() map[string]interface{} {
|
||||
ms := map[string]interface{}{}
|
||||
|
||||
var err error
|
||||
ms["moss"], err = s.s.ms.Stats()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return ms
|
||||
}
|
||||
|
||||
if s.llstats != nil {
|
||||
ms["kv"] = s.llstats
|
||||
if s.s.llstore != nil {
|
||||
if o, ok := s.s.llstore.(store.KVStoreStats); ok {
|
||||
ms["kv"] = o.StatsMap()
|
||||
}
|
||||
}
|
||||
return json.Marshal(ms)
|
||||
|
||||
return ms
|
||||
}
|
||||
|
||||
func (s *stats) MarshalJSON() ([]byte, error) {
|
||||
m := s.statsMap()
|
||||
return json.Marshal(m)
|
||||
}
|
||||
|
|
|
@ -32,6 +32,8 @@ type Store struct {
|
|||
ms moss.Collection
|
||||
mo store.MergeOperator
|
||||
llstore store.KVStore
|
||||
|
||||
s *stats
|
||||
}
|
||||
|
||||
func New(mo store.MergeOperator, config map[string]interface{}) (
|
||||
|
@ -164,6 +166,7 @@ func NewEx(mo store.MergeOperator, config map[string]interface{},
|
|||
mo: mo,
|
||||
llstore: llStore,
|
||||
}
|
||||
rv.s = &stats{s: &rv}
|
||||
return &rv, nil
|
||||
}
|
||||
|
||||
|
@ -191,13 +194,11 @@ func (s *Store) Logf(fmt string, args ...interface{}) {
|
|||
}
|
||||
|
||||
func (s *Store) Stats() json.Marshaler {
|
||||
rv := stats{
|
||||
s: s,
|
||||
}
|
||||
if llstore, ok := s.llstore.(store.KVStoreStats); ok {
|
||||
rv.llstats = llstore.Stats()
|
||||
}
|
||||
return &rv
|
||||
return s.s
|
||||
}
|
||||
|
||||
func (s *Store) StatsMap() map[string]interface{} {
|
||||
return s.s.statsMap()
|
||||
}
|
||||
|
||||
func init() {
|
||||
|
|
|
@ -12,18 +12,20 @@ package upside_down
|
|||
import (
|
||||
"encoding/json"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/blevesearch/bleve/index/store"
|
||||
)
|
||||
|
||||
type indexStat struct {
|
||||
i *UpsideDownCouch
|
||||
updates, deletes, batches, errors uint64
|
||||
analysisTime, indexTime uint64
|
||||
termSearchersStarted uint64
|
||||
termSearchersFinished uint64
|
||||
numPlainTextBytesIndexed uint64
|
||||
kvStats json.Marshaler
|
||||
}
|
||||
|
||||
func (i *indexStat) MarshalJSON() ([]byte, error) {
|
||||
func (i *indexStat) statsMap() map[string]interface{} {
|
||||
m := map[string]interface{}{}
|
||||
m["updates"] = atomic.LoadUint64(&i.updates)
|
||||
m["deletes"] = atomic.LoadUint64(&i.deletes)
|
||||
|
@ -34,8 +36,15 @@ func (i *indexStat) MarshalJSON() ([]byte, error) {
|
|||
m["term_searchers_started"] = atomic.LoadUint64(&i.termSearchersStarted)
|
||||
m["term_searchers_finished"] = atomic.LoadUint64(&i.termSearchersFinished)
|
||||
m["num_plain_text_bytes_indexed"] = atomic.LoadUint64(&i.numPlainTextBytesIndexed)
|
||||
if i.kvStats != nil {
|
||||
m["kv"] = i.kvStats
|
||||
|
||||
if o, ok := i.i.store.(store.KVStoreStats); ok {
|
||||
m["kv"] = o.StatsMap()
|
||||
}
|
||||
|
||||
return m
|
||||
}
|
||||
|
||||
func (i *indexStat) MarshalJSON() ([]byte, error) {
|
||||
m := i.statsMap()
|
||||
return json.Marshal(m)
|
||||
}
|
||||
|
|
|
@ -68,14 +68,15 @@ type docBackIndexRow struct {
|
|||
}
|
||||
|
||||
func NewUpsideDownCouch(storeName string, storeConfig map[string]interface{}, analysisQueue *index.AnalysisQueue) (index.Index, error) {
|
||||
return &UpsideDownCouch{
|
||||
rv := &UpsideDownCouch{
|
||||
version: Version,
|
||||
fieldCache: index.NewFieldCache(),
|
||||
storeName: storeName,
|
||||
storeConfig: storeConfig,
|
||||
analysisQueue: analysisQueue,
|
||||
stats: &indexStat{},
|
||||
}, nil
|
||||
}
|
||||
rv.stats = &indexStat{i: rv}
|
||||
return rv, nil
|
||||
}
|
||||
|
||||
func (udc *UpsideDownCouch) init(kvwriter store.KVWriter) (err error) {
|
||||
|
@ -310,10 +311,6 @@ func (udc *UpsideDownCouch) Open() (err error) {
|
|||
return
|
||||
}
|
||||
|
||||
if ss, ok := udc.store.(store.KVStoreStats); ok {
|
||||
udc.stats.kvStats = ss.Stats()
|
||||
}
|
||||
|
||||
// start a reader to look at the index
|
||||
var kvreader store.KVReader
|
||||
kvreader, err = udc.store.Reader()
|
||||
|
@ -1033,6 +1030,10 @@ func (udc *UpsideDownCouch) Stats() json.Marshaler {
|
|||
return udc.stats
|
||||
}
|
||||
|
||||
func (udc *UpsideDownCouch) StatsMap() map[string]interface{} {
|
||||
return udc.stats.statsMap()
|
||||
}
|
||||
|
||||
func (udc *UpsideDownCouch) Advanced() (store.KVStore, error) {
|
||||
return udc.store, nil
|
||||
}
|
||||
|
|
|
@ -339,6 +339,22 @@ func (i *indexAliasImpl) Stats() *IndexStat {
|
|||
return i.indexes[0].Stats()
|
||||
}
|
||||
|
||||
func (i *indexAliasImpl) StatsMap() map[string]interface{} {
|
||||
i.mutex.RLock()
|
||||
defer i.mutex.RUnlock()
|
||||
|
||||
if !i.open {
|
||||
return nil
|
||||
}
|
||||
|
||||
err := i.isAliasToSingleIndex()
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
return i.indexes[0].StatsMap()
|
||||
}
|
||||
|
||||
func (i *indexAliasImpl) GetInternal(key []byte) ([]byte, error) {
|
||||
i.mutex.RLock()
|
||||
defer i.mutex.RUnlock()
|
||||
|
|
|
@ -1239,6 +1239,10 @@ func (i *stubIndex) Stats() *IndexStat {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (i *stubIndex) StatsMap() map[string]interface{} {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (i *stubIndex) GetInternal(key []byte) ([]byte, error) {
|
||||
return nil, i.err
|
||||
}
|
||||
|
|
|
@ -73,7 +73,6 @@ func newMemIndex(indexType string, mapping *IndexMapping) (*indexImpl, error) {
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
rv.stats.indexStat = rv.i.Stats()
|
||||
|
||||
// now persist the mapping
|
||||
mappingBytes, err := json.Marshal(mapping)
|
||||
|
@ -109,12 +108,12 @@ func newIndexUsing(path string, mapping *IndexMapping, indexType string, kvstore
|
|||
}
|
||||
|
||||
rv := indexImpl{
|
||||
path: path,
|
||||
name: path,
|
||||
m: mapping,
|
||||
meta: newIndexMeta(indexType, kvstore, kvconfig),
|
||||
stats: &IndexStat{},
|
||||
path: path,
|
||||
name: path,
|
||||
m: mapping,
|
||||
meta: newIndexMeta(indexType, kvstore, kvconfig),
|
||||
}
|
||||
rv.stats = &IndexStat{i: &rv}
|
||||
// at this point there is hope that we can be successful, so save index meta
|
||||
err = rv.meta.Save(path)
|
||||
if err != nil {
|
||||
|
@ -141,7 +140,6 @@ func newIndexUsing(path string, mapping *IndexMapping, indexType string, kvstore
|
|||
}
|
||||
return nil, err
|
||||
}
|
||||
rv.stats.indexStat = rv.i.Stats()
|
||||
|
||||
// now persist the mapping
|
||||
mappingBytes, err := json.Marshal(mapping)
|
||||
|
@ -163,10 +161,10 @@ func newIndexUsing(path string, mapping *IndexMapping, indexType string, kvstore
|
|||
|
||||
func openIndexUsing(path string, runtimeConfig map[string]interface{}) (rv *indexImpl, err error) {
|
||||
rv = &indexImpl{
|
||||
path: path,
|
||||
name: path,
|
||||
stats: &IndexStat{},
|
||||
path: path,
|
||||
name: path,
|
||||
}
|
||||
rv.stats = &IndexStat{i: rv}
|
||||
|
||||
rv.meta, err = openIndexMeta(path)
|
||||
if err != nil {
|
||||
|
@ -207,7 +205,6 @@ func openIndexUsing(path string, runtimeConfig map[string]interface{}) (rv *inde
|
|||
}
|
||||
return nil, err
|
||||
}
|
||||
rv.stats.indexStat = rv.i.Stats()
|
||||
|
||||
// now load the mapping
|
||||
indexReader, err := rv.i.Reader()
|
||||
|
@ -713,6 +710,10 @@ func (i *indexImpl) Stats() *IndexStat {
|
|||
return i.stats
|
||||
}
|
||||
|
||||
func (i *indexImpl) StatsMap() map[string]interface{} {
|
||||
return i.stats.statsMap()
|
||||
}
|
||||
|
||||
func (i *indexImpl) GetInternal(key []byte) (val []byte, err error) {
|
||||
i.mutex.RLock()
|
||||
defer i.mutex.RUnlock()
|
||||
|
|
|
@ -16,16 +16,21 @@ import (
|
|||
)
|
||||
|
||||
type IndexStat struct {
|
||||
indexStat json.Marshaler
|
||||
i *indexImpl
|
||||
searches uint64
|
||||
searchTime uint64
|
||||
}
|
||||
|
||||
func (is *IndexStat) MarshalJSON() ([]byte, error) {
|
||||
func (is *IndexStat) statsMap() map[string]interface{} {
|
||||
m := map[string]interface{}{}
|
||||
m["index"] = is.indexStat
|
||||
m["index"] = is.i.i.StatsMap()
|
||||
m["searches"] = atomic.LoadUint64(&is.searches)
|
||||
m["search_time"] = atomic.LoadUint64(&is.searchTime)
|
||||
return m
|
||||
}
|
||||
|
||||
func (is *IndexStat) MarshalJSON() ([]byte, error) {
|
||||
m := is.statsMap()
|
||||
return json.Marshal(m)
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue