abhinavdangeti 7e36109b3c MB-28162: Provide API to estimate memory needed to run a search query
This API (unexported) will estimate the amount of memory needed to execute
a search query over an index before the collector begins data collection.

Sample estimates for certain queries:
{Size: 10, BenchmarkUpsidedownSearchOverhead}
                                                           ESTIMATE    BENCHMEM
TermQuery                                                  4616        4796
MatchQuery                                                 5210        5405
DisjunctionQuery (Match queries)                           7700        8447
DisjunctionQuery (Term queries)                            6514        6591
ConjunctionQuery (Match queries)                           7524        8175
Nested disjunction query (disjunction of disjunctions)     10306       10708
2018-03-06 13:53:42 -08:00

506 lines
12 KiB

// Copyright (c) 2018 Couchbase, Inc.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package scorch
import (
const (
	// Name is the index type name under which NewScorch is registered.
	Name = "scorch"

	// Version is the version stamped onto every Scorch created by NewScorch.
	Version uint8 = 1
)

// ErrClosed is the error value reported for a Scorch that has been closed.
var ErrClosed = fmt.Errorf("scorch closed")
type Scorch struct {
readOnly bool
version uint8
config map[string]interface{}
analysisQueue *index.AnalysisQueue
stats Stats
nextSegmentID uint64
path string
unsafeBatch bool
rootLock sync.RWMutex
root *IndexSnapshot // holds 1 ref-count on the root
rootPersisted []chan error // closed when root is persisted
nextSnapshotEpoch uint64
eligibleForRemoval []uint64 // Index snapshot epochs that are safe to GC.
ineligibleForRemoval map[string]bool // Filenames that should not be GC'ed yet.
closeCh chan struct{}
introductions chan *segmentIntroduction
persists chan *persistIntroduction
merges chan *segmentMerge
introducerNotifier chan *epochWatcher
revertToSnapshots chan *snapshotReversion
persisterNotifier chan *epochWatcher
rootBolt *bolt.DB
asyncTasks sync.WaitGroup
onEvent func(event Event)
onAsyncError func(err error)
func NewScorch(storeName string,
config map[string]interface{},
analysisQueue *index.AnalysisQueue) (index.Index, error) {
rv := &Scorch{
version: Version,
config: config,
analysisQueue: analysisQueue,
nextSnapshotEpoch: 1,
closeCh: make(chan struct{}),
ineligibleForRemoval: map[string]bool{},
rv.root = &IndexSnapshot{parent: rv, refs: 1}
ro, ok := config["read_only"].(bool)
if ok {
rv.readOnly = ro
ub, ok := config["unsafe_batch"].(bool)
if ok {
rv.unsafeBatch = ub
ecbName, ok := config["eventCallbackName"].(string)
if ok {
rv.onEvent = RegistryEventCallbacks[ecbName]
aecbName, ok := config["asyncErrorCallbackName"].(string)
if ok {
rv.onAsyncError = RegistryAsyncErrorCallbacks[aecbName]
return rv, nil
func (s *Scorch) fireEvent(kind EventKind, dur time.Duration) {
if s.onEvent != nil {
s.onEvent(Event{Kind: kind, Scorch: s, Duration: dur})
func (s *Scorch) fireAsyncError(err error) {
if s.onAsyncError != nil {
atomic.AddUint64(&s.stats.TotOnErrors, 1)
func (s *Scorch) Open() error {
err := s.openBolt()
if err != nil {
return err
go s.mainLoop()
if !s.readOnly && s.path != "" {
go s.persisterLoop()
go s.mergerLoop()
return nil
func (s *Scorch) openBolt() error {
var ok bool
s.path, ok = s.config["path"].(string)
if !ok {
return fmt.Errorf("must specify path")
if s.path == "" {
s.unsafeBatch = true
var rootBoltOpt *bolt.Options
if s.readOnly {
rootBoltOpt = &bolt.Options{
ReadOnly: true,
} else {
if s.path != "" {
err := os.MkdirAll(s.path, 0700)
if err != nil {
return err
rootBoltPath := s.path + string(os.PathSeparator) + "root.bolt"
var err error
if s.path != "" {
s.rootBolt, err = bolt.Open(rootBoltPath, 0600, rootBoltOpt)
if err != nil {
return err
// now see if there is any existing state to load
err = s.loadFromBolt()
if err != nil {
_ = s.Close()
return err
s.introductions = make(chan *segmentIntroduction)
s.persists = make(chan *persistIntroduction)
s.merges = make(chan *segmentMerge)
s.introducerNotifier = make(chan *epochWatcher, 1)
s.revertToSnapshots = make(chan *snapshotReversion)
s.persisterNotifier = make(chan *epochWatcher, 1)
if !s.readOnly && s.path != "" {
err := s.removeOldZapFiles() // Before persister or merger create any new files.
if err != nil {
_ = s.Close()
return err
return nil
func (s *Scorch) Close() (err error) {
startTime := time.Now()
defer func() {
s.fireEvent(EventKindClose, time.Since(startTime))
s.fireEvent(EventKindCloseStart, 0)
// signal to async tasks we want to close
// wait for them to close
// now close the root bolt
if s.rootBolt != nil {
err = s.rootBolt.Close()
if s.root != nil {
_ = s.root.DecRef()
s.root = nil
func (s *Scorch) Update(doc *document.Document) error {
b := index.NewBatch()
return s.Batch(b)
func (s *Scorch) Delete(id string) error {
b := index.NewBatch()
return s.Batch(b)
// Batch applices a batch of changes to the index atomically
func (s *Scorch) Batch(batch *index.Batch) (err error) {
start := time.Now()
defer func() {
s.fireEvent(EventKindBatchIntroduction, time.Since(start))
resultChan := make(chan *index.AnalysisResult, len(batch.IndexOps))
var numUpdates uint64
var numDeletes uint64
var numPlainTextBytes uint64
var ids []string
for docID, doc := range batch.IndexOps {
if doc != nil {
// insert _id field
doc.AddField(document.NewTextFieldCustom("_id", nil, []byte(doc.ID), document.IndexField|document.StoreField, nil))
numPlainTextBytes += doc.NumPlainTextBytes()
} else {
ids = append(ids, docID)
// FIXME could sort ids list concurrent with analysis?
go func() {
for _, doc := range batch.IndexOps {
if doc != nil {
aw := index.NewAnalysisWork(s, doc, resultChan)
// put the work on the queue
// wait for analysis result
analysisResults := make([]*index.AnalysisResult, int(numUpdates))
var itemsDeQueued uint64
for itemsDeQueued < numUpdates {
result := <-resultChan
analysisResults[itemsDeQueued] = result
atomic.AddUint64(&s.stats.TotAnalysisTime, uint64(time.Since(start)))
indexStart := time.Now()
// notify handlers that we're about to introduce a segment
s.fireEvent(EventKindBatchIntroductionStart, 0)
var newSegment segment.Segment
if len(analysisResults) > 0 {
newSegment, err = zap.NewSegmentBase(mem.NewFromAnalyzedDocs(analysisResults), DefaultChunkFactor)
if err != nil {
return err
} else {
atomic.AddUint64(&s.stats.TotBatchesEmpty, 1)
err = s.prepareSegment(newSegment, ids, batch.InternalOps)
if err != nil {
if newSegment != nil {
_ = newSegment.Close()
atomic.AddUint64(&s.stats.TotOnErrors, 1)
} else {
atomic.AddUint64(&s.stats.TotUpdates, numUpdates)
atomic.AddUint64(&s.stats.TotDeletes, numDeletes)
atomic.AddUint64(&s.stats.TotBatches, 1)
atomic.AddUint64(&s.stats.TotIndexedPlainTextBytes, numPlainTextBytes)
atomic.AddUint64(&s.stats.TotIndexTime, uint64(time.Since(indexStart)))
return err
func (s *Scorch) prepareSegment(newSegment segment.Segment, ids []string,
internalOps map[string][]byte) error {
// new introduction
introduction := &segmentIntroduction{
id: atomic.AddUint64(&s.nextSegmentID, 1),
data: newSegment,
ids: ids,
obsoletes: make(map[uint64]*roaring.Bitmap),
internal: internalOps,
applied: make(chan error),
if !s.unsafeBatch {
introduction.persisted = make(chan error, 1)
// optimistically prepare obsoletes outside of rootLock
root := s.root
for _, seg := range root.segment {
delta, err := seg.segment.DocNumbers(ids)
if err != nil {
return err
introduction.obsoletes[seg.id] = delta
_ = root.DecRef()
introStartTime := time.Now()
s.introductions <- introduction
// block until this segment is applied
err := <-introduction.applied
if err != nil {
return err
if introduction.persisted != nil {
err = <-introduction.persisted
introTime := uint64(time.Since(introStartTime))
atomic.AddUint64(&s.stats.TotBatchIntroTime, introTime)
if atomic.LoadUint64(&s.stats.MaxBatchIntroTime) < introTime {
atomic.StoreUint64(&s.stats.MaxBatchIntroTime, introTime)
return err
func (s *Scorch) SetInternal(key, val []byte) error {
b := index.NewBatch()
b.SetInternal(key, val)
return s.Batch(b)
func (s *Scorch) DeleteInternal(key []byte) error {
b := index.NewBatch()
return s.Batch(b)
// Reader returns a low-level accessor on the index data. Close it to
// release associated resources.
func (s *Scorch) Reader() (index.IndexReader, error) {
rv := s.root
return rv, nil
func (s *Scorch) Stats() json.Marshaler {
return &s.stats
func (s *Scorch) StatsMap() map[string]interface{} {
m := s.stats.ToMap()
if s.path != "" {
finfos, err := ioutil.ReadDir(s.path)
if err == nil {
var numFilesOnDisk, numBytesUsedDisk uint64
for _, finfo := range finfos {
if !finfo.IsDir() {
numBytesUsedDisk += uint64(finfo.Size())
m["CurOnDiskBytes"] = numBytesUsedDisk
m["CurOnDiskFiles"] = numFilesOnDisk
// TODO: consider one day removing these backwards compatible
// names for apps using the old names
m["updates"] = m["TotUpdates"]
m["deletes"] = m["TotDeletes"]
m["batches"] = m["TotBatches"]
m["errors"] = m["TotOnErrors"]
m["analysis_time"] = m["TotAnalysisTime"]
m["index_time"] = m["TotIndexTime"]
m["term_searchers_started"] = m["TotTermSearchersStarted"]
m["term_searchers_finished"] = m["TotTermSearchersFinished"]
m["num_plain_text_bytes_indexed"] = m["TotIndexedPlainTextBytes"]
m["num_items_introduced"] = m["TotIntroducedItems"]
m["num_items_persisted"] = m["TotPersistedItems"]
m["num_bytes_used_disk"] = m["CurOnDiskBytes"]
m["num_files_on_disk"] = m["CurOnDiskFiles"]
return m
func (s *Scorch) Analyze(d *document.Document) *index.AnalysisResult {
rv := &index.AnalysisResult{
Document: d,
Analyzed: make([]analysis.TokenFrequencies, len(d.Fields)+len(d.CompositeFields)),
Length: make([]int, len(d.Fields)+len(d.CompositeFields)),
for i, field := range d.Fields {
if field.Options().IsIndexed() {
fieldLength, tokenFreqs := field.Analyze()
rv.Analyzed[i] = tokenFreqs
rv.Length[i] = fieldLength
if len(d.CompositeFields) > 0 {
// see if any of the composite fields need this
for _, compositeField := range d.CompositeFields {
compositeField.Compose(field.Name(), fieldLength, tokenFreqs)
return rv
func (s *Scorch) Advanced() (store.KVStore, error) {
return nil, nil
func (s *Scorch) AddEligibleForRemoval(epoch uint64) {
if s.root == nil || s.root.epoch != epoch {
s.eligibleForRemoval = append(s.eligibleForRemoval, epoch)
// MemoryUsed returns an estimate of the memory held by the segments of
// the current root snapshot. For each segment it counts 8 bytes for the
// segment id, the GetSizeInBytes() of its deleted-docs structure (when
// present), and its cachedDocs.size() (assumed to be in bytes — confirm).
//
// NOTE(review): s.root and its segments are read without rootLock;
// confirm this cannot race with a concurrent root swap.
func (s *Scorch) MemoryUsed() uint64 {
var memUsed int
if s.root != nil {
for _, segmentSnapshot := range s.root.segment {
// NOTE(review): the statement below ends in `+`, but its right-hand
// operand (presumably the segment's own size) is missing here. As
// shown it does not compile; restore the missing term.
memUsed += 8 /* size of id -> uint64 */ +
if segmentSnapshot.deleted != nil {
memUsed += int(segmentSnapshot.deleted.GetSizeInBytes())
memUsed += segmentSnapshot.cachedDocs.size()
return uint64(memUsed)
func (s *Scorch) markIneligibleForRemoval(filename string) {
s.ineligibleForRemoval[filename] = true
func (s *Scorch) unmarkIneligibleForRemoval(filename string) {
delete(s.ineligibleForRemoval, filename)
func init() {
registry.RegisterIndexType(Name, NewScorch)