From 64b0066121396fef64298b4e766eff0739479b99 Mon Sep 17 00:00:00 2001 From: Marty Schoch Date: Thu, 2 Oct 2014 11:12:22 -0700 Subject: [PATCH] added support for tracking index stats and exposing via expvar closes #83 --- http/registry.go | 11 +++++++-- index.go | 2 ++ index/index.go | 4 +++ index/upside_down/stats.go | 31 +++++++++++++++++++++++ index/upside_down/upside_down.go | 42 ++++++++++++++++++++++++++++++-- index_impl.go | 31 +++++++++++++++++------ index_stats.go | 39 +++++++++++++++++++++++++++++ 7 files changed, 149 insertions(+), 11 deletions(-) create mode 100644 index/upside_down/stats.go create mode 100644 index_stats.go diff --git a/http/registry.go b/http/registry.go index 2ed97620..1206b0fd 100644 --- a/http/registry.go +++ b/http/registry.go @@ -17,15 +17,17 @@ import ( var indexNameMapping map[string]bleve.Index var indexNameMappingLock sync.RWMutex +var indexStats = bleve.IndexStats{} -func RegisterIndexName(name string, index bleve.Index) { +func RegisterIndexName(name string, idx bleve.Index) { indexNameMappingLock.Lock() defer indexNameMappingLock.Unlock() if indexNameMapping == nil { indexNameMapping = make(map[string]bleve.Index) } - indexNameMapping[name] = index + indexNameMapping[name] = idx + indexStats[name] = idx.Stats() } func UnregisterIndexByName(name string) bleve.Index { @@ -39,6 +41,7 @@ func UnregisterIndexByName(name string) bleve.Index { if rv != nil { delete(indexNameMapping, name) } + delete(indexStats, name) return rv } @@ -61,3 +64,7 @@ func IndexNames() []string { } return rv } + +func IndexStats() bleve.IndexStats { + return indexStats +} diff --git a/index.go b/index.go index 9a118fe0..8cc7322c 100644 --- a/index.go +++ b/index.go @@ -60,6 +60,8 @@ type Index interface { Close() Mapping() *IndexMapping + + Stats() *IndexStat } // A Classifier is an interface describing any object diff --git a/index/index.go b/index/index.go index fad73d5f..f5a6fed7 100644 --- a/index/index.go +++ b/index/index.go @@ -10,6 +10,8 @@ package index import ( + "encoding/json" + "github.com/blevesearch/bleve/document" ) @@ -31,6 +33,8 @@ type Index interface { DumpFields() chan interface{} Reader() IndexReader + + Stats() json.Marshaler } type IndexReader interface { diff --git a/index/upside_down/stats.go b/index/upside_down/stats.go new file mode 100644 index 00000000..e447b28b --- /dev/null +++ b/index/upside_down/stats.go @@ -0,0 +1,31 @@ +// Copyright (c) 2014 Couchbase, Inc. +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file +// except in compliance with the License. You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed under the +// License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, +// either express or implied. See the License for the specific language governing permissions +// and limitations under the License. + +package upside_down + +import ( + "encoding/json" + "sync/atomic" +) + +type indexStat struct { + updates, deletes, batches, errors uint64 + analysisTime, indexTime uint64 +} + +func (i *indexStat) MarshalJSON() ([]byte, error) { + m := map[string]interface{}{} + m["updates"] = atomic.LoadUint64(&i.updates) + m["deletes"] = atomic.LoadUint64(&i.deletes) + m["batches"] = atomic.LoadUint64(&i.batches) + m["errors"] = atomic.LoadUint64(&i.errors) + m["analysis_time"] = atomic.LoadUint64(&i.analysisTime) + m["index_time"] = atomic.LoadUint64(&i.indexTime) + return json.Marshal(m) +} diff --git a/index/upside_down/upside_down.go b/index/upside_down/upside_down.go index 469cdb1d..3cffd71f 100644 --- a/index/upside_down/upside_down.go +++ b/index/upside_down/upside_down.go @@ -11,7 +11,10 @@ package upside_down import ( "bytes" + "encoding/json" "math" + "sync/atomic" + "time" "github.com/blevesearch/bleve/analysis" "github.com/blevesearch/bleve/document" @@ -32,6 +35,7 @@ type UpsideDownCouch struct { fieldIndexCache *FieldIndexCache docCount uint64 analysisQueue AnalysisQueue + stats *indexStat } func NewUpsideDownCouch(s store.KVStore, analysisQueue AnalysisQueue) *UpsideDownCouch { @@ -40,6 +44,7 @@ func NewUpsideDownCouch(s store.KVStore, analysisQueue AnalysisQueue) *UpsideDow fieldIndexCache: NewFieldIndexCache(), store: s, analysisQueue: analysisQueue, + stats: &indexStat{}, } } @@ -193,6 +198,7 @@ func (udc *UpsideDownCouch) Close() { func (udc *UpsideDownCouch) Update(doc *document.Document) error { // do analysis before acquiring write lock + analysisStart := time.Now() resultChan := make(chan *AnalysisResult) aw := AnalysisWork{ udc: udc, @@ -207,8 +213,10 @@ func (udc *UpsideDownCouch) Update(doc *document.Document) error { // wait for the result result := <-resultChan close(resultChan) + atomic.AddUint64(&udc.stats.analysisTime, uint64(time.Since(analysisStart))) // start a writer for this update + indexStart := time.Now() kvwriter := udc.store.Writer() defer kvwriter.Close() @@ -216,6 +224,7 @@ func (udc *UpsideDownCouch) Update(doc *document.Document) error { // lookup the back index row backIndexRow, err := udc.backIndexRowForDoc(kvwriter, doc.ID) if err != nil { + atomic.AddUint64(&udc.stats.errors, 1) return err } @@ -230,6 +239,12 @@ func (udc *UpsideDownCouch) Update(doc *document.Document) error { if err == nil && backIndexRow == nil { udc.docCount++ } + atomic.AddUint64(&udc.stats.indexTime, uint64(time.Since(indexStart))) + if err == nil { + atomic.AddUint64(&udc.stats.updates, 1) + } else { + atomic.AddUint64(&udc.stats.errors, 1) + } return err } @@ -343,6 +358,7 @@ func (udc *UpsideDownCouch) indexField(docID string, field document.Field, field } func (udc *UpsideDownCouch) Delete(id string) error { + indexStart := time.Now() // start a writer for this delete kvwriter := udc.store.Writer() defer kvwriter.Close() @@ -350,9 +366,11 @@ func (udc *UpsideDownCouch) Delete(id string) error { // lookup the back index row backIndexRow, err := udc.backIndexRowForDoc(kvwriter, id) if err != nil { + atomic.AddUint64(&udc.stats.errors, 1) return err } if backIndexRow == nil { + atomic.AddUint64(&udc.stats.deletes, 1) return nil } @@ -363,6 +381,12 @@ func (udc *UpsideDownCouch) Delete(id string) error { if err == nil { udc.docCount-- } + atomic.AddUint64(&udc.stats.indexTime, uint64(time.Since(indexStart))) + if err == nil { + atomic.AddUint64(&udc.stats.deletes, 1) + } else { + atomic.AddUint64(&udc.stats.errors, 1) + } return err } @@ -475,9 +499,10 @@ func (udc *UpsideDownCouch) termFieldVectorsFromTermVectors(in []*TermVector) [] } func (udc *UpsideDownCouch) Batch(batch index.Batch) error { + analysisStart := time.Now() resultChan := make(chan *AnalysisResult) - numUpdates := 0 + var numUpdates uint64 for _, doc := range batch { if doc != nil { numUpdates++ @@ -500,7 +525,7 @@ func (udc *UpsideDownCouch) Batch(batch index.Batch) error { newRowsMap := make(map[string][]UpsideDownCouchRow) // wait for the result - itemsDeQueued := 0 + var itemsDeQueued uint64 for itemsDeQueued < numUpdates { result := <-resultChan newRowsMap[result.docID] = result.rows @@ -508,6 +533,9 @@ func (udc *UpsideDownCouch) Batch(batch index.Batch) error { } close(resultChan) + atomic.AddUint64(&udc.stats.analysisTime, uint64(time.Since(analysisStart))) + + indexStart := time.Now() // start a writer for this batch kvwriter := udc.store.Writer() defer kvwriter.Close() @@ -540,9 +568,15 @@ func (udc *UpsideDownCouch) Batch(batch index.Batch) error { } err = udc.batchRows(kvwriter, addRows, updateRows, deleteRows) + atomic.AddUint64(&udc.stats.indexTime, uint64(time.Since(indexStart))) if err == nil { udc.docCount += docsAdded udc.docCount -= docsDeleted + atomic.AddUint64(&udc.stats.updates, numUpdates) + atomic.AddUint64(&udc.stats.deletes, docsDeleted) + atomic.AddUint64(&udc.stats.batches, 1) + } else { + atomic.AddUint64(&udc.stats.errors, 1) } return err } @@ -568,3 +602,7 @@ func (udc *UpsideDownCouch) Reader() index.IndexReader { docCount: udc.docCount, } } + +func (udc *UpsideDownCouch) Stats() json.Marshaler { + return udc.stats +} diff --git a/index_impl.go b/index_impl.go index e2166a70..2a38801d 100644 --- a/index_impl.go +++ b/index_impl.go @@ -14,6 +14,7 @@ import ( "fmt" "os" "sync" + "sync/atomic" "time" "github.com/blevesearch/bleve/document" @@ -34,6 +35,7 @@ type indexImpl struct { m *IndexMapping mutex sync.RWMutex open bool + stats *IndexStat } const storePath = "store" @@ -46,9 +48,10 @@ func indexStorePath(path string) string { func newMemIndex(mapping *IndexMapping) (*indexImpl, error) { rv := indexImpl{ - path: "", - m: mapping, - meta: newIndexMeta("mem"), + path: "", + m: mapping, + meta: newIndexMeta("mem"), + stats: &IndexStat{}, } storeConstructor := registry.KVStoreConstructorByName(rv.meta.Storage) @@ -68,6 +71,7 @@ func newMemIndex(mapping *IndexMapping) (*indexImpl, error) { if err != nil { return nil, err } + rv.stats.indexStat = rv.i.Stats() // now persist the mapping mappingBytes, err := json.Marshal(mapping) @@ -98,9 +102,10 @@ func newIndex(path string, mapping *IndexMapping) (*indexImpl, error) { } rv := indexImpl{ - path: path, - m: mapping, - meta: newIndexMeta(Config.DefaultKVStore), + path: path, + m: mapping, + meta: newIndexMeta(Config.DefaultKVStore), + stats: &IndexStat{}, } storeConstructor := registry.KVStoreConstructorByName(rv.meta.Storage) if storeConstructor == nil { @@ -129,6 +134,7 @@ func newIndex(path string, mapping *IndexMapping) (*indexImpl, error) { if err != nil { return nil, err } + rv.stats.indexStat = rv.i.Stats() // now persist the mapping mappingBytes, err := json.Marshal(mapping) @@ -150,7 +156,8 @@ func newIndex(path string, mapping *IndexMapping) (*indexImpl, error) { func openIndex(path string) (*indexImpl, error) { rv := indexImpl{ - path: path, + path: path, + stats: &IndexStat{}, } var err error rv.meta, err = openIndexMeta(path) @@ -181,6 +188,7 @@ func openIndex(path string) (*indexImpl, error) { if err != nil { return nil, err } + rv.stats.indexStat = rv.i.Stats() // now load the mapping indexReader := rv.i.Reader() @@ -322,6 +330,8 @@ func (i *indexImpl) Search(req *SearchRequest) (*SearchResult, error) { i.mutex.RLock() defer i.mutex.RUnlock() + searchStart := time.Now() + if !i.open { return nil, ErrorIndexClosed } @@ -442,6 +452,9 @@ func (i *indexImpl) Search(req *SearchRequest) (*SearchResult, error) { } } + atomic.AddUint64(&i.stats.searches, 1) + atomic.AddUint64(&i.stats.searchTime, uint64(time.Since(searchStart))) + return &SearchResult{ Request: req, Hits: hits, @@ -516,3 +529,7 @@ func (i *indexImpl) Close() { i.open = false i.i.Close() } + +func (i *indexImpl) Stats() *IndexStat { + return i.stats +} diff --git a/index_stats.go b/index_stats.go new file mode 100644 index 00000000..b29ec9ee --- /dev/null +++ b/index_stats.go @@ -0,0 +1,39 @@ +// Copyright (c) 2014 Couchbase, Inc. +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file +// except in compliance with the License. You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed under the +// License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, +// either express or implied. See the License for the specific language governing permissions +// and limitations under the License. + +package bleve + +import ( + "encoding/json" + "sync/atomic" +) + +type IndexStat struct { + indexStat json.Marshaler + searches uint64 + searchTime uint64 +} + +func (is *IndexStat) MarshalJSON() ([]byte, error) { + m := map[string]interface{}{} + m["index"] = is.indexStat + m["searches"] = atomic.LoadUint64(&is.searches) + m["search_time"] = atomic.LoadUint64(&is.searchTime) + return json.Marshal(m) +} + +type IndexStats map[string]*IndexStat + +func (i IndexStats) String() string { + bytes, err := json.Marshal(i) + if err != nil { + return "error marshaling stats" + } + return string(bytes) +}