
stats cleanup, renaming, gauges replaced with counters

This commit is contained in:
Steve Yen 2018-02-28 11:36:35 -08:00
parent 4b742505aa
commit 1b661ef844
5 changed files with 181 additions and 120 deletions
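
The gist of "gauges replaced with counters": instead of incrementing a mutable CurInProgress-style gauge when an operation starts and decrementing it when it finishes, the code now bumps a pair of monotonic counters (TotXxxBeg on entry, TotXxxEnd on exit, usually via defer), and an observer derives the in-flight count as Beg minus End. A minimal sketch of that pattern, with illustrative names (opBeg, opEnd, doWork) that are not from this commit:

package main

import (
	"fmt"
	"sync/atomic"
)

// opBeg and opEnd are monotonic counters bracketing an operation; because they
// only ever increase, they stay meaningful no matter when they are sampled.
var opBeg, opEnd uint64

func doWork() {
	atomic.AddUint64(&opBeg, 1)
	defer atomic.AddUint64(&opEnd, 1) // runs on every return path, including errors
	// ... the operation itself ...
}

// inProgress recovers the old gauge value from the two counters.
func inProgress() uint64 {
	return atomic.LoadUint64(&opBeg) - atomic.LoadUint64(&opEnd)
}

func main() {
	doWork()
	fmt.Println(inProgress()) // 0 once doWork has returned
}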

View File

@ -48,6 +48,8 @@ func (s *Scorch) mainLoop() {
var epochWatchers []*epochWatcher
OUTER:
for {
atomic.AddUint64(&s.stats.TotIntroduceLoop, 1)
select {
case <-s.closeCh:
break OUTER
@ -92,6 +94,9 @@ OUTER:
}
func (s *Scorch) introduceSegment(next *segmentIntroduction) error {
atomic.AddUint64(&s.stats.TotIntroduceSegmentBeg, 1)
defer atomic.AddUint64(&s.stats.TotIntroduceSegmentEnd, 1)
// acquire lock
s.rootLock.Lock()
@ -160,7 +165,7 @@ func (s *Scorch) introduceSegment(next *segmentIntroduction) error {
// increment numItemsIntroduced which tracks the number of items
// queued for persistence.
atomic.AddUint64(&s.stats.TotIntroducedItems, newSegmentSnapshot.Count())
atomic.AddUint64(&s.stats.TotIntroducedBatchSegments, 1)
atomic.AddUint64(&s.stats.TotIntroducedSegmentsBatch, 1)
}
// copy old values
for key, oldVal := range s.root.internal {
@ -193,6 +198,9 @@ func (s *Scorch) introduceSegment(next *segmentIntroduction) error {
}
func (s *Scorch) introduceMerge(nextMerge *segmentMerge) {
atomic.AddUint64(&s.stats.TotIntroduceMergeBeg, 1)
defer atomic.AddUint64(&s.stats.TotIntroduceMergeEnd, 1)
// acquire lock
s.rootLock.Lock()
@ -271,7 +279,7 @@ func (s *Scorch) introduceMerge(nextMerge *segmentMerge) {
cachedDocs: &cachedDocs{cache: nil},
})
newSnapshot.offsets = append(newSnapshot.offsets, running)
atomic.AddUint64(&s.stats.TotIntroducedMergeSegments, 1)
atomic.AddUint64(&s.stats.TotIntroducedSegmentsMerge, 1)
}
newSnapshot.AddRef() // 1 ref for the nextMerge.notify response
@ -292,6 +300,9 @@ func (s *Scorch) introduceMerge(nextMerge *segmentMerge) {
}
func (s *Scorch) revertToSnapshot(revertTo *snapshotReversion) error {
atomic.AddUint64(&s.stats.TotIntroduceRevertBeg, 1)
defer atomic.AddUint64(&s.stats.TotIntroduceRevertEnd, 1)
if revertTo.snapshot == nil {
err := fmt.Errorf("Cannot revert to a nil snapshot")
revertTo.applied <- err
@ -337,8 +348,6 @@ func (s *Scorch) revertToSnapshot(revertTo *snapshotReversion) error {
// release lock
s.rootLock.Unlock()
atomic.AddUint64(&s.stats.TotRollbackOpsDone, 1)
if rootPrev != nil {
_ = rootPrev.DecRef()
}

View File

@ -40,6 +40,8 @@ func (s *Scorch) mergerLoop() {
OUTER:
for {
atomic.AddUint64(&s.stats.TotFileMergeLoopBeg, 1)
select {
case <-s.closeCh:
break OUTER
@ -59,6 +61,7 @@ OUTER:
if err != nil {
s.fireAsyncError(fmt.Errorf("merging err: %v", err))
_ = ourSnapshot.DecRef()
atomic.AddUint64(&s.stats.TotFileMergeLoopErr, 1)
continue OUTER
}
lastEpochMergePlanned = ourSnapshot.epoch
@ -88,7 +91,10 @@ OUTER:
case <-ew.notifyCh:
}
}
atomic.AddUint64(&s.stats.TotFileMergeLoopEnd, 1)
}
s.asyncTasks.Done()
}
@ -119,35 +125,45 @@ func (s *Scorch) planMergeAtSnapshot(ourSnapshot *IndexSnapshot,
}
}
atomic.AddUint64(&s.stats.TotFileMergePlan, 1)
// give this list to the planner
resultMergePlan, err := mergeplan.Plan(onlyZapSnapshots, options)
if err != nil {
atomic.AddUint64(&s.stats.TotFileMergePlanErr, 1)
return fmt.Errorf("merge planning err: %v", err)
}
if resultMergePlan == nil {
// nothing to do
atomic.AddUint64(&s.stats.TotFileMergePlanNone, 1)
return nil
}
mip := uint64(len(resultMergePlan.Tasks))
atomic.AddUint64(&s.stats.CurInProgressFileMerges, mip)
atomic.AddUint64(&s.stats.TotFileMergePlanOk, 1)
atomic.AddUint64(&s.stats.TotFileMergePlanTasks, uint64(len(resultMergePlan.Tasks)))
// process tasks in serial for now
var notifications []chan *IndexSnapshot
for _, task := range resultMergePlan.Tasks {
if len(task.Segments) == 0 {
atomic.AddUint64(&s.stats.TotFileMergePlanTasksSegmentsEmpty, 1)
continue
}
atomic.AddUint64(&s.stats.TotFileMergePlanTasksSegments, uint64(len(task.Segments)))
oldMap := make(map[uint64]*SegmentSnapshot)
newSegmentID := atomic.AddUint64(&s.nextSegmentID, 1)
segmentsToMerge := make([]*zap.Segment, 0, len(task.Segments))
docsToDrop := make([]*roaring.Bitmap, 0, len(task.Segments))
for _, planSegment := range task.Segments {
if segSnapshot, ok := planSegment.(*SegmentSnapshot); ok {
oldMap[segSnapshot.id] = segSnapshot
if zapSeg, ok := segSnapshot.segment.(*zap.Segment); ok {
if segSnapshot.LiveSize() == 0 {
atomic.AddUint64(&s.stats.TotFileMergeSegmentsEmpty, 1)
oldMap[segSnapshot.id] = nil
} else {
segmentsToMerge = append(segmentsToMerge, zapSeg)
@ -163,24 +179,27 @@ func (s *Scorch) planMergeAtSnapshot(ourSnapshot *IndexSnapshot,
filename := zapFileName(newSegmentID)
s.markIneligibleForRemoval(filename)
path := s.path + string(os.PathSeparator) + filename
atomic.AddUint64(&s.stats.TotFileMergeZapBeg, 1)
newDocNums, err := zap.Merge(segmentsToMerge, docsToDrop, path, 1024)
atomic.AddUint64(&s.stats.TotFileMergeZapEnd, 1)
if err != nil {
s.unmarkIneligibleForRemoval(filename)
atomic.AddUint64(&s.stats.TotFileMergePlanTasksErr, 1)
return fmt.Errorf("merging failed: %v", err)
}
// update the count of file segments merged
atomic.AddUint64(&s.stats.TotMergedFileSegments, uint64(len(segmentsToMerge)))
segment, err = zap.Open(path)
if err != nil {
s.unmarkIneligibleForRemoval(filename)
atomic.AddUint64(&s.stats.TotFileMergePlanTasksErr, 1)
return err
}
oldNewDocNums = make(map[uint64][]uint64)
for i, segNewDocNums := range newDocNums {
oldNewDocNums[task.Segments[i].Id()] = segNewDocNums
}
atomic.AddUint64(&s.stats.TotFileMergeSegments, uint64(len(segmentsToMerge)))
}
sm := &segmentMerge{
@ -198,28 +217,24 @@ func (s *Scorch) planMergeAtSnapshot(ourSnapshot *IndexSnapshot,
_ = segment.Close()
return nil
case s.merges <- sm:
atomic.AddUint64(&s.stats.TotFileMergeIntroductions, 1)
}
atomic.AddUint64(&s.stats.TotFileMergePlanTasksDone, 1)
}
atomic.AddUint64(&s.stats.CurInProgressFileMerges, ^uint64(mip-1))
var newSnapshot *IndexSnapshot
for _, notification := range notifications {
select {
case <-s.closeCh:
return nil
case newSnapshot = <-notification:
case newSnapshot := <-notification:
atomic.AddUint64(&s.stats.TotFileMergeIntroductionsDone, 1)
if newSnapshot != nil {
_ = newSnapshot.DecRef()
}
}
}
// merge operation completed and the introduction is complete
if newSnapshot != nil {
atomic.AddUint64(&s.stats.TotFileMergeOpsDone, 1)
}
return nil
}
@ -237,16 +252,19 @@ type segmentMerge struct {
func (s *Scorch) mergeSegmentBases(snapshot *IndexSnapshot,
sbs []*zap.SegmentBase, sbsDrops []*roaring.Bitmap, sbsIndexes []int,
chunkFactor uint32) (uint64, *IndexSnapshot, uint64, error) {
atomic.AddUint64(&s.stats.TotMemMergeBeg, 1)
var br bytes.Buffer
cr := zap.NewCountHashWriter(&br)
atomic.AddUint64(&s.stats.CurInProgressMemoryMerges, 1)
atomic.AddUint64(&s.stats.TotMemMergeZapBeg, 1)
newDocNums, numDocs, storedIndexOffset, fieldsIndexOffset,
docValueOffset, dictLocs, fieldsInv, fieldsMap, err :=
zap.MergeToWriter(sbs, sbsDrops, chunkFactor, cr)
atomic.AddUint64(&s.stats.TotMemMergeZapEnd, 1)
if err != nil {
atomic.AddUint64(&s.stats.TotMemMergeErr, 1)
return 0, nil, 0, err
}
@ -254,23 +272,23 @@ func (s *Scorch) mergeSegmentBases(snapshot *IndexSnapshot,
fieldsMap, fieldsInv, numDocs, storedIndexOffset, fieldsIndexOffset,
docValueOffset, dictLocs)
if err != nil {
atomic.AddUint64(&s.stats.TotMemMergeErr, 1)
return 0, nil, 0, err
}
// update the count of in-memory merged segments
atomic.AddUint64(&s.stats.TotMergedMemorySegments, uint64(len(sbs)))
newSegmentID := atomic.AddUint64(&s.nextSegmentID, 1)
filename := zapFileName(newSegmentID)
path := s.path + string(os.PathSeparator) + filename
err = zap.PersistSegmentBase(sb, path)
if err != nil {
atomic.AddUint64(&s.stats.TotMemMergeErr, 1)
return 0, nil, 0, err
}
segment, err := zap.Open(path)
if err != nil {
atomic.AddUint64(&s.stats.TotMemMergeErr, 1)
return 0, nil, 0, err
}
@ -299,14 +317,12 @@ func (s *Scorch) mergeSegmentBases(snapshot *IndexSnapshot,
case s.merges <- sm:
}
atomic.AddUint64(&s.stats.CurInProgressMemoryMerges, ^uint64(0))
select { // wait for introduction to complete
case <-s.closeCh:
return 0, nil, 0, nil // TODO: return ErrInterruptedClosed?
case newSnapshot := <-sm.notify:
// update counters on success
atomic.AddUint64(&s.stats.TotMemoryMergeOpsDone, 1)
atomic.AddUint64(&s.stats.TotMemMergeSegments, uint64(len(sbs)))
atomic.AddUint64(&s.stats.TotMemMergeDone, 1)
return numDocs, newSnapshot, newSegmentID, nil
}
}
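
One detail worth calling out in this file: the removed gauge updates used the standard sync/atomic idiom for subtracting from a uint64, AddUint64 with the two's complement of the amount, so ^uint64(mip-1) subtracts mip and ^uint64(0) subtracts one. With CurInProgressFileMerges and CurInProgressMemoryMerges gone, the same information is recoverable from paired counters such as TotFileMergePlanTasks / TotFileMergePlanTasksDone and TotMemMergeBeg / TotMemMergeDone. A small, self-contained sketch of the arithmetic that was removed (variable names are illustrative):

package main

import (
	"fmt"
	"sync/atomic"
)

func main() {
	// Adding ^uint64(n-1) is the same as adding -n modulo 2^64.
	var gauge uint64 = 7
	n := uint64(3)
	atomic.AddUint64(&gauge, ^uint64(n-1)) // gauge -= n
	fmt.Println(gauge)                     // 4

	// Subtracting exactly one uses ^uint64(0), i.e. adding -1.
	atomic.AddUint64(&gauge, ^uint64(0)) // gauge--
	fmt.Println(gauge)                   // 3
}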

View File

@ -55,6 +55,8 @@ func (s *Scorch) persisterLoop() {
var ew *epochWatcher
OUTER:
for {
atomic.AddUint64(&s.stats.TotPersistLoopBeg, 1)
select {
case <-s.closeCh:
break OUTER
@ -65,8 +67,8 @@ OUTER:
if ew != nil && ew.epoch > lastMergedEpoch {
lastMergedEpoch = ew.epoch
}
persistWatchers = s.pausePersisterForMergerCatchUp(lastPersistedEpoch,
&lastMergedEpoch, persistWatchers)
lastMergedEpoch, persistWatchers = s.pausePersisterForMergerCatchUp(lastPersistedEpoch,
lastMergedEpoch, persistWatchers)
var ourSnapshot *IndexSnapshot
var ourPersisted []chan error
@ -94,6 +96,7 @@ OUTER:
if err != nil {
s.fireAsyncError(fmt.Errorf("got err persisting snapshot: %v", err))
_ = ourSnapshot.DecRef()
atomic.AddUint64(&s.stats.TotPersistLoopErr, 1)
continue OUTER
}
@ -115,6 +118,7 @@ OUTER:
s.fireEvent(EventKindPersisterProgress, time.Since(startTime))
if changed {
atomic.AddUint64(&s.stats.TotPersistLoopProgress, 1)
continue OUTER
}
}
@ -133,17 +137,21 @@ OUTER:
s.removeOldData() // might as well cleanup while waiting
atomic.AddUint64(&s.stats.TotPersistLoopWait, 1)
select {
case <-s.closeCh:
break OUTER
case <-w.notifyCh:
// woken up, next loop should pick up work
continue OUTER
atomic.AddUint64(&s.stats.TotPersistLoopWaitNotified, 1)
case ew = <-s.persisterNotifier:
// if the watchers are already caught up then let them wait,
// else let them continue to do the catch up
persistWatchers = append(persistWatchers, ew)
}
atomic.AddUint64(&s.stats.TotPersistLoopEnd, 1)
}
}
@ -160,31 +168,32 @@ func notifyMergeWatchers(lastPersistedEpoch uint64,
return watchersNext
}
func (s *Scorch) pausePersisterForMergerCatchUp(lastPersistedEpoch uint64, lastMergedEpoch *uint64,
persistWatchers []*epochWatcher) []*epochWatcher {
func (s *Scorch) pausePersisterForMergerCatchUp(lastPersistedEpoch uint64, lastMergedEpoch uint64,
persistWatchers []*epochWatcher) (uint64, []*epochWatcher) {
// first, let the watchers proceed if they lag behind
persistWatchers = notifyMergeWatchers(lastPersistedEpoch, persistWatchers)
OUTER:
// check for slow merger and await until the merger catch up
for lastPersistedEpoch > *lastMergedEpoch+epochDistance {
// update the stat on each pause cycle
atomic.AddUint64(&s.stats.TotPersisterPause, 1)
for lastPersistedEpoch > lastMergedEpoch+epochDistance {
atomic.AddUint64(&s.stats.TotPersisterSlowMergerPause, 1)
select {
case <-s.closeCh:
break OUTER
case ew := <-s.persisterNotifier:
persistWatchers = append(persistWatchers, ew)
*lastMergedEpoch = ew.epoch
lastMergedEpoch = ew.epoch
}
atomic.AddUint64(&s.stats.TotPersisterSlowMergerResume, 1)
// let the watchers proceed if they lag behind
persistWatchers = notifyMergeWatchers(lastPersistedEpoch, persistWatchers)
}
return persistWatchers
return lastMergedEpoch, persistWatchers
}
func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot) error {
@ -413,6 +422,7 @@ func (s *Scorch) persistSnapshotDirect(snapshot *IndexSnapshot) (err error) {
}
newIndexSnapshot.segment[i] = newSegmentSnapshot
delete(newSegments, segmentSnapshot.id)
// update items persisted incase of a new segment snapshot
atomic.AddUint64(&s.stats.TotPersistedItems, newSegmentSnapshot.Count())
atomic.AddUint64(&s.stats.TotPersistedSegments, 1)
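
Besides the new loop counters, the persister change replaces the *uint64 out-parameter of pausePersisterForMergerCatchUp with a plain value that is passed in and returned, so the updated lastMergedEpoch is assigned explicitly at the call site. A minimal sketch of that refactor pattern only; the function bodies below are placeholders, not the real merger catch-up loop:

package main

import "fmt"

// Before: the updated epoch comes back through a pointer parameter.
func catchUpOld(lastPersisted uint64, lastMerged *uint64) {
	if *lastMerged < lastPersisted {
		*lastMerged = lastPersisted // placeholder for the real wait-for-merger logic
	}
}

// After: the value goes in and the updated value comes back out, keeping the
// data flow visible where the function is called.
func catchUpNew(lastPersisted, lastMerged uint64) uint64 {
	if lastMerged < lastPersisted {
		lastMerged = lastPersisted // placeholder for the real wait-for-merger logic
	}
	return lastMerged
}

func main() {
	epoch := uint64(5)
	catchUpOld(9, &epoch)
	fmt.Println(epoch) // 9

	epoch = 5
	epoch = catchUpNew(9, epoch)
	fmt.Println(epoch) // 9
}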

View File

@ -17,6 +17,7 @@ package scorch
import (
"encoding/json"
"fmt"
"io/ioutil"
"os"
"sync"
"sync/atomic"
@ -43,7 +44,7 @@ type Scorch struct {
version uint8
config map[string]interface{}
analysisQueue *index.AnalysisQueue
stats *Stats
stats Stats
nextSegmentID uint64
path string
@ -80,7 +81,6 @@ func NewScorch(storeName string,
closeCh: make(chan struct{}),
ineligibleForRemoval: map[string]bool{},
}
rv.stats = &Stats{i: rv}
rv.root = &IndexSnapshot{parent: rv, refs: 1}
ro, ok := config["read_only"].(bool)
if ok {
@ -288,7 +288,7 @@ func (s *Scorch) Batch(batch *index.Batch) (err error) {
return err
}
} else {
atomic.AddUint64(&s.stats.TotEmptyBatches, 1)
atomic.AddUint64(&s.stats.TotBatchesEmpty, 1)
}
err = s.prepareSegment(newSegment, ids, batch.InternalOps)
@ -377,10 +377,43 @@ func (s *Scorch) Reader() (index.IndexReader, error) {
}
func (s *Scorch) Stats() json.Marshaler {
return s.stats
return &s.stats
}
func (s *Scorch) StatsMap() map[string]interface{} {
m, _ := s.stats.statsMap()
m := s.stats.ToMap()
if s.path != "" {
finfos, err := ioutil.ReadDir(s.path)
if err == nil {
var numFilesOnDisk, numBytesUsedDisk uint64
for _, finfo := range finfos {
if !finfo.IsDir() {
numBytesUsedDisk += uint64(finfo.Size())
numFilesOnDisk++
}
}
m["TotOnDiskBytes"] = numBytesUsedDisk
m["TotOnDiskFiles"] = numFilesOnDisk
}
}
// TODO: consider one day removing these backwards compatible
// names for apps using the old names
m["updates"] = m["TotUpdates"]
m["deletes"] = m["TotDeletes"]
m["batches"] = m["TotBatches"]
m["errors"] = m["TotOnErrors"]
m["analysis_time"] = m["TotAnalysisTime"]
m["index_time"] = m["TotIndexTime"]
m["term_searchers_started"] = m["TotTermSearchersStarted"]
m["term_searchers_finished"] = m["TotTermSearchersFinished"]
m["num_plain_text_bytes_indexed"] = m["TotIndexedPlainTextBytes"]
m["num_items_introduced"] = m["TotIntroducedItems"]
m["num_items_persisted"] = m["TotPersistedItems"]
m["num_bytes_used_disk"] = m["TotOnDiskBytes"]
m["num_files_on_disk"] = m["TotOnDiskFiles"]
return m
}
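
Two related changes show up in this file: the stats field goes from a *Stats pointer to a Stats value embedded in Scorch (so Stats() now returns &s.stats and Stats no longer needs a back-pointer to the index), and StatsMap() picks up the on-disk totals plus the old metric names as backwards-compatible aliases. A minimal sketch of the value-embedded stats pattern, using illustrative type names rather than the real Scorch and Stats:

package main

import (
	"encoding/json"
	"fmt"
	"sync/atomic"
)

type stats struct{ TotUpdates uint64 }

// MarshalJSON is defined on the pointer receiver, mirroring how Scorch.Stats()
// hands out &s.stats instead of a separately allocated *Stats.
func (s *stats) MarshalJSON() ([]byte, error) {
	return json.Marshal(map[string]uint64{
		"TotUpdates": atomic.LoadUint64(&s.TotUpdates),
	})
}

// index embeds its stats by value: no nil checks, no extra allocation.
type index struct{ stats stats }

func (i *index) Stats() json.Marshaler { return &i.stats }

func main() {
	var idx index
	atomic.AddUint64(&idx.stats.TotUpdates, 2)
	b, _ := json.Marshal(idx.Stats())
	fmt.Println(string(b)) // {"TotUpdates":2}
}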

View File

@ -16,7 +16,7 @@ package scorch
import (
"encoding/json"
"io/ioutil"
"reflect"
"sync/atomic"
)
@ -25,101 +25,94 @@ import (
// and fields that are prefixed like TotXxxx are monotonically
// increasing counters.
type Stats struct {
TotUpdates uint64
TotDeletes uint64
TotBatches uint64
TotEmptyBatches uint64
TotOnErrors uint64
TotAnalysisTime uint64
TotIndexTime uint64
TotUpdates uint64
TotDeletes uint64
TotBatches uint64
TotBatchesEmpty uint64
TotOnErrors uint64
TotAnalysisTime uint64
TotIndexTime uint64
TotIndexedPlainTextBytes uint64
TotIndexSnapshotBeg uint64
TotIndexSnapshotEnd uint64
TotIntroducedBatchSegments uint64
TotIntroducedMergeSegments uint64
TotIntroducedItems uint64
TotTermSearchersStarted uint64
TotTermSearchersFinished uint64
TotIntroduceLoop uint64
TotIntroduceSegmentBeg uint64
TotIntroduceSegmentEnd uint64
TotIntroduceMergeBeg uint64
TotIntroduceMergeEnd uint64
TotIntroduceRevertBeg uint64
TotIntroduceRevertEnd uint64
TotIntroducedItems uint64
TotIntroducedSegmentsBatch uint64
TotIntroducedSegmentsMerge uint64
TotPersistLoopBeg uint64
TotPersistLoopErr uint64
TotPersistLoopProgress uint64
TotPersistLoopWait uint64
TotPersistLoopWaitNotified uint64
TotPersistLoopEnd uint64
TotPersistedItems uint64
TotPersistedSegments uint64
TotPersisterPause uint64
TotMemoryMergeOpsDone uint64
TotMergedMemorySegments uint64
TotMergedFileSegments uint64
TotFileMergeOpsDone uint64
TotPersisterSlowMergerPause uint64
TotPersisterSlowMergerResume uint64
TotRollbackOpsDone uint64
TotFileMergeLoopBeg uint64
TotFileMergeLoopErr uint64
TotFileMergeLoopEnd uint64
CurInProgressMemoryMerges uint64
CurInProgressFileMerges uint64
TotFileMergePlan uint64
TotFileMergePlanErr uint64
TotFileMergePlanNone uint64
TotFileMergePlanOk uint64
CurMemoryBytes uint64
TotFileMergePlanTasks uint64
TotFileMergePlanTasksDone uint64
TotFileMergePlanTasksErr uint64
TotFileMergePlanTasksSegments uint64
TotFileMergePlanTasksSegmentsEmpty uint64
i *Scorch
TotFileMergeSegmentsEmpty uint64
TotFileMergeSegments uint64
TotFileMergeZapBeg uint64
TotFileMergeZapEnd uint64
TotFileMergeIntroductions uint64
TotFileMergeIntroductionsDone uint64
TotMemMergeBeg uint64
TotMemMergeErr uint64
TotMemMergeDone uint64
TotMemMergeZapBeg uint64
TotMemMergeZapEnd uint64
TotMemMergeSegments uint64
}
func (s *Stats) statsMap() (map[string]interface{}, error) {
// atomically populates the returned map
func (s *Stats) ToMap() map[string]interface{} {
m := map[string]interface{}{}
m["TotUpdates"] = atomic.LoadUint64(&s.TotUpdates)
m["TotDeletes"] = atomic.LoadUint64(&s.TotDeletes)
m["TotBatches"] = atomic.LoadUint64(&s.TotBatches)
m["TotEmptyBatches"] = atomic.LoadUint64(&s.TotEmptyBatches)
m["TotOnErrors"] = atomic.LoadUint64(&s.TotOnErrors)
m["TotAnalysisTime"] = atomic.LoadUint64(&s.TotAnalysisTime)
m["TotIndexSnapshotBeg"] = atomic.LoadUint64(&s.TotIndexSnapshotBeg)
m["TotIndexSnapshotEnd"] = atomic.LoadUint64(&s.TotIndexSnapshotEnd)
m["TotTermSearchersStarted"] = atomic.LoadUint64(&s.TotTermSearchersStarted)
m["TotTermSearchersFinished"] = atomic.LoadUint64(&s.TotTermSearchersFinished)
m["TotIndexedPlainTextBytes"] = atomic.LoadUint64(&s.TotIndexedPlainTextBytes)
m["TotIntroducedItems"] = atomic.LoadUint64(&s.TotIntroducedItems)
m["TotPersistedItems"] = atomic.LoadUint64(&s.TotPersistedItems)
m["TotMemoryMergeOpsDone"] = atomic.LoadUint64(&s.TotMemoryMergeOpsDone)
m["TotFileMergeOpsDone"] = atomic.LoadUint64(&s.TotFileMergeOpsDone)
m["TotIntroducedBatchSegments"] = atomic.LoadUint64(&s.TotIntroducedBatchSegments)
m["TotIntroducedMergeSegments"] = atomic.LoadUint64(&s.TotIntroducedMergeSegments)
m["TotPersistedSegments"] = atomic.LoadUint64(&s.TotPersistedSegments)
m["TotRollbackOpsDone"] = atomic.LoadUint64(&s.TotRollbackOpsDone)
m["CurInProgressFileMerges"] = atomic.LoadUint64(&s.CurInProgressFileMerges)
m["CurInProgressMemoryMerges"] = atomic.LoadUint64(&s.CurInProgressMemoryMerges)
m["TotPersisterPause"] = atomic.LoadUint64(&s.TotPersisterPause)
m["TotMergedMemorySegments"] = atomic.LoadUint64(&s.TotMergedMemorySegments)
m["TotMergedFileSegments"] = atomic.LoadUint64(&s.TotMergedFileSegments)
m["CurMemoryBytes"] = s.i.MemoryUsed()
if s.i.path != "" {
finfos, err := ioutil.ReadDir(s.i.path)
if err != nil {
return nil, err
sve := reflect.ValueOf(s).Elem()
svet := sve.Type()
for i := 0; i < svet.NumField(); i++ {
svef := sve.Field(i)
if svef.CanAddr() {
svefp := svef.Addr().Interface()
m[svet.Field(i).Name] = atomic.LoadUint64(svefp.(*uint64))
}
var numFilesOnDisk, numBytesUsedDisk uint64
for _, finfo := range finfos {
if !finfo.IsDir() {
numBytesUsedDisk += uint64(finfo.Size())
numFilesOnDisk++
}
}
m["TotOnDiskBytes"] = numBytesUsedDisk
m["TotOnDiskFiles"] = numFilesOnDisk
}
return m, nil
return m
}
// MarshalJSON implements json.Marshaler
// MarshalJSON implements json.Marshaler, and in contrast to standard
// json marshaling provides atomic safety
func (s *Stats) MarshalJSON() ([]byte, error) {
m, err := s.statsMap()
if err != nil {
return nil, err
}
return json.Marshal(m)
return json.Marshal(s.ToMap())
}
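
The rewritten map construction is the piece that makes the *Scorch back-pointer unnecessary: because every field of Stats is now a uint64 counter, a single reflection loop can snapshot all of them with atomic loads, and any counter added later is picked up automatically. A self-contained sketch of that technique, with a throwaway two-field struct standing in for the real Stats:

package main

import (
	"fmt"
	"reflect"
	"sync/atomic"
)

// demoStats stands in for the real Stats struct: nothing but uint64 counters.
type demoStats struct {
	TotFoo uint64
	TotBar uint64
}

// toMap walks the struct with reflection and snapshots each counter with an
// atomic load, so new fields never need to be listed by hand.
func (s *demoStats) toMap() map[string]interface{} {
	m := map[string]interface{}{}
	sve := reflect.ValueOf(s).Elem()
	svet := sve.Type()
	for i := 0; i < svet.NumField(); i++ {
		f := sve.Field(i)
		if f.Kind() == reflect.Uint64 && f.CanAddr() {
			p := f.Addr().Interface().(*uint64)
			m[svet.Field(i).Name] = atomic.LoadUint64(p)
		}
	}
	return m
}

func main() {
	var s demoStats
	atomic.AddUint64(&s.TotFoo, 3)
	fmt.Println(s.toMap()) // map[TotBar:0 TotFoo:3]
}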