
Merge pull request #789 from steveyen/sreekanth-cb-scorch_stats

adding stats for scorch, with no gauges
Steve Yen authored 2018-02-28 17:41:10 -08:00, committed by GitHub
commit 39f9cee910
7 changed files with 217 additions and 69 deletions

View File

@@ -48,6 +48,8 @@ func (s *Scorch) mainLoop() {
var epochWatchers []*epochWatcher
OUTER:
for {
atomic.AddUint64(&s.stats.TotIntroduceLoop, 1)
select {
case <-s.closeCh:
break OUTER
@@ -92,6 +94,9 @@ OUTER:
}
func (s *Scorch) introduceSegment(next *segmentIntroduction) error {
atomic.AddUint64(&s.stats.TotIntroduceSegmentBeg, 1)
defer atomic.AddUint64(&s.stats.TotIntroduceSegmentEnd, 1)
// acquire lock
s.rootLock.Lock()
@@ -159,7 +164,8 @@ func (s *Scorch) introduceSegment(next *segmentIntroduction) error {
// increment numItemsIntroduced which tracks the number of items
// queued for persistence.
atomic.AddUint64(&s.stats.numItemsIntroduced, newSegmentSnapshot.Count())
atomic.AddUint64(&s.stats.TotIntroducedItems, newSegmentSnapshot.Count())
atomic.AddUint64(&s.stats.TotIntroducedSegmentsBatch, 1)
}
// copy old values
for key, oldVal := range s.root.internal {
@@ -192,6 +198,9 @@ func (s *Scorch) introduceSegment(next *segmentIntroduction) error {
}
func (s *Scorch) introduceMerge(nextMerge *segmentMerge) {
atomic.AddUint64(&s.stats.TotIntroduceMergeBeg, 1)
defer atomic.AddUint64(&s.stats.TotIntroduceMergeEnd, 1)
// acquire lock
s.rootLock.Lock()
@@ -270,6 +279,7 @@ func (s *Scorch) introduceMerge(nextMerge *segmentMerge) {
cachedDocs: &cachedDocs{cache: nil},
})
newSnapshot.offsets = append(newSnapshot.offsets, running)
atomic.AddUint64(&s.stats.TotIntroducedSegmentsMerge, 1)
}
newSnapshot.AddRef() // 1 ref for the nextMerge.notify response
@@ -290,6 +300,9 @@ func (s *Scorch) introduceMerge(nextMerge *segmentMerge) {
}
func (s *Scorch) revertToSnapshot(revertTo *snapshotReversion) error {
atomic.AddUint64(&s.stats.TotIntroduceRevertBeg, 1)
defer atomic.AddUint64(&s.stats.TotIntroduceRevertEnd, 1)
if revertTo.snapshot == nil {
err := fmt.Errorf("Cannot revert to a nil snapshot")
revertTo.applied <- err
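The introducer changes above all follow the same lightweight pattern: bump a TotXxxBeg counter on entry and defer the matching TotXxxEnd bump, so every exit path (including error returns) is accounted for with nothing heavier than an atomic add. A minimal sketch of that pattern, using hypothetical names rather than the real Scorch stats fields:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// hypothetical stats block mirroring the Beg/End counter-pair style
type workerStats struct {
	TotTaskBeg uint64
	TotTaskEnd uint64
	TotTaskErr uint64
}

type worker struct {
	stats workerStats
}

func (w *worker) runTask(fail bool) error {
	atomic.AddUint64(&w.stats.TotTaskBeg, 1)
	defer atomic.AddUint64(&w.stats.TotTaskEnd, 1) // runs on every return path

	if fail {
		atomic.AddUint64(&w.stats.TotTaskErr, 1)
		return fmt.Errorf("task failed")
	}
	return nil
}

func main() {
	w := &worker{}
	_ = w.runTask(false)
	_ = w.runTask(true)
	fmt.Println(atomic.LoadUint64(&w.stats.TotTaskBeg),
		atomic.LoadUint64(&w.stats.TotTaskEnd),
		atomic.LoadUint64(&w.stats.TotTaskErr)) // 2 2 1
}
```

Because the End increment is deferred, the Beg/End pair stays balanced even when the function bails out early with an error.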

View File

@@ -40,6 +40,8 @@ func (s *Scorch) mergerLoop() {
OUTER:
for {
atomic.AddUint64(&s.stats.TotFileMergeLoopBeg, 1)
select {
case <-s.closeCh:
break OUTER
@@ -59,6 +61,7 @@ OUTER:
if err != nil {
s.fireAsyncError(fmt.Errorf("merging err: %v", err))
_ = ourSnapshot.DecRef()
atomic.AddUint64(&s.stats.TotFileMergeLoopErr, 1)
continue OUTER
}
lastEpochMergePlanned = ourSnapshot.epoch
@@ -88,7 +91,10 @@ OUTER:
case <-ew.notifyCh:
}
}
atomic.AddUint64(&s.stats.TotFileMergeLoopEnd, 1)
}
s.asyncTasks.Done()
}
@@ -119,32 +125,45 @@ func (s *Scorch) planMergeAtSnapshot(ourSnapshot *IndexSnapshot,
}
}
atomic.AddUint64(&s.stats.TotFileMergePlan, 1)
// give this list to the planner
resultMergePlan, err := mergeplan.Plan(onlyZapSnapshots, options)
if err != nil {
atomic.AddUint64(&s.stats.TotFileMergePlanErr, 1)
return fmt.Errorf("merge planning err: %v", err)
}
if resultMergePlan == nil {
// nothing to do
atomic.AddUint64(&s.stats.TotFileMergePlanNone, 1)
return nil
}
atomic.AddUint64(&s.stats.TotFileMergePlanOk, 1)
atomic.AddUint64(&s.stats.TotFileMergePlanTasks, uint64(len(resultMergePlan.Tasks)))
// process tasks in serial for now
var notifications []chan *IndexSnapshot
for _, task := range resultMergePlan.Tasks {
if len(task.Segments) == 0 {
atomic.AddUint64(&s.stats.TotFileMergePlanTasksSegmentsEmpty, 1)
continue
}
atomic.AddUint64(&s.stats.TotFileMergePlanTasksSegments, uint64(len(task.Segments)))
oldMap := make(map[uint64]*SegmentSnapshot)
newSegmentID := atomic.AddUint64(&s.nextSegmentID, 1)
segmentsToMerge := make([]*zap.Segment, 0, len(task.Segments))
docsToDrop := make([]*roaring.Bitmap, 0, len(task.Segments))
for _, planSegment := range task.Segments {
if segSnapshot, ok := planSegment.(*SegmentSnapshot); ok {
oldMap[segSnapshot.id] = segSnapshot
if zapSeg, ok := segSnapshot.segment.(*zap.Segment); ok {
if segSnapshot.LiveSize() == 0 {
atomic.AddUint64(&s.stats.TotFileMergeSegmentsEmpty, 1)
oldMap[segSnapshot.id] = nil
} else {
segmentsToMerge = append(segmentsToMerge, zapSeg)
@@ -160,20 +179,27 @@ func (s *Scorch) planMergeAtSnapshot(ourSnapshot *IndexSnapshot,
filename := zapFileName(newSegmentID)
s.markIneligibleForRemoval(filename)
path := s.path + string(os.PathSeparator) + filename
atomic.AddUint64(&s.stats.TotFileMergeZapBeg, 1)
newDocNums, err := zap.Merge(segmentsToMerge, docsToDrop, path, 1024)
atomic.AddUint64(&s.stats.TotFileMergeZapEnd, 1)
if err != nil {
s.unmarkIneligibleForRemoval(filename)
atomic.AddUint64(&s.stats.TotFileMergePlanTasksErr, 1)
return fmt.Errorf("merging failed: %v", err)
}
segment, err = zap.Open(path)
if err != nil {
s.unmarkIneligibleForRemoval(filename)
atomic.AddUint64(&s.stats.TotFileMergePlanTasksErr, 1)
return err
}
oldNewDocNums = make(map[uint64][]uint64)
for i, segNewDocNums := range newDocNums {
oldNewDocNums[task.Segments[i].Id()] = segNewDocNums
}
atomic.AddUint64(&s.stats.TotFileMergeSegments, uint64(len(segmentsToMerge)))
}
sm := &segmentMerge{
@@ -191,18 +217,24 @@ func (s *Scorch) planMergeAtSnapshot(ourSnapshot *IndexSnapshot,
_ = segment.Close()
return nil
case s.merges <- sm:
atomic.AddUint64(&s.stats.TotFileMergeIntroductions, 1)
}
atomic.AddUint64(&s.stats.TotFileMergePlanTasksDone, 1)
}
for _, notification := range notifications {
select {
case <-s.closeCh:
return nil
case newSnapshot := <-notification:
atomic.AddUint64(&s.stats.TotFileMergeIntroductionsDone, 1)
if newSnapshot != nil {
_ = newSnapshot.DecRef()
}
}
}
return nil
}
@@ -220,14 +252,19 @@ type segmentMerge struct {
func (s *Scorch) mergeSegmentBases(snapshot *IndexSnapshot,
sbs []*zap.SegmentBase, sbsDrops []*roaring.Bitmap, sbsIndexes []int,
chunkFactor uint32) (uint64, *IndexSnapshot, uint64, error) {
atomic.AddUint64(&s.stats.TotMemMergeBeg, 1)
var br bytes.Buffer
cr := zap.NewCountHashWriter(&br)
atomic.AddUint64(&s.stats.TotMemMergeZapBeg, 1)
newDocNums, numDocs, storedIndexOffset, fieldsIndexOffset,
docValueOffset, dictLocs, fieldsInv, fieldsMap, err :=
zap.MergeToWriter(sbs, sbsDrops, chunkFactor, cr)
atomic.AddUint64(&s.stats.TotMemMergeZapEnd, 1)
if err != nil {
atomic.AddUint64(&s.stats.TotMemMergeErr, 1)
return 0, nil, 0, err
}
@@ -235,6 +272,7 @@ func (s *Scorch) mergeSegmentBases(snapshot *IndexSnapshot,
fieldsMap, fieldsInv, numDocs, storedIndexOffset, fieldsIndexOffset,
docValueOffset, dictLocs)
if err != nil {
atomic.AddUint64(&s.stats.TotMemMergeErr, 1)
return 0, nil, 0, err
}
@@ -244,14 +282,20 @@ func (s *Scorch) mergeSegmentBases(snapshot *IndexSnapshot,
path := s.path + string(os.PathSeparator) + filename
err = zap.PersistSegmentBase(sb, path)
if err != nil {
atomic.AddUint64(&s.stats.TotMemMergeErr, 1)
return 0, nil, 0, err
}
segment, err := zap.Open(path)
if err != nil {
atomic.AddUint64(&s.stats.TotMemMergeErr, 1)
return 0, nil, 0, err
}
// update persisted stats
atomic.AddUint64(&s.stats.TotPersistedItems, segment.Count())
atomic.AddUint64(&s.stats.TotPersistedSegments, 1)
sm := &segmentMerge{
id: newSegmentID,
old: make(map[uint64]*SegmentSnapshot),
@@ -277,6 +321,8 @@ func (s *Scorch) mergeSegmentBases(snapshot *IndexSnapshot,
case <-s.closeCh:
return 0, nil, 0, nil // TODO: return ErrInterruptedClosed?
case newSnapshot := <-sm.notify:
atomic.AddUint64(&s.stats.TotMemMergeSegments, uint64(len(sbs)))
atomic.AddUint64(&s.stats.TotMemMergeDone, 1)
return numDocs, newSnapshot, newSegmentID, nil
}
}
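Since the commit deliberately adds no gauges, point-in-time values such as "merges currently running" have to be derived by subtracting one half of a Beg/End pair from the other, for example TotFileMergeZapBeg minus TotFileMergeZapEnd, which bracket the zap.Merge call above. A hedged sketch of that derivation with stand-in counters (the real fields live on the unexported Scorch stats, so external callers would read them via StatsMap()):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// inFlight derives a point-in-time value from a monotonically increasing
// Beg/End counter pair, e.g. TotFileMergeZapBeg / TotFileMergeZapEnd.
func inFlight(beg, end *uint64) uint64 {
	b := atomic.LoadUint64(beg)
	e := atomic.LoadUint64(end)
	if e > b { // the two loads are independent, so guard against skew
		return 0
	}
	return b - e
}

func main() {
	// stand-ins for the Stats fields
	var totFileMergeZapBeg, totFileMergeZapEnd uint64
	atomic.AddUint64(&totFileMergeZapBeg, 3)
	atomic.AddUint64(&totFileMergeZapEnd, 2)
	fmt.Println("merges in flight:", inFlight(&totFileMergeZapBeg, &totFileMergeZapEnd)) // 1
}
```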

View File

@@ -55,6 +55,8 @@ func (s *Scorch) persisterLoop() {
var ew *epochWatcher
OUTER:
for {
atomic.AddUint64(&s.stats.TotPersistLoopBeg, 1)
select {
case <-s.closeCh:
break OUTER
@@ -65,8 +67,8 @@ OUTER:
if ew != nil && ew.epoch > lastMergedEpoch {
lastMergedEpoch = ew.epoch
}
persistWatchers = s.pausePersisterForMergerCatchUp(lastPersistedEpoch,
&lastMergedEpoch, persistWatchers)
lastMergedEpoch, persistWatchers = s.pausePersisterForMergerCatchUp(lastPersistedEpoch,
lastMergedEpoch, persistWatchers)
var ourSnapshot *IndexSnapshot
var ourPersisted []chan error
@@ -94,6 +96,7 @@ OUTER:
if err != nil {
s.fireAsyncError(fmt.Errorf("got err persisting snapshot: %v", err))
_ = ourSnapshot.DecRef()
atomic.AddUint64(&s.stats.TotPersistLoopErr, 1)
continue OUTER
}
@@ -115,6 +118,7 @@ OUTER:
s.fireEvent(EventKindPersisterProgress, time.Since(startTime))
if changed {
atomic.AddUint64(&s.stats.TotPersistLoopProgress, 1)
continue OUTER
}
}
@@ -133,17 +137,21 @@ OUTER:
s.removeOldData() // might as well cleanup while waiting
atomic.AddUint64(&s.stats.TotPersistLoopWait, 1)
select {
case <-s.closeCh:
break OUTER
case <-w.notifyCh:
// woken up, next loop should pick up work
continue OUTER
atomic.AddUint64(&s.stats.TotPersistLoopWaitNotified, 1)
case ew = <-s.persisterNotifier:
// if the watchers are already caught up then let them wait,
// else let them continue to do the catch up
persistWatchers = append(persistWatchers, ew)
}
atomic.AddUint64(&s.stats.TotPersistLoopEnd, 1)
}
}
@@ -160,29 +168,32 @@ func notifyMergeWatchers(lastPersistedEpoch uint64,
return watchersNext
}
func (s *Scorch) pausePersisterForMergerCatchUp(lastPersistedEpoch uint64, lastMergedEpoch *uint64,
persistWatchers []*epochWatcher) []*epochWatcher {
func (s *Scorch) pausePersisterForMergerCatchUp(lastPersistedEpoch uint64, lastMergedEpoch uint64,
persistWatchers []*epochWatcher) (uint64, []*epochWatcher) {
// first, let the watchers proceed if they lag behind
persistWatchers = notifyMergeWatchers(lastPersistedEpoch, persistWatchers)
OUTER:
// check for slow merger and wait until the merger catches up
for lastPersistedEpoch > *lastMergedEpoch+epochDistance {
for lastPersistedEpoch > lastMergedEpoch+epochDistance {
atomic.AddUint64(&s.stats.TotPersisterSlowMergerPause, 1)
select {
case <-s.closeCh:
break OUTER
case ew := <-s.persisterNotifier:
persistWatchers = append(persistWatchers, ew)
*lastMergedEpoch = ew.epoch
lastMergedEpoch = ew.epoch
}
atomic.AddUint64(&s.stats.TotPersisterSlowMergerResume, 1)
// let the watchers proceed if they lag behind
persistWatchers = notifyMergeWatchers(lastPersistedEpoch, persistWatchers)
}
return persistWatchers
return lastMergedEpoch, persistWatchers
}
func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot) error {
@@ -411,8 +422,10 @@ func (s *Scorch) persistSnapshotDirect(snapshot *IndexSnapshot) (err error) {
}
newIndexSnapshot.segment[i] = newSegmentSnapshot
delete(newSegments, segmentSnapshot.id)
// update items persisted in case of a new segment snapshot
atomic.AddUint64(&s.stats.numItemsPersisted, newSegmentSnapshot.Count())
atomic.AddUint64(&s.stats.TotPersistedItems, newSegmentSnapshot.Count())
atomic.AddUint64(&s.stats.TotPersistedSegments, 1)
} else {
newIndexSnapshot.segment[i] = s.root.segment[i]
newIndexSnapshot.segment[i].segment.AddRef()
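pausePersisterForMergerCatchUp previously updated lastMergedEpoch through a pointer parameter; the patch changes it to return the new epoch alongside the watcher slice, which keeps the bookkeeping on the caller's stack and makes the data flow visible at the call site. A simplified sketch of the two shapes, with illustrative names rather than the real function bodies:

```go
package main

import "fmt"

// before: the callee mutates the caller's variable through a pointer
func advanceEpochPtr(lastMerged *uint64, observed uint64) {
	if observed > *lastMerged {
		*lastMerged = observed
	}
}

// after: the callee returns the updated value, as the patch now does for
// pausePersisterForMergerCatchUp's lastMergedEpoch
func advanceEpoch(lastMerged, observed uint64) uint64 {
	if observed > lastMerged {
		return observed
	}
	return lastMerged
}

func main() {
	lastMergedEpoch := uint64(5)
	advanceEpochPtr(&lastMergedEpoch, 7)
	fmt.Println(lastMergedEpoch) // 7

	lastMergedEpoch = advanceEpoch(lastMergedEpoch, 9)
	fmt.Println(lastMergedEpoch) // 9
}
```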

View File

@@ -17,6 +17,7 @@ package scorch
import (
"encoding/json"
"fmt"
"io/ioutil"
"os"
"sync"
"sync/atomic"
@@ -43,7 +44,7 @@ type Scorch struct {
version uint8
config map[string]interface{}
analysisQueue *index.AnalysisQueue
stats *Stats
stats Stats
nextSegmentID uint64
path string
@@ -80,7 +81,6 @@ func NewScorch(storeName string,
closeCh: make(chan struct{}),
ineligibleForRemoval: map[string]bool{},
}
rv.stats = &Stats{i: rv}
rv.root = &IndexSnapshot{parent: rv, refs: 1}
ro, ok := config["read_only"].(bool)
if ok {
@@ -111,6 +111,7 @@ func (s *Scorch) fireAsyncError(err error) {
if s.onAsyncError != nil {
s.onAsyncError(err)
}
atomic.AddUint64(&s.stats.TotOnErrors, 1)
}
func (s *Scorch) Open() error {
@@ -275,7 +276,7 @@ func (s *Scorch) Batch(batch *index.Batch) (err error) {
}
close(resultChan)
atomic.AddUint64(&s.stats.analysisTime, uint64(time.Since(start)))
atomic.AddUint64(&s.stats.TotAnalysisTime, uint64(time.Since(start)))
// notify handlers that we're about to introduce a segment
s.fireEvent(EventKindBatchIntroductionStart, 0)
@@ -286,6 +287,8 @@ func (s *Scorch) Batch(batch *index.Batch) (err error) {
if err != nil {
return err
}
} else {
atomic.AddUint64(&s.stats.TotBatchesEmpty, 1)
}
err = s.prepareSegment(newSegment, ids, batch.InternalOps)
@@ -293,12 +296,12 @@ func (s *Scorch) Batch(batch *index.Batch) (err error) {
if newSegment != nil {
_ = newSegment.Close()
}
atomic.AddUint64(&s.stats.errors, 1)
atomic.AddUint64(&s.stats.TotOnErrors, 1)
} else {
atomic.AddUint64(&s.stats.updates, numUpdates)
atomic.AddUint64(&s.stats.deletes, numDeletes)
atomic.AddUint64(&s.stats.batches, 1)
atomic.AddUint64(&s.stats.numPlainTextBytesIndexed, numPlainTextBytes)
atomic.AddUint64(&s.stats.TotUpdates, numUpdates)
atomic.AddUint64(&s.stats.TotDeletes, numDeletes)
atomic.AddUint64(&s.stats.TotBatches, 1)
atomic.AddUint64(&s.stats.TotIndexedPlainTextBytes, numPlainTextBytes)
}
return err
}
@@ -374,10 +377,43 @@ func (s *Scorch) Reader() (index.IndexReader, error) {
}
func (s *Scorch) Stats() json.Marshaler {
return s.stats
return &s.stats
}
func (s *Scorch) StatsMap() map[string]interface{} {
m, _ := s.stats.statsMap()
m := s.stats.ToMap()
if s.path != "" {
finfos, err := ioutil.ReadDir(s.path)
if err == nil {
var numFilesOnDisk, numBytesUsedDisk uint64
for _, finfo := range finfos {
if !finfo.IsDir() {
numBytesUsedDisk += uint64(finfo.Size())
numFilesOnDisk++
}
}
m["TotOnDiskBytes"] = numBytesUsedDisk
m["TotOnDiskFiles"] = numFilesOnDisk
}
}
// TODO: consider one day removing these backwards compatible
// names for apps using the old names
m["updates"] = m["TotUpdates"]
m["deletes"] = m["TotDeletes"]
m["batches"] = m["TotBatches"]
m["errors"] = m["TotOnErrors"]
m["analysis_time"] = m["TotAnalysisTime"]
m["index_time"] = m["TotIndexTime"]
m["term_searchers_started"] = m["TotTermSearchersStarted"]
m["term_searchers_finished"] = m["TotTermSearchersFinished"]
m["num_plain_text_bytes_indexed"] = m["TotIndexedPlainTextBytes"]
m["num_items_introduced"] = m["TotIntroducedItems"]
m["num_items_persisted"] = m["TotPersistedItems"]
m["num_bytes_used_disk"] = m["TotOnDiskBytes"]
m["num_files_on_disk"] = m["TotOnDiskFiles"]
return m
}
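The aliasing above lets existing consumers that key on the old snake_case names keep working while new code reads the TotXxx names directly. A small usage sketch for reading such a map; the helper and the literal map are illustrative stand-ins for whatever (*Scorch).StatsMap() returns:

```go
package main

import "fmt"

// readCounter pulls a counter out of a StatsMap-style map, accepting either
// the new TotXxx name or the legacy snake_case alias (both hold uint64 here).
func readCounter(m map[string]interface{}, names ...string) (uint64, bool) {
	for _, name := range names {
		if v, ok := m[name].(uint64); ok {
			return v, true
		}
	}
	return 0, false
}

func main() {
	// stand-in for the map returned by StatsMap()
	statsMap := map[string]interface{}{
		"TotUpdates": uint64(42),
		"updates":    uint64(42), // backwards-compatible alias
	}
	if n, ok := readCounter(statsMap, "TotUpdates", "updates"); ok {
		fmt.Println("updates:", n)
	}
}
```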

View File

@@ -372,7 +372,7 @@ func (i *IndexSnapshot) TermFieldReader(term []byte, field string, includeFreq,
rv.postings[i] = pl
rv.iterators[i] = pl.Iterator()
}
atomic.AddUint64(&i.parent.stats.termSearchersStarted, uint64(1))
atomic.AddUint64(&i.parent.stats.TotTermSearchersStarted, uint64(1))
return rv, nil
}

View File

@@ -126,7 +126,7 @@ func (i *IndexSnapshotTermFieldReader) Count() uint64 {
func (i *IndexSnapshotTermFieldReader) Close() error {
if i.snapshot != nil {
atomic.AddUint64(&i.snapshot.parent.stats.termSearchersFinished, uint64(1))
atomic.AddUint64(&i.snapshot.parent.stats.TotTermSearchersFinished, uint64(1))
}
return nil
}
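The renamed started/finished pair also enables a cheap leak check: once every term field reader has been closed, the two counters should be equal. A sketch of that kind of assertion with stand-in counters (the real fields live on the unexported Scorch stats):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// stand-ins for TotTermSearchersStarted / TotTermSearchersFinished
var started, finished uint64

type reader struct{}

func openReader() *reader {
	atomic.AddUint64(&started, 1)
	return &reader{}
}

func (r *reader) Close() {
	atomic.AddUint64(&finished, 1)
}

func main() {
	r := openReader()
	r.Close()
	// after all readers are closed, the counters should match
	if atomic.LoadUint64(&started) == atomic.LoadUint64(&finished) {
		fmt.Println("no leaked term searchers")
	} else {
		fmt.Println("leak: started != finished")
	}
}
```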

View File

@@ -16,63 +16,103 @@ package scorch
import (
"encoding/json"
"io/ioutil"
"reflect"
"sync/atomic"
)
// Stats tracks statistics about the index
// Stats tracks statistics about the index. Fields prefixed
// like CurXxxx are gauges (they can go up and down), and fields
// prefixed like TotXxxx are monotonically increasing counters.
type Stats struct {
updates, deletes, batches, errors uint64
analysisTime, indexTime uint64
termSearchersStarted uint64
termSearchersFinished uint64
numPlainTextBytesIndexed uint64
numItemsIntroduced uint64
numItemsPersisted uint64
i *Scorch
TotUpdates uint64
TotDeletes uint64
TotBatches uint64
TotBatchesEmpty uint64
TotOnErrors uint64
TotAnalysisTime uint64
TotIndexTime uint64
TotIndexedPlainTextBytes uint64
TotTermSearchersStarted uint64
TotTermSearchersFinished uint64
TotIntroduceLoop uint64
TotIntroduceSegmentBeg uint64
TotIntroduceSegmentEnd uint64
TotIntroduceMergeBeg uint64
TotIntroduceMergeEnd uint64
TotIntroduceRevertBeg uint64
TotIntroduceRevertEnd uint64
TotIntroducedItems uint64
TotIntroducedSegmentsBatch uint64
TotIntroducedSegmentsMerge uint64
TotPersistLoopBeg uint64
TotPersistLoopErr uint64
TotPersistLoopProgress uint64
TotPersistLoopWait uint64
TotPersistLoopWaitNotified uint64
TotPersistLoopEnd uint64
TotPersistedItems uint64
TotPersistedSegments uint64
TotPersisterSlowMergerPause uint64
TotPersisterSlowMergerResume uint64
TotFileMergeLoopBeg uint64
TotFileMergeLoopErr uint64
TotFileMergeLoopEnd uint64
TotFileMergePlan uint64
TotFileMergePlanErr uint64
TotFileMergePlanNone uint64
TotFileMergePlanOk uint64
TotFileMergePlanTasks uint64
TotFileMergePlanTasksDone uint64
TotFileMergePlanTasksErr uint64
TotFileMergePlanTasksSegments uint64
TotFileMergePlanTasksSegmentsEmpty uint64
TotFileMergeSegmentsEmpty uint64
TotFileMergeSegments uint64
TotFileMergeZapBeg uint64
TotFileMergeZapEnd uint64
TotFileMergeIntroductions uint64
TotFileMergeIntroductionsDone uint64
TotMemMergeBeg uint64
TotMemMergeErr uint64
TotMemMergeDone uint64
TotMemMergeZapBeg uint64
TotMemMergeZapEnd uint64
TotMemMergeSegments uint64
}
func (s *Stats) statsMap() (map[string]interface{}, error) {
// atomically populates the returned map
func (s *Stats) ToMap() map[string]interface{} {
m := map[string]interface{}{}
m["updates"] = atomic.LoadUint64(&s.updates)
m["deletes"] = atomic.LoadUint64(&s.deletes)
m["batches"] = atomic.LoadUint64(&s.batches)
m["errors"] = atomic.LoadUint64(&s.errors)
m["analysis_time"] = atomic.LoadUint64(&s.analysisTime)
m["index_time"] = atomic.LoadUint64(&s.indexTime)
m["term_searchers_started"] = atomic.LoadUint64(&s.termSearchersStarted)
m["term_searchers_finished"] = atomic.LoadUint64(&s.termSearchersFinished)
m["num_plain_text_bytes_indexed"] = atomic.LoadUint64(&s.numPlainTextBytesIndexed)
m["num_items_introduced"] = atomic.LoadUint64(&s.numItemsIntroduced)
m["num_items_persisted"] = atomic.LoadUint64(&s.numItemsPersisted)
if s.i.path != "" {
finfos, err := ioutil.ReadDir(s.i.path)
if err != nil {
return nil, err
sve := reflect.ValueOf(s).Elem()
svet := sve.Type()
for i := 0; i < svet.NumField(); i++ {
svef := sve.Field(i)
if svef.CanAddr() {
svefp := svef.Addr().Interface()
m[svet.Field(i).Name] = atomic.LoadUint64(svefp.(*uint64))
}
var numFilesOnDisk, numBytesUsedDisk uint64
for _, finfo := range finfos {
if !finfo.IsDir() {
numBytesUsedDisk += uint64(finfo.Size())
numFilesOnDisk++
}
}
m["num_bytes_used_disk"] = numBytesUsedDisk
m["num_files_on_disk"] = numFilesOnDisk
}
return m, nil
return m
}
// MarshalJSON implements json.Marshaler
// MarshalJSON implements json.Marshaler, and in contrast to standard
// json marshaling provides atomic safety
func (s *Stats) MarshalJSON() ([]byte, error) {
m, err := s.statsMap()
if err != nil {
return nil, err
}
return json.Marshal(m)
return json.Marshal(s.ToMap())
}
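The rewritten ToMap drops the hand-maintained list of map assignments in favor of reflection: it walks every field of Stats, takes its address, and reads it with atomic.LoadUint64, so any counter added to the struct later shows up in StatsMap and MarshalJSON automatically. A stripped-down sketch of the same technique on a hypothetical struct, assuming (as Stats now does) that every field is a plain uint64:

```go
package main

import (
	"encoding/json"
	"fmt"
	"reflect"
	"sync/atomic"
)

// counters mimics the all-uint64 layout the reflection walk relies on.
type counters struct {
	TotFoo uint64
	TotBar uint64
}

// toMap atomically snapshots every field into a map, keyed by field name.
func (c *counters) toMap() map[string]interface{} {
	m := map[string]interface{}{}
	v := reflect.ValueOf(c).Elem()
	t := v.Type()
	for i := 0; i < t.NumField(); i++ {
		f := v.Field(i)
		if f.CanAddr() && f.Kind() == reflect.Uint64 {
			m[t.Field(i).Name] = atomic.LoadUint64(f.Addr().Interface().(*uint64))
		}
	}
	return m
}

func main() {
	c := &counters{}
	atomic.AddUint64(&c.TotFoo, 2)
	atomic.AddUint64(&c.TotBar, 5)
	b, _ := json.Marshal(c.toMap()) // same idea as Stats.MarshalJSON
	fmt.Println(string(b))          // {"TotBar":5,"TotFoo":2}
}
```

The extra Kind check here is a defensive addition for the sketch; the code in the patch relies on every Stats field being a uint64 counter.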