
Merge pull request #734 from steveyen/master

scorch mem segment optimizations
Steve Yen 2018-01-16 08:57:02 -08:00 committed by GitHub
commit f4c3f984a4
3 changed files with 167 additions and 93 deletions


@@ -24,13 +24,16 @@ import (
"github.com/blevesearch/bleve/index"
)
// NewFromAnalyzedDocs places the analyzed document mutations into this segment
// NewFromAnalyzedDocs places the analyzed document mutations into a new segment
func NewFromAnalyzedDocs(results []*index.AnalysisResult) *Segment {
s := New()
// ensure that the _id field gets fieldID 0
s.getOrDefineField("_id")
// fill Dicts/DictKeys and preallocate memory
s.initializeDict(results)
// walk each doc
for _, result := range results {
s.processDocument(result)
@@ -82,10 +85,117 @@ func NewFromAnalyzedDocs(results []*index.AnalysisResult) *Segment {
return s
}
// fill Dicts/DictKeys and preallocate memory for postings
func (s *Segment) initializeDict(results []*index.AnalysisResult) {
var numPostingsLists int
numTermsPerPostingsList := make([]int, 0, 64) // Keyed by postings list id.
numLocsPerPostingsList := make([]int, 0, 64) // Keyed by postings list id.
var numTokenFrequencies int
var totLocs int
processField := func(fieldID uint16, tfs analysis.TokenFrequencies) {
for term, tf := range tfs {
pidPlus1, exists := s.Dicts[fieldID][term]
if !exists {
numPostingsLists++
pidPlus1 = uint64(numPostingsLists)
s.Dicts[fieldID][term] = pidPlus1
s.DictKeys[fieldID] = append(s.DictKeys[fieldID], term)
numTermsPerPostingsList = append(numTermsPerPostingsList, 0)
numLocsPerPostingsList = append(numLocsPerPostingsList, 0)
}
pid := pidPlus1 - 1
numTermsPerPostingsList[pid] += 1
numLocsPerPostingsList[pid] += len(tf.Locations)
totLocs += len(tf.Locations)
}
numTokenFrequencies += len(tfs)
}
for _, result := range results {
// walk each composite field
for _, field := range result.Document.CompositeFields {
fieldID := uint16(s.getOrDefineField(field.Name()))
_, tf := field.Analyze()
processField(fieldID, tf)
}
// walk each field
for i, field := range result.Document.Fields {
fieldID := uint16(s.getOrDefineField(field.Name()))
tf := result.Analyzed[i]
processField(fieldID, tf)
}
}
s.Postings = make([]*roaring.Bitmap, numPostingsLists)
for i := 0; i < numPostingsLists; i++ {
s.Postings[i] = roaring.New()
}
s.PostingsLocs = make([]*roaring.Bitmap, numPostingsLists)
for i := 0; i < numPostingsLists; i++ {
s.PostingsLocs[i] = roaring.New()
}
// Preallocate big, contiguous backing arrays.
auint64Backing := make([][]uint64, numPostingsLists*4+totLocs) // For Freqs, Locstarts, Locends, Locpos, sub-Locarraypos.
uint64Backing := make([]uint64, numTokenFrequencies+totLocs*3) // For sub-Freqs, sub-Locstarts, sub-Locends, sub-Locpos.
float32Backing := make([]float32, numTokenFrequencies) // For sub-Norms.
uint16Backing := make([]uint16, totLocs) // For sub-Locfields.
// Point top-level slices to the backing arrays.
s.Freqs = auint64Backing[0:numPostingsLists]
auint64Backing = auint64Backing[numPostingsLists:]
s.Norms = make([][]float32, numPostingsLists)
s.Locfields = make([][]uint16, numPostingsLists)
s.Locstarts = auint64Backing[0:numPostingsLists]
auint64Backing = auint64Backing[numPostingsLists:]
s.Locends = auint64Backing[0:numPostingsLists]
auint64Backing = auint64Backing[numPostingsLists:]
s.Locpos = auint64Backing[0:numPostingsLists]
auint64Backing = auint64Backing[numPostingsLists:]
s.Locarraypos = make([][][]uint64, numPostingsLists)
// Point sub-slices to the backing arrays.
for pid, numTerms := range numTermsPerPostingsList {
s.Freqs[pid] = uint64Backing[0:0]
uint64Backing = uint64Backing[numTerms:]
s.Norms[pid] = float32Backing[0:0]
float32Backing = float32Backing[numTerms:]
}
for pid, numLocs := range numLocsPerPostingsList {
s.Locfields[pid] = uint16Backing[0:0]
uint16Backing = uint16Backing[numLocs:]
s.Locstarts[pid] = uint64Backing[0:0]
uint64Backing = uint64Backing[numLocs:]
s.Locends[pid] = uint64Backing[0:0]
uint64Backing = uint64Backing[numLocs:]
s.Locpos[pid] = uint64Backing[0:0]
uint64Backing = uint64Backing[numLocs:]
s.Locarraypos[pid] = auint64Backing[0:0]
auint64Backing = auint64Backing[numLocs:]
}
}
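
The payoff of the two-pass design shows here: initializeDict first counts terms and locations, then carves per-posting sub-slices out of a handful of large allocations instead of growing each slice separately. A minimal, standalone sketch of that idiom (the counts and names are illustrative, not from the commit):

package main

import "fmt"

func main() {
	// terms per postings list, gathered in a counting pass
	counts := []int{3, 1, 2}
	total := 0
	for _, c := range counts {
		total += c
	}
	backing := make([]uint64, total) // one allocation backs every list
	freqs := make([][]uint64, len(counts))
	for pid, c := range counts {
		// len 0 now; the next c appends fill backing in place. Appending
		// more than c times would spill into the next list's region, so
		// the fill pass must perform exactly c appends per list.
		freqs[pid] = backing[0:0]
		backing = backing[c:]
	}
	freqs[0] = append(freqs[0], 7, 8, 9)
	fmt.Println(freqs[0]) // [7 8 9]
}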
func (s *Segment) processDocument(result *index.AnalysisResult) {
// used to collate information across fields
docMap := map[uint16]analysis.TokenFrequencies{}
fieldLens := map[uint16]int{}
docMap := make(map[uint16]analysis.TokenFrequencies, len(s.FieldsMap))
fieldLens := make(map[uint16]int, len(s.FieldsMap))
docNum := uint64(s.addDocument())
processField := func(field uint16, name string, l int, tf analysis.TokenFrequencies) {
@@ -128,81 +238,27 @@ func (s *Segment) processDocument(result *index.AnalysisResult) {
// now that it's been rolled up into docMap, walk that
for fieldID, tokenFrequencies := range docMap {
for term, tokenFreq := range tokenFrequencies {
fieldTermPostings := s.Dicts[fieldID][term]
// FIXME this if/else block has duplicate code that has resulted in
// bugs fixed/missed more than once; it needs a refactor
if fieldTermPostings == 0 {
// need to build new posting
bs := roaring.New()
bs.AddInt(int(docNum))
newPostingID := uint64(len(s.Postings) + 1)
// add this new bitset to the postings slice
s.Postings = append(s.Postings, bs)
locationBS := roaring.New()
s.PostingsLocs = append(s.PostingsLocs, locationBS)
// add this to the details slice
s.Freqs = append(s.Freqs, []uint64{uint64(tokenFreq.Frequency())})
s.Norms = append(s.Norms, []float32{float32(1.0 / math.Sqrt(float64(fieldLens[fieldID])))})
// add to locations
var locfields []uint16
var locstarts []uint64
var locends []uint64
var locpos []uint64
var locarraypos [][]uint64
if len(tokenFreq.Locations) > 0 {
locationBS.AddInt(int(docNum))
}
pid := s.Dicts[fieldID][term] - 1
bs := s.Postings[pid]
bs.AddInt(int(docNum))
s.Freqs[pid] = append(s.Freqs[pid], uint64(tokenFreq.Frequency()))
s.Norms[pid] = append(s.Norms[pid], float32(1.0/math.Sqrt(float64(fieldLens[fieldID]))))
locationBS := s.PostingsLocs[pid]
if len(tokenFreq.Locations) > 0 {
locationBS.AddInt(int(docNum))
for _, loc := range tokenFreq.Locations {
var locf = fieldID
if loc.Field != "" {
locf = uint16(s.getOrDefineField(loc.Field))
}
locfields = append(locfields, locf)
locstarts = append(locstarts, uint64(loc.Start))
locends = append(locends, uint64(loc.End))
locpos = append(locpos, uint64(loc.Position))
s.Locfields[pid] = append(s.Locfields[pid], locf)
s.Locstarts[pid] = append(s.Locstarts[pid], uint64(loc.Start))
s.Locends[pid] = append(s.Locends[pid], uint64(loc.End))
s.Locpos[pid] = append(s.Locpos[pid], uint64(loc.Position))
if len(loc.ArrayPositions) > 0 {
locarraypos = append(locarraypos, loc.ArrayPositions)
s.Locarraypos[pid] = append(s.Locarraypos[pid], loc.ArrayPositions)
} else {
locarraypos = append(locarraypos, nil)
}
}
s.Locfields = append(s.Locfields, locfields)
s.Locstarts = append(s.Locstarts, locstarts)
s.Locends = append(s.Locends, locends)
s.Locpos = append(s.Locpos, locpos)
s.Locarraypos = append(s.Locarraypos, locarraypos)
// record it
s.Dicts[fieldID][term] = newPostingID
// this term was new for this field, add it to dictKeys
s.DictKeys[fieldID] = append(s.DictKeys[fieldID], term)
} else {
// posting already started for this field/term
// the actual offset is -1, because 0 is the zero value
bs := s.Postings[fieldTermPostings-1]
bs.AddInt(int(docNum))
locationBS := s.PostingsLocs[fieldTermPostings-1]
s.Freqs[fieldTermPostings-1] = append(s.Freqs[fieldTermPostings-1], uint64(tokenFreq.Frequency()))
s.Norms[fieldTermPostings-1] = append(s.Norms[fieldTermPostings-1], float32(1.0/math.Sqrt(float64(fieldLens[fieldID]))))
if len(tokenFreq.Locations) > 0 {
locationBS.AddInt(int(docNum))
}
for _, loc := range tokenFreq.Locations {
var locf = fieldID
if loc.Field != "" {
locf = uint16(s.getOrDefineField(loc.Field))
}
s.Locfields[fieldTermPostings-1] = append(s.Locfields[fieldTermPostings-1], locf)
s.Locstarts[fieldTermPostings-1] = append(s.Locstarts[fieldTermPostings-1], uint64(loc.Start))
s.Locends[fieldTermPostings-1] = append(s.Locends[fieldTermPostings-1], uint64(loc.End))
s.Locpos[fieldTermPostings-1] = append(s.Locpos[fieldTermPostings-1], uint64(loc.Position))
if len(loc.ArrayPositions) > 0 {
s.Locarraypos[fieldTermPostings-1] = append(s.Locarraypos[fieldTermPostings-1], loc.ArrayPositions)
} else {
s.Locarraypos[fieldTermPostings-1] = append(s.Locarraypos[fieldTermPostings-1], nil)
s.Locarraypos[pid] = append(s.Locarraypos[pid], nil)
}
}
}
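
The collapsed branch relies on the "postings list id + 1" convention flagged in the FIXME above: a Go map returns the zero value for a missing key, so storing id+1 lets 0 mean "no postings list yet", and once initializeDict has pre-registered every term, processDocument can unconditionally compute pid := s.Dicts[fieldID][term] - 1. A standalone sketch of the convention (names are illustrative, not from the commit):

package main

import "fmt"

func main() {
	// dict maps term -> postings list id + 1. The map's zero value (0)
	// doubles as "term not seen yet", with no separate ok check.
	dict := map[string]uint64{}
	var numLists uint64
	define := func(term string) uint64 {
		idPlus1 := dict[term]
		if idPlus1 == 0 {
			numLists++
			idPlus1 = numLists
			dict[term] = idPlus1
		}
		return idPlus1 - 1 // the real postings list id
	}
	fmt.Println(define("apple"), define("pear"), define("apple")) // prints 0 1 0
}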


@@ -46,7 +46,7 @@ type Segment struct {
FieldsInv []string
// term dictionary
// field id -> term -> posting id + 1
// field id -> term -> postings list id + 1
Dicts []map[string]uint64
// term dictionary keys
@@ -54,7 +54,7 @@ type Segment struct {
DictKeys [][]string
// Postings list
// Postings list id -> Postings bitmap
// postings list id -> Postings bitmap
Postings []*roaring.Bitmap
// Postings List has locations
@@ -188,9 +188,11 @@ func (s *Segment) VisitDocument(num uint64, visitor segment.DocumentFieldValueVi
return nil
}
docFields := s.Stored[int(num)]
st := s.StoredTypes[int(num)]
sp := s.StoredPos[int(num)]
for field, values := range docFields {
for i, value := range values {
keepGoing := visitor(s.FieldsInv[field], s.StoredTypes[int(num)][field][i], value, s.StoredPos[int(num)][field][i])
keepGoing := visitor(s.FieldsInv[field], st[field][i], value, sp[field][i])
if !keepGoing {
return nil
}
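
The VisitDocument change hoists the per-document StoredTypes/StoredPos lookups out of the nested loops. For context, a hedged usage sketch of the visitor contract, assuming the segment.DocumentFieldValueVisitor signature of this era of bleve; printID is a hypothetical helper, not part of the commit:

package example

import (
	"fmt"

	"github.com/blevesearch/bleve/index/scorch/segment"
)

// printID stops at the first _id value: returning false from the visitor
// ends VisitDocument early, which is why the loop above re-checks
// keepGoing after every value.
func printID(seg segment.Segment, docNum uint64) error {
	return seg.VisitDocument(docNum, func(field string, typ byte, value []byte, pos []uint64) bool {
		if field == "_id" {
			fmt.Printf("doc %d has _id %s\n", docNum, value)
			return false // found it; stop visiting
		}
		return true
	})
}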


@@ -140,12 +140,18 @@ func persistStored(memSegment *mem.Segment, w *CountHashWriter) (uint64, error)
metaEncoder := govarint.NewU64Base128Encoder(&metaBuf)
st := memSegment.StoredTypes[docNum]
sp := memSegment.StoredPos[docNum]
// encode fields in order
for fieldID := range memSegment.FieldsInv {
if storedFieldValues, ok := storedValues[uint16(fieldID)]; ok {
// has stored values for this field
num := len(storedFieldValues)
stf := st[uint16(fieldID)]
spf := sp[uint16(fieldID)]
// process each value
for i := 0; i < num; i++ {
// encode field
@@ -154,7 +160,7 @@ func persistStored(memSegment *mem.Segment, w *CountHashWriter) (uint64, error)
return 0, err2
}
// encode type
_, err2 = metaEncoder.PutU64(uint64(memSegment.StoredTypes[docNum][uint16(fieldID)][i]))
_, err2 = metaEncoder.PutU64(uint64(stf[i]))
if err2 != nil {
return 0, err2
}
@@ -169,13 +175,13 @@ func persistStored(memSegment *mem.Segment, w *CountHashWriter) (uint64, error)
return 0, err2
}
// encode number of array pos
_, err2 = metaEncoder.PutU64(uint64(len(memSegment.StoredPos[docNum][uint16(fieldID)][i])))
_, err2 = metaEncoder.PutU64(uint64(len(spf[i])))
if err2 != nil {
return 0, err2
}
// encode all array positions
for j := 0; j < len(memSegment.StoredPos[docNum][uint16(fieldID)][i]); j++ {
_, err2 = metaEncoder.PutU64(memSegment.StoredPos[docNum][uint16(fieldID)][i][j])
for _, pos := range spf[i] {
_, err2 = metaEncoder.PutU64(pos)
if err2 != nil {
return 0, err2
}
@@ -235,6 +241,8 @@ func persistPostingDetails(memSegment *mem.Segment, w *CountHashWriter, chunkFac
if postingID != 0 {
tfEncoder.Reset()
}
freqs := memSegment.Freqs[postingID]
norms := memSegment.Norms[postingID]
postingsListItr := memSegment.Postings[postingID].Iterator()
var offset int
for postingsListItr.HasNext() {
@@ -242,13 +250,13 @@ func persistPostingDetails(memSegment *mem.Segment, w *CountHashWriter, chunkFac
docNum := uint64(postingsListItr.Next())
// put freq
err := tfEncoder.Add(docNum, memSegment.Freqs[postingID][offset])
err := tfEncoder.Add(docNum, freqs[offset])
if err != nil {
return nil, nil, err
}
// put norm
norm := memSegment.Norms[postingID][offset]
norm := norms[offset]
normBits := math.Float32bits(norm)
err = tfEncoder.Add(docNum, uint64(normBits))
if err != nil {
@@ -275,40 +283,46 @@ func persistPostingDetails(memSegment *mem.Segment, w *CountHashWriter, chunkFac
if postingID != 0 {
locEncoder.Reset()
}
freqs := memSegment.Freqs[postingID]
locfields := memSegment.Locfields[postingID]
locpos := memSegment.Locpos[postingID]
locstarts := memSegment.Locstarts[postingID]
locends := memSegment.Locends[postingID]
locarraypos := memSegment.Locarraypos[postingID]
postingsListItr := memSegment.Postings[postingID].Iterator()
var offset int
var locOffset int
for postingsListItr.HasNext() {
docNum := uint64(postingsListItr.Next())
for i := 0; i < int(memSegment.Freqs[postingID][offset]); i++ {
if len(memSegment.Locfields[postingID]) > 0 {
for i := 0; i < int(freqs[offset]); i++ {
if len(locfields) > 0 {
// put field
err := locEncoder.Add(docNum, uint64(memSegment.Locfields[postingID][locOffset]))
err := locEncoder.Add(docNum, uint64(locfields[locOffset]))
if err != nil {
return nil, nil, err
}
// put pos
err = locEncoder.Add(docNum, memSegment.Locpos[postingID][locOffset])
err = locEncoder.Add(docNum, locpos[locOffset])
if err != nil {
return nil, nil, err
}
// put start
err = locEncoder.Add(docNum, memSegment.Locstarts[postingID][locOffset])
err = locEncoder.Add(docNum, locstarts[locOffset])
if err != nil {
return nil, nil, err
}
// put end
err = locEncoder.Add(docNum, memSegment.Locends[postingID][locOffset])
err = locEncoder.Add(docNum, locends[locOffset])
if err != nil {
return nil, nil, err
}
// put array positions
num := len(memSegment.Locarraypos[postingID][locOffset])
num := len(locarraypos[locOffset])
// put the number of array positions to follow
err = locEncoder.Add(docNum, uint64(num))
@@ -317,8 +331,8 @@ func persistPostingDetails(memSegment *mem.Segment, w *CountHashWriter, chunkFac
}
// put each array position
for j := 0; j < num; j++ {
err = locEncoder.Add(docNum, memSegment.Locarraypos[postingID][locOffset][j])
for _, pos := range locarraypos[locOffset] {
err = locEncoder.Add(docNum, pos)
if err != nil {
return nil, nil, err
}
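
Most edits in this function follow one pattern: hoist the loop-invariant memSegment.X[postingID] lookup into a local before iterating, so each encoded value costs a single slice index instead of two. A minimal illustration of the pattern with made-up data:

package main

import "fmt"

func main() {
	rows := [][]uint64{{1, 2, 3}, {4, 5}}
	for i := range rows {
		row := rows[i] // hoist the invariant outer lookup once per row
		var sum uint64
		for j := range row {
			sum += row[j] // the inner loop indexes one slice, not two
		}
		fmt.Println(sum)
	}
}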
@@ -341,6 +355,7 @@ func persistPostingDetails(memSegment *mem.Segment, w *CountHashWriter, chunkFac
}
func persistPostingsLocs(memSegment *mem.Segment, w *CountHashWriter) (rv []uint64, err error) {
rv = make([]uint64, 0, len(memSegment.PostingsLocs))
for postingID := range memSegment.PostingsLocs {
// record where we start this posting loc
rv = append(rv, uint64(w.Count()))
@@ -355,6 +370,7 @@ func persistPostingsLocs(memSegment *mem.Segment, w *CountHashWriter) (rv []uint
func persistPostingsLists(memSegment *mem.Segment, w *CountHashWriter,
postingsListLocs, freqOffsets, locOffsets []uint64) (rv []uint64, err error) {
rv = make([]uint64, 0, len(memSegment.Postings))
for postingID := range memSegment.Postings {
// record where we start this posting list
rv = append(rv, uint64(w.Count()))
@@ -376,7 +392,7 @@ func persistPostingsLists(memSegment *mem.Segment, w *CountHashWriter,
}
func persistDictionary(memSegment *mem.Segment, w *CountHashWriter, postingsLocs []uint64) ([]uint64, error) {
var rv []uint64
rv := make([]uint64, 0, len(memSegment.DictKeys))
var buffer bytes.Buffer
for fieldID, fieldTerms := range memSegment.DictKeys {
@@ -392,10 +408,10 @@ func persistDictionary(memSegment *mem.Segment, w *CountHashWriter, postingsLocs
dict := memSegment.Dicts[fieldID]
// now walk the dictionary in order of fieldTerms (already sorted)
for i := range fieldTerms {
postingID := dict[fieldTerms[i]] - 1
for _, fieldTerm := range fieldTerms {
postingID := dict[fieldTerm] - 1
postingsAddr := postingsLocs[postingID]
err = builder.Insert([]byte(fieldTerms[i]), postingsAddr)
err = builder.Insert([]byte(fieldTerm), postingsAddr)
if err != nil {
return nil, err
}
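
persistDictionary walks fieldTerms in its pre-sorted order because the FST builder demands it: vellum only accepts keys in ascending byte order. A small standalone sketch, assuming the vellum Builder API bleve used at the time (the terms and values are made up):

package main

import (
	"bytes"
	"fmt"

	"github.com/couchbase/vellum"
)

func main() {
	var buf bytes.Buffer
	builder, err := vellum.New(&buf, nil)
	if err != nil {
		panic(err)
	}
	// keys must arrive in ascending byte order, mirroring the
	// already-sorted fieldTerms slice in persistDictionary
	for i, term := range []string{"apple", "pear", "zebra"} {
		if err = builder.Insert([]byte(term), uint64(i)); err != nil {
			panic(err)
		}
	}
	if err = builder.Close(); err != nil {
		panic(err)
	}
	fmt.Println("fst bytes:", buf.Len())
}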