scorch zap mergeSegmentBases() func
As part of this, zap.MergeToWriter() now returns more information -- enough so that callers can now create their own SegmentBase instances. Also, the fieldsMap maintained and returned by zap.MergeToWriter() is now a mapping from fieldName ==> fieldID+1 (instead of the previous mapping from fieldName ==> fieldID). This makes it similar to how fieldsMap are handled in other parts of zap to avoid "zero value" issues.
This commit is contained in:
parent
720010783e
commit
a0b7508da7
|
@ -15,6 +15,7 @@
|
|||
package scorch
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"os"
|
||||
"sync/atomic"
|
||||
|
@ -168,3 +169,57 @@ type segmentMerge struct {
|
|||
new segment.Segment
|
||||
notify notificationChan
|
||||
}
|
||||
|
||||
// perform in-memory merging of the given SegmentBase instances, and
|
||||
// synchronously introduce the merged segment into the root
|
||||
func (s *Scorch) mergeSegmentBases(snapshot *IndexSnapshot,
|
||||
sbs []*zap.SegmentBase, sbsDrops []*roaring.Bitmap, sbsIndexes []int,
|
||||
chunkFactor uint32) (uint64, error) {
|
||||
var br bytes.Buffer
|
||||
|
||||
cr := zap.NewCountHashWriter(&br)
|
||||
|
||||
newDocNums, numDocs, storedIndexOffset, fieldsIndexOffset,
|
||||
docValueOffset, dictLocs, fieldsInv, fieldsMap, err :=
|
||||
zap.MergeToWriter(sbs, sbsDrops, chunkFactor, cr)
|
||||
if err != nil {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
segment, err := zap.InitSegmentBase(br.Bytes(), cr.Sum32(), chunkFactor,
|
||||
fieldsMap, fieldsInv, numDocs, storedIndexOffset, fieldsIndexOffset,
|
||||
docValueOffset, dictLocs)
|
||||
if err != nil {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
newSegmentID := atomic.AddUint64(&s.nextSegmentID, 1)
|
||||
|
||||
sm := &segmentMerge{
|
||||
id: newSegmentID,
|
||||
old: make(map[uint64]*SegmentSnapshot),
|
||||
oldNewDocNums: make(map[uint64][]uint64),
|
||||
new: segment,
|
||||
notify: make(notificationChan),
|
||||
}
|
||||
|
||||
for i, idx := range sbsIndexes {
|
||||
ss := snapshot.segment[idx]
|
||||
sm.old[ss.id] = ss
|
||||
sm.oldNewDocNums[ss.id] = newDocNums[i]
|
||||
}
|
||||
|
||||
select { // send to introducer
|
||||
case <-s.closeCh:
|
||||
return 0, nil // TODO: instead return some ErrInterruptedClosed?
|
||||
case s.merges <- sm:
|
||||
}
|
||||
|
||||
select { // wait for introduction to complete
|
||||
case <-s.closeCh:
|
||||
return 0, nil // TODO: instead return some ErrInterruptedClosed?
|
||||
case <-sm.notify:
|
||||
}
|
||||
|
||||
return numDocs, nil
|
||||
}
|
||||
|
|
|
@ -60,7 +60,7 @@ func Merge(segments []*Segment, drops []*roaring.Bitmap, path string,
|
|||
// wrap it for counting (tracking offsets)
|
||||
cr := NewCountHashWriter(br)
|
||||
|
||||
newDocNums, numDocs, storedIndexOffset, fieldsIndexOffset, docValueOffset, err :=
|
||||
newDocNums, numDocs, storedIndexOffset, fieldsIndexOffset, docValueOffset, _, _, _, err :=
|
||||
MergeToWriter(segmentBases, drops, chunkFactor, cr)
|
||||
if err != nil {
|
||||
cleanup()
|
||||
|
@ -99,26 +99,26 @@ func MergeToWriter(segments []*SegmentBase, drops []*roaring.Bitmap,
|
|||
chunkFactor uint32, cr *CountHashWriter) (
|
||||
newDocNums [][]uint64,
|
||||
numDocs, storedIndexOffset, fieldsIndexOffset, docValueOffset uint64,
|
||||
dictLocs []uint64, fieldsInv []string, fieldsMap map[string]uint16,
|
||||
err error) {
|
||||
docValueOffset = uint64(fieldNotUninverted)
|
||||
|
||||
var dictLocs []uint64
|
||||
|
||||
fieldsSame, fieldsInv := mergeFields(segments)
|
||||
fieldsMap := mapFields(fieldsInv)
|
||||
var fieldsSame bool
|
||||
fieldsSame, fieldsInv = mergeFields(segments)
|
||||
fieldsMap = mapFields(fieldsInv)
|
||||
|
||||
numDocs = computeNewDocCount(segments, drops)
|
||||
if numDocs > 0 {
|
||||
storedIndexOffset, newDocNums, err = mergeStoredAndRemap(segments, drops,
|
||||
fieldsMap, fieldsInv, fieldsSame, numDocs, cr)
|
||||
if err != nil {
|
||||
return nil, 0, 0, 0, 0, err
|
||||
return nil, 0, 0, 0, 0, nil, nil, nil, err
|
||||
}
|
||||
|
||||
dictLocs, docValueOffset, err = persistMergedRest(segments, drops, fieldsInv, fieldsMap,
|
||||
newDocNums, numDocs, chunkFactor, cr)
|
||||
if err != nil {
|
||||
return nil, 0, 0, 0, 0, err
|
||||
return nil, 0, 0, 0, 0, nil, nil, nil, err
|
||||
}
|
||||
} else {
|
||||
dictLocs = make([]uint64, len(fieldsInv))
|
||||
|
@ -126,17 +126,18 @@ func MergeToWriter(segments []*SegmentBase, drops []*roaring.Bitmap,
|
|||
|
||||
fieldsIndexOffset, err = persistFields(fieldsInv, cr, dictLocs)
|
||||
if err != nil {
|
||||
return nil, 0, 0, 0, 0, err
|
||||
return nil, 0, 0, 0, 0, nil, nil, nil, err
|
||||
}
|
||||
|
||||
return newDocNums, numDocs, storedIndexOffset, fieldsIndexOffset, docValueOffset, nil
|
||||
return newDocNums, numDocs, storedIndexOffset, fieldsIndexOffset, docValueOffset, dictLocs, fieldsInv, fieldsMap, nil
|
||||
}
|
||||
|
||||
// mapFields takes the fieldsInv list and builds the map
|
||||
// mapFields takes the fieldsInv list and returns a map of fieldName
|
||||
// to fieldID+1
|
||||
func mapFields(fields []string) map[string]uint16 {
|
||||
rv := make(map[string]uint16, len(fields))
|
||||
for i, fieldName := range fields {
|
||||
rv[fieldName] = uint16(i)
|
||||
rv[fieldName] = uint16(i) + 1
|
||||
}
|
||||
return rv
|
||||
}
|
||||
|
@ -338,7 +339,7 @@ func persistMergedRest(segments []*SegmentBase, dropsIn []*roaring.Bitmap,
|
|||
bufLoc = make([]uint64, 0, 5+len(loc.ArrayPositions()))
|
||||
}
|
||||
args := bufLoc[0:5]
|
||||
args[0] = uint64(fieldsMap[loc.Field()])
|
||||
args[0] = uint64(fieldsMap[loc.Field()] - 1)
|
||||
args[1] = loc.Pos()
|
||||
args[2] = loc.Start()
|
||||
args[3] = loc.End()
|
||||
|
@ -499,7 +500,7 @@ func mergeStoredAndRemap(segments []*SegmentBase, drops []*roaring.Bitmap,
|
|||
poss[i] = poss[i][:0]
|
||||
}
|
||||
err := segment.VisitDocument(docNum, func(field string, typ byte, value []byte, pos []uint64) bool {
|
||||
fieldID := int(fieldsMap[field])
|
||||
fieldID := int(fieldsMap[field]) - 1
|
||||
vals[fieldID] = append(vals[fieldID], value)
|
||||
typs[fieldID] = append(typs[fieldID], typ)
|
||||
poss[fieldID] = append(poss[fieldID], pos)
|
||||
|
@ -615,21 +616,21 @@ func mergeFields(segments []*SegmentBase) (bool, []string) {
|
|||
segment0Fields = segments[0].Fields()
|
||||
}
|
||||
|
||||
fieldsMap := map[string]struct{}{}
|
||||
fieldsExist := map[string]struct{}{}
|
||||
for _, segment := range segments {
|
||||
fields := segment.Fields()
|
||||
for fieldi, field := range fields {
|
||||
fieldsMap[field] = struct{}{}
|
||||
fieldsExist[field] = struct{}{}
|
||||
if len(segment0Fields) != len(fields) || segment0Fields[fieldi] != field {
|
||||
fieldsSame = false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
rv := make([]string, 0, len(fieldsMap))
|
||||
rv := make([]string, 0, len(fieldsExist))
|
||||
// ensure _id stays first
|
||||
rv = append(rv, "_id")
|
||||
for k := range fieldsMap {
|
||||
for k := range fieldsExist {
|
||||
if k != "_id" {
|
||||
rv = append(rv, k)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue