0
0
Fork 0

added support for tracking index stats and exposing via expvar

closes #83
This commit is contained in:
Marty Schoch 2014-10-02 11:12:22 -07:00
parent 97902e2619
commit 64b0066121
7 changed files with 149 additions and 11 deletions

View File

@ -17,15 +17,17 @@ import (
var indexNameMapping map[string]bleve.Index
var indexNameMappingLock sync.RWMutex
var indexStats = bleve.IndexStats{}
func RegisterIndexName(name string, index bleve.Index) {
func RegisterIndexName(name string, idx bleve.Index) {
indexNameMappingLock.Lock()
defer indexNameMappingLock.Unlock()
if indexNameMapping == nil {
indexNameMapping = make(map[string]bleve.Index)
}
indexNameMapping[name] = index
indexNameMapping[name] = idx
indexStats[name] = idx.Stats()
}
func UnregisterIndexByName(name string) bleve.Index {
@ -39,6 +41,7 @@ func UnregisterIndexByName(name string) bleve.Index {
if rv != nil {
delete(indexNameMapping, name)
}
delete(indexStats, name)
return rv
}
@ -61,3 +64,7 @@ func IndexNames() []string {
}
return rv
}
func IndexStats() bleve.IndexStats {
return indexStats
}

View File

@ -60,6 +60,8 @@ type Index interface {
Close()
Mapping() *IndexMapping
Stats() *IndexStat
}
// A Classifier is an interface describing any object

View File

@ -10,6 +10,8 @@
package index
import (
"encoding/json"
"github.com/blevesearch/bleve/document"
)
@ -31,6 +33,8 @@ type Index interface {
DumpFields() chan interface{}
Reader() IndexReader
Stats() json.Marshaler
}
type IndexReader interface {

View File

@ -0,0 +1,31 @@
// Copyright (c) 2014 Couchbase, Inc.
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
// except in compliance with the License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed under the
// License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
// either express or implied. See the License for the specific language governing permissions
// and limitations under the License.
package upside_down
import (
"encoding/json"
"sync/atomic"
)
type indexStat struct {
updates, deletes, batches, errors uint64
analysisTime, indexTime uint64
}
func (i *indexStat) MarshalJSON() ([]byte, error) {
m := map[string]interface{}{}
m["updates"] = atomic.LoadUint64(&i.updates)
m["deletes"] = atomic.LoadUint64(&i.deletes)
m["batches"] = atomic.LoadUint64(&i.batches)
m["errors"] = atomic.LoadUint64(&i.errors)
m["analysis_time"] = atomic.LoadUint64(&i.analysisTime)
m["index_time"] = atomic.LoadUint64(&i.indexTime)
return json.Marshal(m)
}

View File

@ -11,7 +11,10 @@ package upside_down
import (
"bytes"
"encoding/json"
"math"
"sync/atomic"
"time"
"github.com/blevesearch/bleve/analysis"
"github.com/blevesearch/bleve/document"
@ -32,6 +35,7 @@ type UpsideDownCouch struct {
fieldIndexCache *FieldIndexCache
docCount uint64
analysisQueue AnalysisQueue
stats *indexStat
}
func NewUpsideDownCouch(s store.KVStore, analysisQueue AnalysisQueue) *UpsideDownCouch {
@ -40,6 +44,7 @@ func NewUpsideDownCouch(s store.KVStore, analysisQueue AnalysisQueue) *UpsideDow
fieldIndexCache: NewFieldIndexCache(),
store: s,
analysisQueue: analysisQueue,
stats: &indexStat{},
}
}
@ -193,6 +198,7 @@ func (udc *UpsideDownCouch) Close() {
func (udc *UpsideDownCouch) Update(doc *document.Document) error {
// do analysis before acquiring write lock
analysisStart := time.Now()
resultChan := make(chan *AnalysisResult)
aw := AnalysisWork{
udc: udc,
@ -207,8 +213,10 @@ func (udc *UpsideDownCouch) Update(doc *document.Document) error {
// wait for the result
result := <-resultChan
close(resultChan)
atomic.AddUint64(&udc.stats.analysisTime, uint64(time.Since(analysisStart)))
// start a writer for this update
indexStart := time.Now()
kvwriter := udc.store.Writer()
defer kvwriter.Close()
@ -216,6 +224,7 @@ func (udc *UpsideDownCouch) Update(doc *document.Document) error {
// lookup the back index row
backIndexRow, err := udc.backIndexRowForDoc(kvwriter, doc.ID)
if err != nil {
atomic.AddUint64(&udc.stats.errors, 1)
return err
}
@ -230,6 +239,12 @@ func (udc *UpsideDownCouch) Update(doc *document.Document) error {
if err == nil && backIndexRow == nil {
udc.docCount++
}
atomic.AddUint64(&udc.stats.indexTime, uint64(time.Since(indexStart)))
if err == nil {
atomic.AddUint64(&udc.stats.updates, 1)
} else {
atomic.AddUint64(&udc.stats.errors, 1)
}
return err
}
@ -343,6 +358,7 @@ func (udc *UpsideDownCouch) indexField(docID string, field document.Field, field
}
func (udc *UpsideDownCouch) Delete(id string) error {
indexStart := time.Now()
// start a writer for this delete
kvwriter := udc.store.Writer()
defer kvwriter.Close()
@ -350,9 +366,11 @@ func (udc *UpsideDownCouch) Delete(id string) error {
// lookup the back index row
backIndexRow, err := udc.backIndexRowForDoc(kvwriter, id)
if err != nil {
atomic.AddUint64(&udc.stats.errors, 1)
return err
}
if backIndexRow == nil {
atomic.AddUint64(&udc.stats.deletes, 1)
return nil
}
@ -363,6 +381,12 @@ func (udc *UpsideDownCouch) Delete(id string) error {
if err == nil {
udc.docCount--
}
atomic.AddUint64(&udc.stats.indexTime, uint64(time.Since(indexStart)))
if err == nil {
atomic.AddUint64(&udc.stats.deletes, 1)
} else {
atomic.AddUint64(&udc.stats.errors, 1)
}
return err
}
@ -475,9 +499,10 @@ func (udc *UpsideDownCouch) termFieldVectorsFromTermVectors(in []*TermVector) []
}
func (udc *UpsideDownCouch) Batch(batch index.Batch) error {
analysisStart := time.Now()
resultChan := make(chan *AnalysisResult)
numUpdates := 0
var numUpdates uint64
for _, doc := range batch {
if doc != nil {
numUpdates++
@ -500,7 +525,7 @@ func (udc *UpsideDownCouch) Batch(batch index.Batch) error {
newRowsMap := make(map[string][]UpsideDownCouchRow)
// wait for the result
itemsDeQueued := 0
var itemsDeQueued uint64
for itemsDeQueued < numUpdates {
result := <-resultChan
newRowsMap[result.docID] = result.rows
@ -508,6 +533,9 @@ func (udc *UpsideDownCouch) Batch(batch index.Batch) error {
}
close(resultChan)
atomic.AddUint64(&udc.stats.analysisTime, uint64(time.Since(analysisStart)))
indexStart := time.Now()
// start a writer for this batch
kvwriter := udc.store.Writer()
defer kvwriter.Close()
@ -540,9 +568,15 @@ func (udc *UpsideDownCouch) Batch(batch index.Batch) error {
}
err = udc.batchRows(kvwriter, addRows, updateRows, deleteRows)
atomic.AddUint64(&udc.stats.indexTime, uint64(time.Since(indexStart)))
if err == nil {
udc.docCount += docsAdded
udc.docCount -= docsDeleted
atomic.AddUint64(&udc.stats.updates, numUpdates)
atomic.AddUint64(&udc.stats.deletes, docsDeleted)
atomic.AddUint64(&udc.stats.batches, 1)
} else {
atomic.AddUint64(&udc.stats.errors, 1)
}
return err
}
@ -568,3 +602,7 @@ func (udc *UpsideDownCouch) Reader() index.IndexReader {
docCount: udc.docCount,
}
}
func (udc *UpsideDownCouch) Stats() json.Marshaler {
return udc.stats
}

View File

@ -14,6 +14,7 @@ import (
"fmt"
"os"
"sync"
"sync/atomic"
"time"
"github.com/blevesearch/bleve/document"
@ -34,6 +35,7 @@ type indexImpl struct {
m *IndexMapping
mutex sync.RWMutex
open bool
stats *IndexStat
}
const storePath = "store"
@ -46,9 +48,10 @@ func indexStorePath(path string) string {
func newMemIndex(mapping *IndexMapping) (*indexImpl, error) {
rv := indexImpl{
path: "",
m: mapping,
meta: newIndexMeta("mem"),
path: "",
m: mapping,
meta: newIndexMeta("mem"),
stats: &IndexStat{},
}
storeConstructor := registry.KVStoreConstructorByName(rv.meta.Storage)
@ -68,6 +71,7 @@ func newMemIndex(mapping *IndexMapping) (*indexImpl, error) {
if err != nil {
return nil, err
}
rv.stats.indexStat = rv.i.Stats()
// now persist the mapping
mappingBytes, err := json.Marshal(mapping)
@ -98,9 +102,10 @@ func newIndex(path string, mapping *IndexMapping) (*indexImpl, error) {
}
rv := indexImpl{
path: path,
m: mapping,
meta: newIndexMeta(Config.DefaultKVStore),
path: path,
m: mapping,
meta: newIndexMeta(Config.DefaultKVStore),
stats: &IndexStat{},
}
storeConstructor := registry.KVStoreConstructorByName(rv.meta.Storage)
if storeConstructor == nil {
@ -129,6 +134,7 @@ func newIndex(path string, mapping *IndexMapping) (*indexImpl, error) {
if err != nil {
return nil, err
}
rv.stats.indexStat = rv.i.Stats()
// now persist the mapping
mappingBytes, err := json.Marshal(mapping)
@ -150,7 +156,8 @@ func newIndex(path string, mapping *IndexMapping) (*indexImpl, error) {
func openIndex(path string) (*indexImpl, error) {
rv := indexImpl{
path: path,
path: path,
stats: &IndexStat{},
}
var err error
rv.meta, err = openIndexMeta(path)
@ -181,6 +188,7 @@ func openIndex(path string) (*indexImpl, error) {
if err != nil {
return nil, err
}
rv.stats.indexStat = rv.i.Stats()
// now load the mapping
indexReader := rv.i.Reader()
@ -322,6 +330,8 @@ func (i *indexImpl) Search(req *SearchRequest) (*SearchResult, error) {
i.mutex.RLock()
defer i.mutex.RUnlock()
searchStart := time.Now()
if !i.open {
return nil, ErrorIndexClosed
}
@ -442,6 +452,9 @@ func (i *indexImpl) Search(req *SearchRequest) (*SearchResult, error) {
}
}
atomic.AddUint64(&i.stats.searches, 1)
atomic.AddUint64(&i.stats.searchTime, uint64(time.Since(searchStart)))
return &SearchResult{
Request: req,
Hits: hits,
@ -516,3 +529,7 @@ func (i *indexImpl) Close() {
i.open = false
i.i.Close()
}
func (i *indexImpl) Stats() *IndexStat {
return i.stats
}

39
index_stats.go Normal file
View File

@ -0,0 +1,39 @@
// Copyright (c) 2014 Couchbase, Inc.
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
// except in compliance with the License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed under the
// License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
// either express or implied. See the License for the specific language governing permissions
// and limitations under the License.
package bleve
import (
"encoding/json"
"sync/atomic"
)
type IndexStat struct {
indexStat json.Marshaler
searches uint64
searchTime uint64
}
func (is *IndexStat) MarshalJSON() ([]byte, error) {
m := map[string]interface{}{}
m["index"] = is.indexStat
m["searches"] = atomic.LoadUint64(&is.searches)
m["search_time"] = atomic.LoadUint64(&is.searchTime)
return json.Marshal(m)
}
type IndexStats map[string]*IndexStat
func (i IndexStats) String() string {
bytes, err := json.Marshal(i)
if err != nil {
return "error marshaling stats"
}
return string(bytes)
}