Merge pull request #799 from steveyen/scorch-optimizations
more scorch optimizations
This commit is contained in:
commit
16174c589d
|
@ -179,9 +179,19 @@ func (s *Scorch) planMergeAtSnapshot(ourSnapshot *IndexSnapshot,
|
|||
filename := zapFileName(newSegmentID)
|
||||
s.markIneligibleForRemoval(filename)
|
||||
path := s.path + string(os.PathSeparator) + filename
|
||||
|
||||
fileMergeZapStartTime := time.Now()
|
||||
|
||||
atomic.AddUint64(&s.stats.TotFileMergeZapBeg, 1)
|
||||
newDocNums, err := zap.Merge(segmentsToMerge, docsToDrop, path, 1024)
|
||||
atomic.AddUint64(&s.stats.TotFileMergeZapEnd, 1)
|
||||
|
||||
fileMergeZapTime := uint64(time.Since(fileMergeZapStartTime))
|
||||
atomic.AddUint64(&s.stats.TotFileMergeZapTime, fileMergeZapTime)
|
||||
if atomic.LoadUint64(&s.stats.MaxFileMergeZapTime) < fileMergeZapTime {
|
||||
atomic.StoreUint64(&s.stats.MaxFileMergeZapTime, fileMergeZapTime)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
s.unmarkIneligibleForRemoval(filename)
|
||||
atomic.AddUint64(&s.stats.TotFileMergePlanTasksErr, 1)
|
||||
|
@ -258,11 +268,20 @@ func (s *Scorch) mergeSegmentBases(snapshot *IndexSnapshot,
|
|||
|
||||
cr := zap.NewCountHashWriter(&br)
|
||||
|
||||
memMergeZapStartTime := time.Now()
|
||||
|
||||
atomic.AddUint64(&s.stats.TotMemMergeZapBeg, 1)
|
||||
newDocNums, numDocs, storedIndexOffset, fieldsIndexOffset,
|
||||
docValueOffset, dictLocs, fieldsInv, fieldsMap, err :=
|
||||
zap.MergeToWriter(sbs, sbsDrops, chunkFactor, cr)
|
||||
atomic.AddUint64(&s.stats.TotMemMergeZapEnd, 1)
|
||||
|
||||
memMergeZapTime := uint64(time.Since(memMergeZapStartTime))
|
||||
atomic.AddUint64(&s.stats.TotMemMergeZapTime, memMergeZapTime)
|
||||
if atomic.LoadUint64(&s.stats.MaxMemMergeZapTime) < memMergeZapTime {
|
||||
atomic.StoreUint64(&s.stats.MaxMemMergeZapTime, memMergeZapTime)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
atomic.AddUint64(&s.stats.TotMemMergeErr, 1)
|
||||
return 0, nil, 0, err
|
||||
|
|
|
@ -365,7 +365,7 @@ func (s *Scorch) prepareSegment(newSegment segment.Segment, ids []string,
|
|||
introTime := uint64(time.Since(introStartTime))
|
||||
atomic.AddUint64(&s.stats.TotBatchIntroTime, introTime)
|
||||
if atomic.LoadUint64(&s.stats.MaxBatchIntroTime) < introTime {
|
||||
atomic.AddUint64(&s.stats.MaxBatchIntroTime, introTime)
|
||||
atomic.StoreUint64(&s.stats.MaxBatchIntroTime, introTime)
|
||||
}
|
||||
|
||||
return err
|
||||
|
|
|
@ -253,7 +253,7 @@ func (s *Segment) processDocument(result *index.AnalysisResult) {
|
|||
// now that its been rolled up into docMap, walk that
|
||||
for fieldID, tokenFrequencies := range docMap {
|
||||
dict := s.Dicts[fieldID]
|
||||
norm := float32(1.0/math.Sqrt(float64(fieldLens[fieldID])))
|
||||
norm := float32(1.0 / math.Sqrt(float64(fieldLens[fieldID])))
|
||||
for term, tokenFreq := range tokenFrequencies {
|
||||
pid := dict[term] - 1
|
||||
bs := s.Postings[pid]
|
||||
|
|
|
@ -33,12 +33,20 @@ type Dictionary struct {
|
|||
// PostingsList returns the postings list for the specified term
|
||||
func (d *Dictionary) PostingsList(term string,
|
||||
except *roaring.Bitmap) (segment.PostingsList, error) {
|
||||
return &PostingsList{
|
||||
dictionary: d,
|
||||
term: term,
|
||||
postingsID: d.segment.Dicts[d.fieldID][term],
|
||||
except: except,
|
||||
}, nil
|
||||
return d.InitPostingsList(term, except, nil)
|
||||
}
|
||||
|
||||
func (d *Dictionary) InitPostingsList(term string, except *roaring.Bitmap,
|
||||
prealloc *PostingsList) (*PostingsList, error) {
|
||||
rv := prealloc
|
||||
if rv == nil {
|
||||
rv = &PostingsList{}
|
||||
}
|
||||
rv.dictionary = d
|
||||
rv.term = term
|
||||
rv.postingsID = d.segment.Dicts[d.fieldID][term]
|
||||
rv.except = except
|
||||
return rv, nil
|
||||
}
|
||||
|
||||
// Iterator returns an iterator for this dictionary
|
||||
|
|
|
@ -46,9 +46,16 @@ func (p *PostingsList) Count() uint64 {
|
|||
|
||||
// Iterator returns an iterator for this postings list
|
||||
func (p *PostingsList) Iterator() segment.PostingsIterator {
|
||||
rv := &PostingsIterator{
|
||||
postings: p,
|
||||
return p.InitIterator(nil)
|
||||
}
|
||||
func (p *PostingsList) InitIterator(prealloc *PostingsIterator) *PostingsIterator {
|
||||
rv := prealloc
|
||||
if rv == nil {
|
||||
rv = &PostingsIterator{postings: p}
|
||||
} else {
|
||||
*rv = PostingsIterator{postings: p}
|
||||
}
|
||||
|
||||
if p.postingsID > 0 {
|
||||
allbits := p.dictionary.segment.Postings[p.postingsID-1]
|
||||
rv.locations = p.dictionary.segment.PostingsLocs[p.postingsID-1]
|
||||
|
|
|
@ -308,7 +308,7 @@ func persistStoredFieldValues(fieldID int,
|
|||
}
|
||||
|
||||
func persistPostingDetails(memSegment *mem.Segment, w *CountHashWriter, chunkFactor uint32) ([]uint64, []uint64, error) {
|
||||
var freqOffsets, locOfffsets []uint64
|
||||
freqOffsets := make([]uint64, 0, len(memSegment.Postings))
|
||||
tfEncoder := newChunkedIntCoder(uint64(chunkFactor), uint64(len(memSegment.Stored)-1))
|
||||
for postingID := range memSegment.Postings {
|
||||
if postingID != 0 {
|
||||
|
@ -351,6 +351,7 @@ func persistPostingDetails(memSegment *mem.Segment, w *CountHashWriter, chunkFac
|
|||
}
|
||||
|
||||
// now do it again for the locations
|
||||
locOffsets := make([]uint64, 0, len(memSegment.Postings))
|
||||
locEncoder := newChunkedIntCoder(uint64(chunkFactor), uint64(len(memSegment.Stored)-1))
|
||||
for postingID := range memSegment.Postings {
|
||||
if postingID != 0 {
|
||||
|
@ -367,7 +368,8 @@ func persistPostingDetails(memSegment *mem.Segment, w *CountHashWriter, chunkFac
|
|||
var locOffset int
|
||||
for postingsListItr.HasNext() {
|
||||
docNum := uint64(postingsListItr.Next())
|
||||
for i := 0; i < int(freqs[offset]); i++ {
|
||||
n := int(freqs[offset])
|
||||
for i := 0; i < n; i++ {
|
||||
if len(locfields) > 0 {
|
||||
// put field
|
||||
err := locEncoder.Add(docNum, uint64(locfields[locOffset]))
|
||||
|
@ -414,14 +416,15 @@ func persistPostingDetails(memSegment *mem.Segment, w *CountHashWriter, chunkFac
|
|||
}
|
||||
|
||||
// record where this postings loc info starts
|
||||
locOfffsets = append(locOfffsets, uint64(w.Count()))
|
||||
locOffsets = append(locOffsets, uint64(w.Count()))
|
||||
locEncoder.Close()
|
||||
_, err := locEncoder.Write(w)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
}
|
||||
return freqOffsets, locOfffsets, nil
|
||||
|
||||
return freqOffsets, locOffsets, nil
|
||||
}
|
||||
|
||||
func persistPostingsLocs(memSegment *mem.Segment, w *CountHashWriter) (rv []uint64, err error) {
|
||||
|
@ -532,6 +535,9 @@ func persistDocValues(memSegment *mem.Segment, w *CountHashWriter,
|
|||
fieldChunkOffsets := make(map[uint16]uint64, len(memSegment.FieldsInv))
|
||||
fdvEncoder := newChunkedContentCoder(uint64(chunkFactor), uint64(len(memSegment.Stored)-1))
|
||||
|
||||
var postings *mem.PostingsList
|
||||
var postingsItr *mem.PostingsIterator
|
||||
|
||||
for fieldID := range memSegment.DocValueFields {
|
||||
field := memSegment.FieldsInv[fieldID]
|
||||
docTermMap := make(map[uint64][]byte, 0)
|
||||
|
@ -543,12 +549,13 @@ func persistDocValues(memSegment *mem.Segment, w *CountHashWriter,
|
|||
dictItr := dict.Iterator()
|
||||
next, err := dictItr.Next()
|
||||
for err == nil && next != nil {
|
||||
postings, err1 := dict.PostingsList(next.Term, nil)
|
||||
var err1 error
|
||||
postings, err1 = dict.(*mem.Dictionary).InitPostingsList(next.Term, nil, postings)
|
||||
if err1 != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
postingsItr := postings.Iterator()
|
||||
postingsItr = postings.InitIterator(postingsItr)
|
||||
nextPosting, err2 := postingsItr.Next()
|
||||
for err2 == nil && nextPosting != nil {
|
||||
docNum := nextPosting.Number()
|
||||
|
@ -566,7 +573,7 @@ func persistDocValues(memSegment *mem.Segment, w *CountHashWriter,
|
|||
}
|
||||
|
||||
// sort wrt to docIDs
|
||||
var docNumbers docIDRange
|
||||
docNumbers := make(docIDRange, 0, len(docTermMap))
|
||||
for k := range docTermMap {
|
||||
docNumbers = append(docNumbers, k)
|
||||
}
|
||||
|
|
|
@ -88,8 +88,10 @@ type Stats struct {
|
|||
TotFileMergeSegmentsEmpty uint64
|
||||
TotFileMergeSegments uint64
|
||||
|
||||
TotFileMergeZapBeg uint64
|
||||
TotFileMergeZapEnd uint64
|
||||
TotFileMergeZapBeg uint64
|
||||
TotFileMergeZapEnd uint64
|
||||
TotFileMergeZapTime uint64
|
||||
MaxFileMergeZapTime uint64
|
||||
|
||||
TotFileMergeIntroductions uint64
|
||||
TotFileMergeIntroductionsDone uint64
|
||||
|
@ -99,6 +101,8 @@ type Stats struct {
|
|||
TotMemMergeDone uint64
|
||||
TotMemMergeZapBeg uint64
|
||||
TotMemMergeZapEnd uint64
|
||||
TotMemMergeZapTime uint64
|
||||
MaxMemMergeZapTime uint64
|
||||
TotMemMergeSegments uint64
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue