0
0
Fork 0

Merge pull request #779 from steveyen/wip-in-mem-seg-merging

merging of in-memory segments during persistSnapshot
This commit is contained in:
Steve Yen 2018-02-23 14:02:02 -08:00 committed by GitHub
commit 19080c1ae5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 205 additions and 28 deletions

View File

@ -247,6 +247,8 @@ func (s *Scorch) introduceMerge(nextMerge *segmentMerge) {
})
newSnapshot.offsets = append(newSnapshot.offsets, running)
newSnapshot.AddRef() // 1 ref for the nextMerge.notify response
// swap in new segment
rootPrev := s.root
s.root = newSnapshot
@ -257,7 +259,8 @@ func (s *Scorch) introduceMerge(nextMerge *segmentMerge) {
_ = rootPrev.DecRef()
}
// notify merger we incorporated this
// notify requester that we incorporated this
nextMerge.notify <- newSnapshot
close(nextMerge.notify)
}

View File

@ -15,6 +15,7 @@
package scorch
import (
"bytes"
"fmt"
"os"
"sync/atomic"
@ -102,7 +103,7 @@ func (s *Scorch) planMergeAtSnapshot(ourSnapshot *IndexSnapshot) error {
}
// process tasks in serial for now
var notifications []notificationChan
var notifications []chan *IndexSnapshot
for _, task := range resultMergePlan.Tasks {
oldMap := make(map[uint64]*SegmentSnapshot)
newSegmentID := atomic.AddUint64(&s.nextSegmentID, 1)
@ -136,7 +137,7 @@ func (s *Scorch) planMergeAtSnapshot(ourSnapshot *IndexSnapshot) error {
old: oldMap,
oldNewDocNums: make(map[uint64][]uint64),
new: segment,
notify: make(notificationChan),
notify: make(chan *IndexSnapshot, 1),
}
notifications = append(notifications, sm.notify)
for i, segNewDocNums := range newDocNums {
@ -155,7 +156,10 @@ func (s *Scorch) planMergeAtSnapshot(ourSnapshot *IndexSnapshot) error {
select {
case <-s.closeCh:
return nil
case <-notification:
case newSnapshot := <-notification:
if newSnapshot != nil {
_ = newSnapshot.DecRef()
}
}
}
return nil
@ -166,5 +170,72 @@ type segmentMerge struct {
old map[uint64]*SegmentSnapshot
oldNewDocNums map[uint64][]uint64
new segment.Segment
notify notificationChan
notify chan *IndexSnapshot
}
// perform a merging of the given SegmentBase instances into a new,
// persisted segment, and synchronously introduce that new segment
// into the root
func (s *Scorch) mergeSegmentBases(snapshot *IndexSnapshot,
sbs []*zap.SegmentBase, sbsDrops []*roaring.Bitmap, sbsIndexes []int,
chunkFactor uint32) (uint64, *IndexSnapshot, uint64, error) {
var br bytes.Buffer
cr := zap.NewCountHashWriter(&br)
newDocNums, numDocs, storedIndexOffset, fieldsIndexOffset,
docValueOffset, dictLocs, fieldsInv, fieldsMap, err :=
zap.MergeToWriter(sbs, sbsDrops, chunkFactor, cr)
if err != nil {
return 0, nil, 0, err
}
sb, err := zap.InitSegmentBase(br.Bytes(), cr.Sum32(), chunkFactor,
fieldsMap, fieldsInv, numDocs, storedIndexOffset, fieldsIndexOffset,
docValueOffset, dictLocs)
if err != nil {
return 0, nil, 0, err
}
newSegmentID := atomic.AddUint64(&s.nextSegmentID, 1)
filename := zapFileName(newSegmentID)
path := s.path + string(os.PathSeparator) + filename
err = zap.PersistSegmentBase(sb, path)
if err != nil {
return 0, nil, 0, err
}
segment, err := zap.Open(path)
if err != nil {
return 0, nil, 0, err
}
sm := &segmentMerge{
id: newSegmentID,
old: make(map[uint64]*SegmentSnapshot),
oldNewDocNums: make(map[uint64][]uint64),
new: segment,
notify: make(chan *IndexSnapshot, 1),
}
for i, idx := range sbsIndexes {
ss := snapshot.segment[idx]
sm.old[ss.id] = ss
sm.oldNewDocNums[ss.id] = newDocNums[i]
}
select { // send to introducer
case <-s.closeCh:
_ = segment.DecRef()
return 0, nil, 0, nil // TODO: return ErrInterruptedClosed?
case s.merges <- sm:
}
select { // wait for introduction to complete
case <-s.closeCh:
return 0, nil, 0, nil // TODO: return ErrInterruptedClosed?
case newSnapshot := <-sm.notify:
return numDocs, newSnapshot, newSegmentID, nil
}
}

View File

@ -145,7 +145,100 @@ OUTER:
}
}
func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot) (err error) {
func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot) error {
persisted, err := s.persistSnapshotMaybeMerge(snapshot)
if err != nil {
return err
}
if persisted {
return nil
}
return s.persistSnapshotDirect(snapshot)
}
// DefaultMinSegmentsForInMemoryMerge represents the default number of
// in-memory zap segments that persistSnapshotMaybeMerge() needs to
// see in an IndexSnapshot before it decides to merge and persist
// those segments
var DefaultMinSegmentsForInMemoryMerge = 2
// persistSnapshotMaybeMerge examines the snapshot and might merge and
// persist the in-memory zap segments if there are enough of them
func (s *Scorch) persistSnapshotMaybeMerge(snapshot *IndexSnapshot) (
bool, error) {
// collect the in-memory zap segments (SegmentBase instances)
var sbs []*zap.SegmentBase
var sbsDrops []*roaring.Bitmap
var sbsIndexes []int
for i, segmentSnapshot := range snapshot.segment {
if sb, ok := segmentSnapshot.segment.(*zap.SegmentBase); ok {
sbs = append(sbs, sb)
sbsDrops = append(sbsDrops, segmentSnapshot.deleted)
sbsIndexes = append(sbsIndexes, i)
}
}
if len(sbs) < DefaultMinSegmentsForInMemoryMerge {
return false, nil
}
_, newSnapshot, newSegmentID, err := s.mergeSegmentBases(
snapshot, sbs, sbsDrops, sbsIndexes, DefaultChunkFactor)
if err != nil {
return false, err
}
if newSnapshot == nil {
return false, nil
}
defer func() {
_ = newSnapshot.DecRef()
}()
mergedSegmentIDs := map[uint64]struct{}{}
for _, idx := range sbsIndexes {
mergedSegmentIDs[snapshot.segment[idx].id] = struct{}{}
}
// construct a snapshot that's logically equivalent to the input
// snapshot, but with merged segments replaced by the new segment
equiv := &IndexSnapshot{
parent: snapshot.parent,
segment: make([]*SegmentSnapshot, 0, len(snapshot.segment)),
internal: snapshot.internal,
epoch: snapshot.epoch,
}
// copy to the equiv the segments that weren't replaced
for _, segment := range snapshot.segment {
if _, wasMerged := mergedSegmentIDs[segment.id]; !wasMerged {
equiv.segment = append(equiv.segment, segment)
}
}
// append to the equiv the new segment
for _, segment := range newSnapshot.segment {
if segment.id == newSegmentID {
equiv.segment = append(equiv.segment, &SegmentSnapshot{
id: newSegmentID,
segment: segment.segment,
deleted: nil, // nil since merging handled deletions
})
break
}
}
err = s.persistSnapshotDirect(equiv)
if err != nil {
return false, err
}
return true, nil
}
func (s *Scorch) persistSnapshotDirect(snapshot *IndexSnapshot) (err error) {
// start a write transaction
tx, err := s.rootBolt.Begin(true)
if err != nil {

View File

@ -633,12 +633,21 @@ func NewSegmentBase(memSegment *mem.Segment, chunkFactor uint32) (*SegmentBase,
return nil, err
}
return InitSegmentBase(br.Bytes(), cr.Sum32(), chunkFactor,
memSegment.FieldsMap, memSegment.FieldsInv, numDocs,
storedIndexOffset, fieldsIndexOffset, docValueOffset, dictLocs)
}
func InitSegmentBase(mem []byte, memCRC uint32, chunkFactor uint32,
fieldsMap map[string]uint16, fieldsInv []string, numDocs uint64,
storedIndexOffset uint64, fieldsIndexOffset uint64, docValueOffset uint64,
dictLocs []uint64) (*SegmentBase, error) {
sb := &SegmentBase{
mem: br.Bytes(),
memCRC: cr.Sum32(),
mem: mem,
memCRC: memCRC,
chunkFactor: chunkFactor,
fieldsMap: memSegment.FieldsMap,
fieldsInv: memSegment.FieldsInv,
fieldsMap: fieldsMap,
fieldsInv: fieldsInv,
numDocs: numDocs,
storedIndexOffset: storedIndexOffset,
fieldsIndexOffset: fieldsIndexOffset,
@ -647,7 +656,7 @@ func NewSegmentBase(memSegment *mem.Segment, chunkFactor uint32) (*SegmentBase,
fieldDvIterMap: make(map[uint16]*docValueIterator),
}
err = sb.loadDvIterators()
err := sb.loadDvIterators()
if err != nil {
return nil, err
}

View File

@ -60,7 +60,7 @@ func Merge(segments []*Segment, drops []*roaring.Bitmap, path string,
// wrap it for counting (tracking offsets)
cr := NewCountHashWriter(br)
newDocNums, numDocs, storedIndexOffset, fieldsIndexOffset, docValueOffset, err :=
newDocNums, numDocs, storedIndexOffset, fieldsIndexOffset, docValueOffset, _, _, _, err :=
MergeToWriter(segmentBases, drops, chunkFactor, cr)
if err != nil {
cleanup()
@ -99,26 +99,26 @@ func MergeToWriter(segments []*SegmentBase, drops []*roaring.Bitmap,
chunkFactor uint32, cr *CountHashWriter) (
newDocNums [][]uint64,
numDocs, storedIndexOffset, fieldsIndexOffset, docValueOffset uint64,
dictLocs []uint64, fieldsInv []string, fieldsMap map[string]uint16,
err error) {
docValueOffset = uint64(fieldNotUninverted)
var dictLocs []uint64
fieldsSame, fieldsInv := mergeFields(segments)
fieldsMap := mapFields(fieldsInv)
var fieldsSame bool
fieldsSame, fieldsInv = mergeFields(segments)
fieldsMap = mapFields(fieldsInv)
numDocs = computeNewDocCount(segments, drops)
if numDocs > 0 {
storedIndexOffset, newDocNums, err = mergeStoredAndRemap(segments, drops,
fieldsMap, fieldsInv, fieldsSame, numDocs, cr)
if err != nil {
return nil, 0, 0, 0, 0, err
return nil, 0, 0, 0, 0, nil, nil, nil, err
}
dictLocs, docValueOffset, err = persistMergedRest(segments, drops, fieldsInv, fieldsMap,
newDocNums, numDocs, chunkFactor, cr)
if err != nil {
return nil, 0, 0, 0, 0, err
return nil, 0, 0, 0, 0, nil, nil, nil, err
}
} else {
dictLocs = make([]uint64, len(fieldsInv))
@ -126,17 +126,18 @@ func MergeToWriter(segments []*SegmentBase, drops []*roaring.Bitmap,
fieldsIndexOffset, err = persistFields(fieldsInv, cr, dictLocs)
if err != nil {
return nil, 0, 0, 0, 0, err
return nil, 0, 0, 0, 0, nil, nil, nil, err
}
return newDocNums, numDocs, storedIndexOffset, fieldsIndexOffset, docValueOffset, nil
return newDocNums, numDocs, storedIndexOffset, fieldsIndexOffset, docValueOffset, dictLocs, fieldsInv, fieldsMap, nil
}
// mapFields takes the fieldsInv list and builds the map
// mapFields takes the fieldsInv list and returns a map of fieldName
// to fieldID+1
func mapFields(fields []string) map[string]uint16 {
rv := make(map[string]uint16, len(fields))
for i, fieldName := range fields {
rv[fieldName] = uint16(i)
rv[fieldName] = uint16(i) + 1
}
return rv
}
@ -338,7 +339,7 @@ func persistMergedRest(segments []*SegmentBase, dropsIn []*roaring.Bitmap,
bufLoc = make([]uint64, 0, 5+len(loc.ArrayPositions()))
}
args := bufLoc[0:5]
args[0] = uint64(fieldsMap[loc.Field()])
args[0] = uint64(fieldsMap[loc.Field()] - 1)
args[1] = loc.Pos()
args[2] = loc.Start()
args[3] = loc.End()
@ -499,7 +500,7 @@ func mergeStoredAndRemap(segments []*SegmentBase, drops []*roaring.Bitmap,
poss[i] = poss[i][:0]
}
err := segment.VisitDocument(docNum, func(field string, typ byte, value []byte, pos []uint64) bool {
fieldID := int(fieldsMap[field])
fieldID := int(fieldsMap[field]) - 1
vals[fieldID] = append(vals[fieldID], value)
typs[fieldID] = append(typs[fieldID], typ)
poss[fieldID] = append(poss[fieldID], pos)
@ -615,21 +616,21 @@ func mergeFields(segments []*SegmentBase) (bool, []string) {
segment0Fields = segments[0].Fields()
}
fieldsMap := map[string]struct{}{}
fieldsExist := map[string]struct{}{}
for _, segment := range segments {
fields := segment.Fields()
for fieldi, field := range fields {
fieldsMap[field] = struct{}{}
fieldsExist[field] = struct{}{}
if len(segment0Fields) != len(fields) || segment0Fields[fieldi] != field {
fieldsSame = false
}
}
}
rv := make([]string, 0, len(fieldsMap))
rv := make([]string, 0, len(fieldsExist))
// ensure _id stays first
rv = append(rv, "_id")
for k := range fieldsMap {
for k := range fieldsExist {
if k != "_id" {
rv = append(rv, k)
}