Merge pull request #705 from abhinavdangeti/scorch
Scorch specific stats
This commit is contained in:
commit
a475ee886d
|
@ -16,6 +16,7 @@ package scorch
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/RoaringBitmap/roaring"
|
||||
"github.com/blevesearch/bleve/index/scorch/segment"
|
||||
|
@ -142,12 +143,17 @@ func (s *Scorch) introduceSegment(next *segmentIntroduction) error {
|
|||
}
|
||||
// append new segment, if any, to end of the new index snapshot
|
||||
if next.data != nil {
|
||||
newSnapshot.segment = append(newSnapshot.segment, &SegmentSnapshot{
|
||||
newSegmentSnapshot := &SegmentSnapshot{
|
||||
id: next.id,
|
||||
segment: next.data, // take ownership of next.data's ref-count
|
||||
cachedDocs: &cachedDocs{cache: nil},
|
||||
})
|
||||
}
|
||||
newSnapshot.segment = append(newSnapshot.segment, newSegmentSnapshot)
|
||||
newSnapshot.offsets = append(newSnapshot.offsets, running)
|
||||
|
||||
// increment numItemsIntroduced which tracks the number of items
|
||||
// queued for persistence.
|
||||
atomic.AddUint64(&s.stats.numItemsIntroduced, newSegmentSnapshot.Count())
|
||||
}
|
||||
// copy old values
|
||||
for key, oldVal := range s.root.internal {
|
||||
|
|
|
@ -24,6 +24,7 @@ import (
|
|||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/RoaringBitmap/roaring"
|
||||
"github.com/blevesearch/bleve/index/scorch/segment"
|
||||
|
@ -75,6 +76,7 @@ OUTER:
|
|||
_ = ourSnapshot.DecRef()
|
||||
continue OUTER
|
||||
}
|
||||
|
||||
lastPersistedEpoch = ourSnapshot.epoch
|
||||
for _, notifyCh := range notifyChs {
|
||||
close(notifyCh)
|
||||
|
@ -243,6 +245,8 @@ func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot) error {
|
|||
cachedDocs: segmentSnapshot.cachedDocs,
|
||||
}
|
||||
newIndexSnapshot.segment[i] = newSegmentSnapshot
|
||||
// update items persisted incase of a new segment snapshot
|
||||
atomic.AddUint64(&s.stats.numItemsPersisted, newSegmentSnapshot.Count())
|
||||
} else {
|
||||
newIndexSnapshot.segment[i] = s.root.segment[i]
|
||||
newIndexSnapshot.segment[i].segment.AddRef()
|
||||
|
|
|
@ -192,6 +192,7 @@ func (s *Scorch) Batch(batch *index.Batch) error {
|
|||
resultChan := make(chan *index.AnalysisResult, len(batch.IndexOps))
|
||||
|
||||
var numUpdates uint64
|
||||
var numDeletes uint64
|
||||
var numPlainTextBytes uint64
|
||||
var ids []string
|
||||
for docID, doc := range batch.IndexOps {
|
||||
|
@ -200,6 +201,8 @@ func (s *Scorch) Batch(batch *index.Batch) error {
|
|||
doc.AddField(document.NewTextFieldCustom("_id", nil, []byte(doc.ID), document.IndexField|document.StoreField, nil))
|
||||
numUpdates++
|
||||
numPlainTextBytes += doc.NumPlainTextBytes()
|
||||
} else {
|
||||
numDeletes++
|
||||
}
|
||||
ids = append(ids, docID)
|
||||
}
|
||||
|
@ -234,8 +237,16 @@ func (s *Scorch) Batch(batch *index.Batch) error {
|
|||
}
|
||||
|
||||
err := s.prepareSegment(newSegment, ids, batch.InternalOps)
|
||||
if err != nil && newSegment != nil {
|
||||
_ = newSegment.Close()
|
||||
if err != nil {
|
||||
if newSegment != nil {
|
||||
_ = newSegment.Close()
|
||||
}
|
||||
atomic.AddUint64(&s.stats.errors, 1)
|
||||
} else {
|
||||
atomic.AddUint64(&s.stats.updates, numUpdates)
|
||||
atomic.AddUint64(&s.stats.deletes, numDeletes)
|
||||
atomic.AddUint64(&s.stats.batches, 1)
|
||||
atomic.AddUint64(&s.stats.numPlainTextBytesIndexed, numPlainTextBytes)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@ import (
|
|||
"fmt"
|
||||
"sort"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/RoaringBitmap/roaring"
|
||||
"github.com/blevesearch/bleve/document"
|
||||
|
@ -363,6 +364,7 @@ func (i *IndexSnapshot) TermFieldReader(term []byte, field string, includeFreq,
|
|||
rv.postings[i] = pl
|
||||
rv.iterators[i] = pl.Iterator()
|
||||
}
|
||||
atomic.AddUint64(&i.parent.stats.termSearchersStarted, uint64(1))
|
||||
return rv, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -16,6 +16,7 @@ package scorch
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/blevesearch/bleve/index"
|
||||
"github.com/blevesearch/bleve/index/scorch/segment"
|
||||
|
@ -124,5 +125,8 @@ func (i *IndexSnapshotTermFieldReader) Count() uint64 {
|
|||
}
|
||||
|
||||
func (i *IndexSnapshotTermFieldReader) Close() error {
|
||||
if i.snapshot != nil {
|
||||
atomic.AddUint64(&i.snapshot.parent.stats.termSearchersFinished, uint64(1))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -21,21 +21,28 @@ import (
|
|||
|
||||
// Stats tracks statistics about the index
|
||||
type Stats struct {
|
||||
analysisTime, indexTime uint64
|
||||
updates, deletes, batches, errors uint64
|
||||
analysisTime, indexTime uint64
|
||||
termSearchersStarted uint64
|
||||
termSearchersFinished uint64
|
||||
numPlainTextBytesIndexed uint64
|
||||
numItemsIntroduced uint64
|
||||
numItemsPersisted uint64
|
||||
}
|
||||
|
||||
// FIXME wire up these other stats again
|
||||
func (s *Stats) statsMap() map[string]interface{} {
|
||||
m := map[string]interface{}{}
|
||||
// m["updates"] = atomic.LoadUint64(&i.updates)
|
||||
// m["deletes"] = atomic.LoadUint64(&i.deletes)
|
||||
// m["batches"] = atomic.LoadUint64(&i.batches)
|
||||
// m["errors"] = atomic.LoadUint64(&i.errors)
|
||||
m["updates"] = atomic.LoadUint64(&s.updates)
|
||||
m["deletes"] = atomic.LoadUint64(&s.deletes)
|
||||
m["batches"] = atomic.LoadUint64(&s.batches)
|
||||
m["errors"] = atomic.LoadUint64(&s.errors)
|
||||
m["analysis_time"] = atomic.LoadUint64(&s.analysisTime)
|
||||
m["index_time"] = atomic.LoadUint64(&s.indexTime)
|
||||
// m["term_searchers_started"] = atomic.LoadUint64(&i.termSearchersStarted)
|
||||
// m["term_searchers_finished"] = atomic.LoadUint64(&i.termSearchersFinished)
|
||||
// m["num_plain_text_bytes_indexed"] = atomic.LoadUint64(&i.numPlainTextBytesIndexed)
|
||||
m["term_searchers_started"] = atomic.LoadUint64(&s.termSearchersStarted)
|
||||
m["term_searchers_finished"] = atomic.LoadUint64(&s.termSearchersFinished)
|
||||
m["num_plain_text_bytes_indexed"] = atomic.LoadUint64(&s.numPlainTextBytesIndexed)
|
||||
m["num_items_introduced"] = atomic.LoadUint64(&s.numItemsIntroduced)
|
||||
m["num_items_persisted"] = atomic.LoadUint64(&s.numItemsPersisted)
|
||||
|
||||
return m
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue