
adding stats for scorch

Sreekanth Sivasankaran 2018-02-28 15:31:55 +05:30
parent 56c2acd990
commit 4b742505aa
7 changed files with 120 additions and 33 deletions

View File

@@ -159,7 +159,8 @@ func (s *Scorch) introduceSegment(next *segmentIntroduction) error {
// increment numItemsIntroduced which tracks the number of items
// queued for persistence.
- atomic.AddUint64(&s.stats.numItemsIntroduced, newSegmentSnapshot.Count())
+ atomic.AddUint64(&s.stats.TotIntroducedItems, newSegmentSnapshot.Count())
+ atomic.AddUint64(&s.stats.TotIntroducedBatchSegments, 1)
}
// copy old values
for key, oldVal := range s.root.internal {
@@ -270,6 +271,7 @@ func (s *Scorch) introduceMerge(nextMerge *segmentMerge) {
cachedDocs: &cachedDocs{cache: nil},
})
newSnapshot.offsets = append(newSnapshot.offsets, running)
+ atomic.AddUint64(&s.stats.TotIntroducedMergeSegments, 1)
}
newSnapshot.AddRef() // 1 ref for the nextMerge.notify response
@@ -335,6 +337,8 @@ func (s *Scorch) revertToSnapshot(revertTo *snapshotReversion) error {
// release lock
s.rootLock.Unlock()
+ atomic.AddUint64(&s.stats.TotRollbackOpsDone, 1)
if rootPrev != nil {
_ = rootPrev.DecRef()
}
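Note: the introducer goroutine updates these counters while other goroutines may be reading them, so every update goes through sync/atomic rather than a plain increment. A minimal, runnable sketch of the pattern, using an illustrative two-field subset rather than the real Stats type from this commit:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// stats is an illustrative subset of the counters added in this commit.
type stats struct {
	TotIntroducedItems         uint64
	TotIntroducedBatchSegments uint64
}

func main() {
	var s stats
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ { // four concurrent "introductions"
		wg.Add(1)
		go func() {
			defer wg.Done()
			atomic.AddUint64(&s.TotIntroducedItems, 100) // e.g. a segment snapshot's Count()
			atomic.AddUint64(&s.TotIntroducedBatchSegments, 1)
		}()
	}
	wg.Wait()
	// Reads must be atomic too, to pair with the atomic writes.
	fmt.Println(atomic.LoadUint64(&s.TotIntroducedItems))         // 400
	fmt.Println(atomic.LoadUint64(&s.TotIntroducedBatchSegments)) // 4
}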

View File

@@ -129,6 +129,9 @@ func (s *Scorch) planMergeAtSnapshot(ourSnapshot *IndexSnapshot,
return nil
}
+ mip := uint64(len(resultMergePlan.Tasks))
+ atomic.AddUint64(&s.stats.CurInProgressFileMerges, mip)
// process tasks in serial for now
var notifications []chan *IndexSnapshot
for _, task := range resultMergePlan.Tasks {
@@ -165,6 +168,10 @@ func (s *Scorch) planMergeAtSnapshot(ourSnapshot *IndexSnapshot,
s.unmarkIneligibleForRemoval(filename)
return fmt.Errorf("merging failed: %v", err)
}
+ // update the count of file segments merged
+ atomic.AddUint64(&s.stats.TotMergedFileSegments, uint64(len(segmentsToMerge)))
segment, err = zap.Open(path)
if err != nil {
s.unmarkIneligibleForRemoval(filename)
@@ -193,16 +200,26 @@ func (s *Scorch) planMergeAtSnapshot(ourSnapshot *IndexSnapshot,
case s.merges <- sm:
}
}
+ atomic.AddUint64(&s.stats.CurInProgressFileMerges, ^uint64(mip-1))
+ var newSnapshot *IndexSnapshot
for _, notification := range notifications {
select {
case <-s.closeCh:
return nil
- case newSnapshot := <-notification:
+ case newSnapshot = <-notification:
if newSnapshot != nil {
_ = newSnapshot.DecRef()
}
}
}
+ // merge operation completed and the introduction is complete
+ if newSnapshot != nil {
+ atomic.AddUint64(&s.stats.TotFileMergeOpsDone, 1)
+ }
return nil
}
@@ -224,6 +241,8 @@ func (s *Scorch) mergeSegmentBases(snapshot *IndexSnapshot,
cr := zap.NewCountHashWriter(&br)
+ atomic.AddUint64(&s.stats.CurInProgressMemoryMerges, 1)
newDocNums, numDocs, storedIndexOffset, fieldsIndexOffset,
docValueOffset, dictLocs, fieldsInv, fieldsMap, err :=
zap.MergeToWriter(sbs, sbsDrops, chunkFactor, cr)
@@ -238,6 +257,9 @@ func (s *Scorch) mergeSegmentBases(snapshot *IndexSnapshot,
return 0, nil, 0, err
}
+ // update the count of in-memory merged segments
+ atomic.AddUint64(&s.stats.TotMergedMemorySegments, uint64(len(sbs)))
newSegmentID := atomic.AddUint64(&s.nextSegmentID, 1)
filename := zapFileName(newSegmentID)
@@ -252,6 +274,10 @@ func (s *Scorch) mergeSegmentBases(snapshot *IndexSnapshot,
return 0, nil, 0, err
}
+ // update persisted stats
+ atomic.AddUint64(&s.stats.TotPersistedItems, segment.Count())
+ atomic.AddUint64(&s.stats.TotPersistedSegments, 1)
sm := &segmentMerge{
id: newSegmentID,
old: make(map[uint64]*SegmentSnapshot),
@@ -273,10 +299,14 @@ func (s *Scorch) mergeSegmentBases(snapshot *IndexSnapshot,
case s.merges <- sm:
}
+ atomic.AddUint64(&s.stats.CurInProgressMemoryMerges, ^uint64(0))
select { // wait for introduction to complete
case <-s.closeCh:
return 0, nil, 0, nil // TODO: return ErrInterruptedClosed?
case newSnapshot := <-sm.notify:
+ // update counters on success
+ atomic.AddUint64(&s.stats.TotMemoryMergeOpsDone, 1)
return numDocs, newSnapshot, newSegmentID, nil
}
}
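Note: the CurInProgressFileMerges and CurInProgressMemoryMerges gauges above are decremented with AddUint64, because sync/atomic has no subtract: per the sync/atomic documentation, adding ^uint64(0) subtracts 1, and adding ^uint64(c-1) subtracts c, through two's-complement wraparound. A runnable sketch:

package main

import (
	"fmt"
	"sync/atomic"
)

func main() {
	var gauge uint64

	atomic.AddUint64(&gauge, 5)            // five merges started
	atomic.AddUint64(&gauge, ^uint64(0))   // one finished: subtract 1
	atomic.AddUint64(&gauge, ^uint64(4-1)) // four finished: subtract 4

	// ^uint64(c-1) equals -c in two's complement, so the unsigned
	// addition wraps around to an exact subtraction.
	fmt.Println(atomic.LoadUint64(&gauge)) // 0
}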

View File

@@ -169,6 +169,8 @@ func (s *Scorch) pausePersisterForMergerCatchUp(lastPersistedEpoch uint64, lastM
OUTER:
// check for a slow merger and wait until the merger catches up
for lastPersistedEpoch > *lastMergedEpoch+epochDistance {
+ // update the stat on each pause cycle
+ atomic.AddUint64(&s.stats.TotPersisterPause, 1)
select {
case <-s.closeCh:
@@ -412,7 +414,8 @@ func (s *Scorch) persistSnapshotDirect(snapshot *IndexSnapshot) (err error) {
newIndexSnapshot.segment[i] = newSegmentSnapshot
delete(newSegments, segmentSnapshot.id)
// update items persisted in case of a new segment snapshot
- atomic.AddUint64(&s.stats.numItemsPersisted, newSegmentSnapshot.Count())
+ atomic.AddUint64(&s.stats.TotPersistedItems, newSegmentSnapshot.Count())
+ atomic.AddUint64(&s.stats.TotPersistedSegments, 1)
} else {
newIndexSnapshot.segment[i] = s.root.segment[i]
newIndexSnapshot.segment[i].segment.AddRef()
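Note: TotPersisterPause counts each cycle of the pause loop above, i.e. how often the persister had to wait for a slow merger to close the epoch gap. A standalone sketch of that backpressure shape; the channel and variable names here are illustrative, not the actual scorch internals:

package main

import (
	"fmt"
	"sync/atomic"
)

func main() {
	const epochDistance = 2
	var totPersisterPause uint64

	persistedEpoch := uint64(10)
	mergedEpoch := uint64(5)

	// Stand-in for the merger's progress notifications.
	progress := make(chan uint64, 2)
	progress <- 7
	progress <- 9
	close(progress)

	// Pause, counting each cycle, while the persister is more than
	// epochDistance epochs ahead of the merger.
	for persistedEpoch > mergedEpoch+epochDistance {
		atomic.AddUint64(&totPersisterPause, 1)
		e, ok := <-progress
		if !ok {
			break
		}
		mergedEpoch = e
	}
	fmt.Println("pauses:", atomic.LoadUint64(&totPersisterPause)) // pauses: 2
}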

View File

@@ -111,6 +111,7 @@ func (s *Scorch) fireAsyncError(err error) {
if s.onAsyncError != nil {
s.onAsyncError(err)
}
+ atomic.AddUint64(&s.stats.TotOnErrors, 1)
}
func (s *Scorch) Open() error {
@@ -275,7 +276,7 @@ func (s *Scorch) Batch(batch *index.Batch) (err error) {
}
close(resultChan)
- atomic.AddUint64(&s.stats.analysisTime, uint64(time.Since(start)))
+ atomic.AddUint64(&s.stats.TotAnalysisTime, uint64(time.Since(start)))
// notify handlers that we're about to introduce a segment
s.fireEvent(EventKindBatchIntroductionStart, 0)
@@ -286,6 +287,8 @@ func (s *Scorch) Batch(batch *index.Batch) (err error) {
if err != nil {
return err
}
+ } else {
+ atomic.AddUint64(&s.stats.TotEmptyBatches, 1)
}
err = s.prepareSegment(newSegment, ids, batch.InternalOps)
@@ -293,12 +296,12 @@ func (s *Scorch) Batch(batch *index.Batch) (err error) {
if newSegment != nil {
_ = newSegment.Close()
}
- atomic.AddUint64(&s.stats.errors, 1)
+ atomic.AddUint64(&s.stats.TotOnErrors, 1)
} else {
- atomic.AddUint64(&s.stats.updates, numUpdates)
- atomic.AddUint64(&s.stats.deletes, numDeletes)
- atomic.AddUint64(&s.stats.batches, 1)
- atomic.AddUint64(&s.stats.numPlainTextBytesIndexed, numPlainTextBytes)
+ atomic.AddUint64(&s.stats.TotUpdates, numUpdates)
+ atomic.AddUint64(&s.stats.TotDeletes, numDeletes)
+ atomic.AddUint64(&s.stats.TotBatches, 1)
+ atomic.AddUint64(&s.stats.TotIndexedPlainTextBytes, numPlainTextBytes)
}
return err
}
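Note: TotAnalysisTime accumulates uint64(time.Since(start)), i.e. a nanosecond count, since time.Duration is itself an int64 of nanoseconds. A sketch of recording and then reporting such a timing stat, with a local variable standing in for the stats field:

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

func main() {
	var totAnalysisTime uint64 // nanoseconds, like stats.TotAnalysisTime

	start := time.Now()
	time.Sleep(10 * time.Millisecond) // stand-in for analysis work
	atomic.AddUint64(&totAnalysisTime, uint64(time.Since(start)))

	// Converting back to time.Duration gives human-readable output.
	fmt.Println(time.Duration(atomic.LoadUint64(&totAnalysisTime)))
}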

View File

@@ -372,7 +372,7 @@ func (i *IndexSnapshot) TermFieldReader(term []byte, field string, includeFreq,
rv.postings[i] = pl
rv.iterators[i] = pl.Iterator()
}
- atomic.AddUint64(&i.parent.stats.termSearchersStarted, uint64(1))
+ atomic.AddUint64(&i.parent.stats.TotTermSearchersStarted, uint64(1))
return rv, nil
}

View File

@@ -126,7 +126,7 @@ func (i *IndexSnapshotTermFieldReader) Count() uint64 {
func (i *IndexSnapshotTermFieldReader) Close() error {
if i.snapshot != nil {
- atomic.AddUint64(&i.snapshot.parent.stats.termSearchersFinished, uint64(1))
+ atomic.AddUint64(&i.snapshot.parent.stats.TotTermSearchersFinished, uint64(1))
}
return nil
}
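Note: term searchers are tracked as two monotonic counters rather than a single gauge; the number of currently open searchers is the difference between started and finished. A sketch of deriving that gauge (the helper is illustrative, not part of this commit):

package main

import (
	"fmt"
	"sync/atomic"
)

// openTermSearchers derives a gauge from the two counters. The two
// loads are not a single atomic snapshot, so the result can be briefly
// stale, but it is never a torn read.
func openTermSearchers(started, finished *uint64) uint64 {
	return atomic.LoadUint64(started) - atomic.LoadUint64(finished)
}

func main() {
	var started, finished uint64
	atomic.AddUint64(&started, 3)
	atomic.AddUint64(&finished, 1)
	fmt.Println(openTermSearchers(&started, &finished)) // 2
}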

View File

@@ -20,31 +20,78 @@ import (
"sync/atomic"
)
- // Stats tracks statistics about the index
+ // Stats tracks statistics about the index. Fields prefixed with
+ // CurXxxx are gauges (they can go up and down), and fields prefixed
+ // with TotXxxx are monotonically increasing counters.
type Stats struct {
- updates, deletes, batches, errors uint64
- analysisTime, indexTime uint64
- termSearchersStarted uint64
- termSearchersFinished uint64
- numPlainTextBytesIndexed uint64
- numItemsIntroduced uint64
- numItemsPersisted uint64
- i *Scorch
+ TotUpdates uint64
+ TotDeletes uint64
+ TotBatches uint64
+ TotEmptyBatches uint64
+ TotOnErrors uint64
+ TotAnalysisTime uint64
+ TotIndexTime uint64
+ TotIndexedPlainTextBytes uint64
+ TotIndexSnapshotBeg uint64
+ TotIndexSnapshotEnd uint64
+ TotIntroducedBatchSegments uint64
+ TotIntroducedMergeSegments uint64
+ TotIntroducedItems uint64
+ TotTermSearchersStarted uint64
+ TotTermSearchersFinished uint64
+ TotPersistedItems uint64
+ TotPersistedSegments uint64
+ TotPersisterPause uint64
+ TotMemoryMergeOpsDone uint64
+ TotMergedMemorySegments uint64
+ TotMergedFileSegments uint64
+ TotFileMergeOpsDone uint64
+ TotRollbackOpsDone uint64
+ CurInProgressMemoryMerges uint64
+ CurInProgressFileMerges uint64
+ CurMemoryBytes uint64
+ i *Scorch
}
func (s *Stats) statsMap() (map[string]interface{}, error) {
m := map[string]interface{}{}
m["updates"] = atomic.LoadUint64(&s.updates)
m["deletes"] = atomic.LoadUint64(&s.deletes)
m["batches"] = atomic.LoadUint64(&s.batches)
m["errors"] = atomic.LoadUint64(&s.errors)
m["analysis_time"] = atomic.LoadUint64(&s.analysisTime)
m["index_time"] = atomic.LoadUint64(&s.indexTime)
m["term_searchers_started"] = atomic.LoadUint64(&s.termSearchersStarted)
m["term_searchers_finished"] = atomic.LoadUint64(&s.termSearchersFinished)
m["num_plain_text_bytes_indexed"] = atomic.LoadUint64(&s.numPlainTextBytesIndexed)
m["num_items_introduced"] = atomic.LoadUint64(&s.numItemsIntroduced)
m["num_items_persisted"] = atomic.LoadUint64(&s.numItemsPersisted)
m["TotUpdates"] = atomic.LoadUint64(&s.TotUpdates)
m["TotDeletes"] = atomic.LoadUint64(&s.TotDeletes)
m["TotBatches"] = atomic.LoadUint64(&s.TotBatches)
m["TotEmptyBatches"] = atomic.LoadUint64(&s.TotEmptyBatches)
m["TotOnErrors"] = atomic.LoadUint64(&s.TotOnErrors)
m["TotAnalysisTime"] = atomic.LoadUint64(&s.TotAnalysisTime)
m["TotIndexSnapshotBeg"] = atomic.LoadUint64(&s.TotIndexSnapshotBeg)
m["TotIndexSnapshotEnd"] = atomic.LoadUint64(&s.TotIndexSnapshotEnd)
m["TotTermSearchersStarted"] = atomic.LoadUint64(&s.TotTermSearchersStarted)
m["TotTermSearchersFinished"] = atomic.LoadUint64(&s.TotTermSearchersFinished)
m["TotIndexedPlainTextBytes"] = atomic.LoadUint64(&s.TotIndexedPlainTextBytes)
m["TotIntroducedItems"] = atomic.LoadUint64(&s.TotIntroducedItems)
m["TotPersistedItems"] = atomic.LoadUint64(&s.TotPersistedItems)
m["TotMemoryMergeOpsDone"] = atomic.LoadUint64(&s.TotMemoryMergeOpsDone)
m["TotFileMergeOpsDone"] = atomic.LoadUint64(&s.TotFileMergeOpsDone)
m["TotIntroducedBatchSegments"] = atomic.LoadUint64(&s.TotIntroducedBatchSegments)
m["TotIntroducedMergeSegments"] = atomic.LoadUint64(&s.TotIntroducedMergeSegments)
m["TotPersistedSegments"] = atomic.LoadUint64(&s.TotPersistedSegments)
m["TotRollbackOpsDone"] = atomic.LoadUint64(&s.TotRollbackOpsDone)
m["CurInProgressFileMerges"] = atomic.LoadUint64(&s.CurInProgressFileMerges)
m["CurInProgressMemoryMerges"] = atomic.LoadUint64(&s.CurInProgressMemoryMerges)
m["TotPersisterPause"] = atomic.LoadUint64(&s.TotPersisterPause)
m["TotMergedMemorySegments"] = atomic.LoadUint64(&s.TotMergedMemorySegments)
m["TotMergedFileSegments"] = atomic.LoadUint64(&s.TotMergedFileSegments)
m["CurMemoryBytes"] = s.i.MemoryUsed()
if s.i.path != "" {
finfos, err := ioutil.ReadDir(s.i.path)
@@ -61,8 +108,8 @@ func (s *Stats) statsMap() (map[string]interface{}, error) {
}
}
m["num_bytes_used_disk"] = numBytesUsedDisk
m["num_files_on_disk"] = numFilesOnDisk
m["TotOnDiskBytes"] = numBytesUsedDisk
m["TotOnDiskFiles"] = numFilesOnDisk
}
return m, nil
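Note: statsMap returns a plain map[string]interface{}, so callers can hand it directly to a JSON encoder for reporting. A sketch, with a small map literal standing in for a real (*Stats).statsMap() result:

package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// Stand-in for the map returned by (*Stats).statsMap().
	m := map[string]interface{}{
		"TotUpdates":     uint64(42),
		"TotBatches":     uint64(7),
		"TotOnDiskBytes": uint64(1 << 20),
	}
	out, err := json.MarshalIndent(m, "", "  ")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
}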