Merge pull request #830 from mschoch/avoid-app-herder-hot-lock
memoize the size of an entire index snapshot
This commit is contained in:
commit
9a87593fd7
|
@ -186,6 +186,7 @@ func (s *Scorch) introduceSegment(next *segmentIntroduction) error {
|
|||
}
|
||||
}
|
||||
|
||||
newSnapshot.updateSize()
|
||||
s.rootLock.Lock()
|
||||
if next.persisted != nil {
|
||||
s.rootPersisted = append(s.rootPersisted, next.persisted)
|
||||
|
@ -251,6 +252,7 @@ func (s *Scorch) introducePersist(persist *persistIntroduction) {
|
|||
newIndexSnapshot.internal[k] = v
|
||||
}
|
||||
|
||||
newIndexSnapshot.updateSize()
|
||||
s.rootLock.Lock()
|
||||
rootPrev := s.root
|
||||
s.root = newIndexSnapshot
|
||||
|
@ -348,6 +350,7 @@ func (s *Scorch) introduceMerge(nextMerge *segmentMerge) {
|
|||
|
||||
newSnapshot.AddRef() // 1 ref for the nextMerge.notify response
|
||||
|
||||
newSnapshot.updateSize()
|
||||
s.rootLock.Lock()
|
||||
// swap in new index snapshot
|
||||
newSnapshot.epoch = s.nextSnapshotEpoch
|
||||
|
@ -409,6 +412,7 @@ func (s *Scorch) revertToSnapshot(revertTo *snapshotReversion) error {
|
|||
s.rootPersisted = append(s.rootPersisted, revertTo.persisted)
|
||||
}
|
||||
|
||||
newSnapshot.updateSize()
|
||||
// swap in new snapshot
|
||||
rootPrev := s.root
|
||||
s.root = newSnapshot
|
||||
|
|
|
@ -397,11 +397,15 @@ func (s *Scorch) DeleteInternal(key []byte) error {
|
|||
// Reader returns a low-level accessor on the index data. Close it to
|
||||
// release associated resources.
|
||||
func (s *Scorch) Reader() (index.IndexReader, error) {
|
||||
return s.currentSnapshot(), nil
|
||||
}
|
||||
|
||||
func (s *Scorch) currentSnapshot() *IndexSnapshot {
|
||||
s.rootLock.RLock()
|
||||
rv := s.root
|
||||
rv.AddRef()
|
||||
s.rootLock.RUnlock()
|
||||
return rv, nil
|
||||
return rv
|
||||
}
|
||||
|
||||
func (s *Scorch) Stats() json.Marshaler {
|
||||
|
@ -484,20 +488,11 @@ func (s *Scorch) AddEligibleForRemoval(epoch uint64) {
|
|||
}
|
||||
|
||||
func (s *Scorch) MemoryUsed() uint64 {
|
||||
var memUsed int
|
||||
s.rootLock.RLock()
|
||||
if s.root != nil {
|
||||
for _, segmentSnapshot := range s.root.segment {
|
||||
memUsed += 8 /* size of id -> uint64 */ +
|
||||
segmentSnapshot.segment.Size()
|
||||
if segmentSnapshot.deleted != nil {
|
||||
memUsed += int(segmentSnapshot.deleted.GetSizeInBytes())
|
||||
}
|
||||
memUsed += segmentSnapshot.cachedDocs.size()
|
||||
}
|
||||
}
|
||||
s.rootLock.RUnlock()
|
||||
return uint64(memUsed)
|
||||
indexSnapshot := s.currentSnapshot()
|
||||
defer func() {
|
||||
_ = indexSnapshot.Close()
|
||||
}()
|
||||
return uint64(indexSnapshot.SizeFull())
|
||||
}
|
||||
|
||||
func (s *Scorch) markIneligibleForRemoval(filename string) {
|
||||
|
|
|
@ -138,6 +138,7 @@ func InitSegmentBase(mem []byte, memCRC uint32, chunkFactor uint32,
|
|||
dictLocs: dictLocs,
|
||||
fieldDvIterMap: make(map[uint16]*docValueIterator),
|
||||
}
|
||||
sb.updateSize()
|
||||
|
||||
err := sb.loadDvIterators()
|
||||
if err != nil {
|
||||
|
|
|
@ -63,6 +63,7 @@ func Open(path string) (segment.Segment, error) {
|
|||
path: path,
|
||||
refs: 1,
|
||||
}
|
||||
rv.SegmentBase.updateSize()
|
||||
|
||||
err = rv.loadConfig()
|
||||
if err != nil {
|
||||
|
@ -99,9 +100,14 @@ type SegmentBase struct {
|
|||
docValueOffset uint64
|
||||
dictLocs []uint64
|
||||
fieldDvIterMap map[uint16]*docValueIterator // naive chunk cache per field
|
||||
size uint64
|
||||
}
|
||||
|
||||
func (sb *SegmentBase) Size() int {
|
||||
return int(sb.size)
|
||||
}
|
||||
|
||||
func (sb *SegmentBase) updateSize() {
|
||||
sizeInBytes := reflectStaticSizeSegmentBase +
|
||||
len(sb.mem)
|
||||
|
||||
|
@ -124,7 +130,7 @@ func (sb *SegmentBase) Size() int {
|
|||
}
|
||||
}
|
||||
|
||||
return sizeInBytes
|
||||
sb.size = uint64(sizeInBytes)
|
||||
}
|
||||
|
||||
func (sb *SegmentBase) AddRef() {}
|
||||
|
|
|
@ -19,6 +19,7 @@ import (
|
|||
"container/heap"
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"sort"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
@ -40,12 +41,20 @@ type asynchSegmentResult struct {
|
|||
err error
|
||||
}
|
||||
|
||||
var reflectStaticSizeIndexSnapshot int
|
||||
|
||||
func init() {
|
||||
var is interface{} = IndexSnapshot{}
|
||||
reflectStaticSizeIndexSnapshot = int(reflect.TypeOf(is).Size())
|
||||
}
|
||||
|
||||
type IndexSnapshot struct {
|
||||
parent *Scorch
|
||||
segment []*SegmentSnapshot
|
||||
offsets []uint64
|
||||
internal map[string][]byte
|
||||
epoch uint64
|
||||
size uint64
|
||||
|
||||
m sync.Mutex // Protects the fields that follow.
|
||||
refs int64
|
||||
|
@ -89,6 +98,17 @@ func (i *IndexSnapshot) Close() error {
|
|||
return i.DecRef()
|
||||
}
|
||||
|
||||
func (i *IndexSnapshot) SizeFull() int {
|
||||
return int(i.size)
|
||||
}
|
||||
|
||||
func (i *IndexSnapshot) updateSize() {
|
||||
i.size += uint64(reflectStaticSizeIndexSnapshot)
|
||||
for _, s := range i.segment {
|
||||
i.size += uint64(s.Size())
|
||||
}
|
||||
}
|
||||
|
||||
func (i *IndexSnapshot) newIndexSnapshotFieldDict(field string, makeItr func(i segment.TermDictionary) segment.DictionaryIterator) (*IndexSnapshotFieldDict, error) {
|
||||
|
||||
results := make(chan *asynchSegmentResult)
|
||||
|
|
|
@ -19,6 +19,7 @@ import (
|
|||
|
||||
"github.com/RoaringBitmap/roaring"
|
||||
"github.com/blevesearch/bleve/index/scorch/segment"
|
||||
"github.com/blevesearch/bleve/size"
|
||||
)
|
||||
|
||||
var TermSeparator byte = 0xff
|
||||
|
@ -128,15 +129,26 @@ func (s *SegmentSnapshot) Fields() []string {
|
|||
return s.segment.Fields()
|
||||
}
|
||||
|
||||
func (s *SegmentSnapshot) Size() (rv int) {
|
||||
rv = s.segment.Size()
|
||||
if s.deleted != nil {
|
||||
rv += int(s.deleted.GetSizeInBytes())
|
||||
}
|
||||
rv += s.cachedDocs.Size()
|
||||
return
|
||||
}
|
||||
|
||||
type cachedFieldDocs struct {
|
||||
readyCh chan struct{} // closed when the cachedFieldDocs.docs is ready to be used.
|
||||
err error // Non-nil if there was an error when preparing this cachedFieldDocs.
|
||||
docs map[uint64][]byte // Keyed by localDocNum, value is a list of terms delimited by 0xFF.
|
||||
size uint64
|
||||
}
|
||||
|
||||
func (cfd *cachedFieldDocs) prepareFields(field string, ss *SegmentSnapshot) {
|
||||
defer close(cfd.readyCh)
|
||||
|
||||
cfd.size += uint64(size.SizeOfUint64) /* size field */
|
||||
dict, err := ss.segment.Dictionary(field)
|
||||
if err != nil {
|
||||
cfd.err = err
|
||||
|
@ -152,12 +164,14 @@ func (cfd *cachedFieldDocs) prepareFields(field string, ss *SegmentSnapshot) {
|
|||
return
|
||||
}
|
||||
|
||||
cfd.size += uint64(size.SizeOfUint64) /* map key */
|
||||
postingsItr := postings.Iterator()
|
||||
nextPosting, err2 := postingsItr.Next()
|
||||
for err2 == nil && nextPosting != nil {
|
||||
docNum := nextPosting.Number()
|
||||
cfd.docs[docNum] = append(cfd.docs[docNum], []byte(next.Term)...)
|
||||
cfd.docs[docNum] = append(cfd.docs[docNum], TermSeparator)
|
||||
cfd.size += uint64(len(next.Term) + 1) // map value
|
||||
nextPosting, err2 = postingsItr.Next()
|
||||
}
|
||||
|
||||
|
@ -178,6 +192,7 @@ func (cfd *cachedFieldDocs) prepareFields(field string, ss *SegmentSnapshot) {
|
|||
type cachedDocs struct {
|
||||
m sync.Mutex // As the cache is asynchronously prepared, need a lock
|
||||
cache map[string]*cachedFieldDocs // Keyed by field
|
||||
size uint64
|
||||
}
|
||||
|
||||
func (c *cachedDocs) prepareFields(wantedFields []string, ss *SegmentSnapshot) error {
|
||||
|
@ -208,14 +223,18 @@ func (c *cachedDocs) prepareFields(wantedFields []string, ss *SegmentSnapshot) e
|
|||
}
|
||||
c.m.Lock()
|
||||
}
|
||||
c.updateSizeLOCKED()
|
||||
|
||||
c.m.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *cachedDocs) size() int {
|
||||
func (c *cachedDocs) Size() int {
|
||||
return int(c.size)
|
||||
}
|
||||
|
||||
func (c *cachedDocs) updateSizeLOCKED() {
|
||||
sizeInBytes := 0
|
||||
c.m.Lock()
|
||||
for k, v := range c.cache { // cachedFieldDocs
|
||||
sizeInBytes += len(k)
|
||||
if v != nil {
|
||||
|
@ -224,6 +243,5 @@ func (c *cachedDocs) size() int {
|
|||
}
|
||||
}
|
||||
}
|
||||
c.m.Unlock()
|
||||
return sizeInBytes
|
||||
c.size = uint64(sizeInBytes)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue