Merge branch 'master' into mergeplanner_options
This commit is contained in:
commit
c45822347f
|
@ -100,8 +100,8 @@ func (s *Scorch) introduceSegment(next *segmentIntroduction) error {
|
|||
// prepare new index snapshot
|
||||
newSnapshot := &IndexSnapshot{
|
||||
parent: s,
|
||||
segment: make([]*SegmentSnapshot, nsegs, nsegs+1),
|
||||
offsets: make([]uint64, nsegs, nsegs+1),
|
||||
segment: make([]*SegmentSnapshot, 0, nsegs+1),
|
||||
offsets: make([]uint64, 0, nsegs+1),
|
||||
internal: make(map[string][]byte, len(s.root.internal)),
|
||||
epoch: s.nextSnapshotEpoch,
|
||||
refs: 1,
|
||||
|
@ -124,24 +124,29 @@ func (s *Scorch) introduceSegment(next *segmentIntroduction) error {
|
|||
return err
|
||||
}
|
||||
}
|
||||
newSnapshot.segment[i] = &SegmentSnapshot{
|
||||
|
||||
newss := &SegmentSnapshot{
|
||||
id: s.root.segment[i].id,
|
||||
segment: s.root.segment[i].segment,
|
||||
cachedDocs: s.root.segment[i].cachedDocs,
|
||||
}
|
||||
s.root.segment[i].segment.AddRef()
|
||||
|
||||
|
||||
// apply new obsoletions
|
||||
if s.root.segment[i].deleted == nil {
|
||||
newSnapshot.segment[i].deleted = delta
|
||||
newss.deleted = delta
|
||||
} else {
|
||||
newSnapshot.segment[i].deleted = roaring.Or(s.root.segment[i].deleted, delta)
|
||||
newss.deleted = roaring.Or(s.root.segment[i].deleted, delta)
|
||||
}
|
||||
|
||||
// check for live size before copying
|
||||
if newss.LiveSize() > 0 {
|
||||
newSnapshot.segment = append(newSnapshot.segment, newss)
|
||||
s.root.segment[i].segment.AddRef()
|
||||
newSnapshot.offsets = append(newSnapshot.offsets, running)
|
||||
running += s.root.segment[i].Count()
|
||||
}
|
||||
|
||||
newSnapshot.offsets[i] = running
|
||||
running += s.root.segment[i].Count()
|
||||
|
||||
}
|
||||
|
||||
// append new segment, if any, to end of the new index snapshot
|
||||
if next.data != nil {
|
||||
newSegmentSnapshot := &SegmentSnapshot{
|
||||
|
@ -193,6 +198,12 @@ func (s *Scorch) introduceMerge(nextMerge *segmentMerge) {
|
|||
// prepare new index snapshot
|
||||
currSize := len(s.root.segment)
|
||||
newSize := currSize + 1 - len(nextMerge.old)
|
||||
|
||||
// empty segments deletion
|
||||
if nextMerge.new == nil {
|
||||
newSize--
|
||||
}
|
||||
|
||||
newSnapshot := &IndexSnapshot{
|
||||
parent: s,
|
||||
segment: make([]*SegmentSnapshot, 0, newSize),
|
||||
|
@ -210,7 +221,7 @@ func (s *Scorch) introduceMerge(nextMerge *segmentMerge) {
|
|||
segmentID := s.root.segment[i].id
|
||||
if segSnapAtMerge, ok := nextMerge.old[segmentID]; ok {
|
||||
// this segment is going away, see if anything else was deleted since we started the merge
|
||||
if s.root.segment[i].deleted != nil {
|
||||
if segSnapAtMerge != nil && s.root.segment[i].deleted != nil {
|
||||
// assume all these deletes are new
|
||||
deletedSince := s.root.segment[i].deleted
|
||||
// if we already knew about some of them, remove
|
||||
|
@ -224,7 +235,13 @@ func (s *Scorch) introduceMerge(nextMerge *segmentMerge) {
|
|||
newSegmentDeleted.Add(uint32(newDocNum))
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// clean up the old segment map to figure out the
|
||||
// obsolete segments wrt root in meantime, whatever
|
||||
// segments left behind in old map after processing
|
||||
// the root segments would be the obsolete segment set
|
||||
delete(nextMerge.old, segmentID)
|
||||
|
||||
} else if s.root.segment[i].LiveSize() > 0 {
|
||||
// this segment is staying
|
||||
newSnapshot.segment = append(newSnapshot.segment, &SegmentSnapshot{
|
||||
id: s.root.segment[i].id,
|
||||
|
@ -238,14 +255,35 @@ func (s *Scorch) introduceMerge(nextMerge *segmentMerge) {
|
|||
}
|
||||
}
|
||||
|
||||
// put new segment at end
|
||||
newSnapshot.segment = append(newSnapshot.segment, &SegmentSnapshot{
|
||||
id: nextMerge.id,
|
||||
segment: nextMerge.new, // take ownership for nextMerge.new's ref-count
|
||||
deleted: newSegmentDeleted,
|
||||
cachedDocs: &cachedDocs{cache: nil},
|
||||
})
|
||||
newSnapshot.offsets = append(newSnapshot.offsets, running)
|
||||
// before the newMerge introduction, need to clean the newly
|
||||
// merged segment wrt the current root segments, hence
|
||||
// applying the obsolete segment contents to newly merged segment
|
||||
for segID, ss := range nextMerge.old {
|
||||
obsoleted := ss.DocNumbersLive()
|
||||
if obsoleted != nil {
|
||||
obsoletedIter := obsoleted.Iterator()
|
||||
for obsoletedIter.HasNext() {
|
||||
oldDocNum := obsoletedIter.Next()
|
||||
newDocNum := nextMerge.oldNewDocNums[segID][oldDocNum]
|
||||
newSegmentDeleted.Add(uint32(newDocNum))
|
||||
}
|
||||
}
|
||||
}
|
||||
// In case where all the docs in the newly merged segment getting
|
||||
// deleted by the time we reach here, can skip the introduction.
|
||||
if nextMerge.new != nil &&
|
||||
nextMerge.new.Count() > newSegmentDeleted.GetCardinality() {
|
||||
// put new segment at end
|
||||
newSnapshot.segment = append(newSnapshot.segment, &SegmentSnapshot{
|
||||
id: nextMerge.id,
|
||||
segment: nextMerge.new, // take ownership for nextMerge.new's ref-count
|
||||
deleted: newSegmentDeleted,
|
||||
cachedDocs: &cachedDocs{cache: nil},
|
||||
})
|
||||
newSnapshot.offsets = append(newSnapshot.offsets, running)
|
||||
}
|
||||
|
||||
newSnapshot.AddRef() // 1 ref for the nextMerge.notify response
|
||||
|
||||
// swap in new segment
|
||||
rootPrev := s.root
|
||||
|
@ -257,7 +295,8 @@ func (s *Scorch) introduceMerge(nextMerge *segmentMerge) {
|
|||
_ = rootPrev.DecRef()
|
||||
}
|
||||
|
||||
// notify merger we incorporated this
|
||||
// notify requester that we incorporated this
|
||||
nextMerge.notify <- newSnapshot
|
||||
close(nextMerge.notify)
|
||||
}
|
||||
|
||||
|
|
|
@ -15,7 +15,9 @@
|
|||
package scorch
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
|
||||
"fmt"
|
||||
"os"
|
||||
"sync/atomic"
|
||||
|
@ -128,8 +130,12 @@ func (s *Scorch) planMergeAtSnapshot(ourSnapshot *IndexSnapshot,
|
|||
}
|
||||
|
||||
// process tasks in serial for now
|
||||
var notifications []notificationChan
|
||||
var notifications []chan *IndexSnapshot
|
||||
for _, task := range resultMergePlan.Tasks {
|
||||
if len(task.Segments) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
oldMap := make(map[uint64]*SegmentSnapshot)
|
||||
newSegmentID := atomic.AddUint64(&s.nextSegmentID, 1)
|
||||
segmentsToMerge := make([]*zap.Segment, 0, len(task.Segments))
|
||||
|
@ -138,36 +144,46 @@ func (s *Scorch) planMergeAtSnapshot(ourSnapshot *IndexSnapshot,
|
|||
if segSnapshot, ok := planSegment.(*SegmentSnapshot); ok {
|
||||
oldMap[segSnapshot.id] = segSnapshot
|
||||
if zapSeg, ok := segSnapshot.segment.(*zap.Segment); ok {
|
||||
segmentsToMerge = append(segmentsToMerge, zapSeg)
|
||||
docsToDrop = append(docsToDrop, segSnapshot.deleted)
|
||||
if segSnapshot.LiveSize() == 0 {
|
||||
oldMap[segSnapshot.id] = nil
|
||||
} else {
|
||||
segmentsToMerge = append(segmentsToMerge, zapSeg)
|
||||
docsToDrop = append(docsToDrop, segSnapshot.deleted)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
filename := zapFileName(newSegmentID)
|
||||
s.markIneligibleForRemoval(filename)
|
||||
path := s.path + string(os.PathSeparator) + filename
|
||||
newDocNums, err := zap.Merge(segmentsToMerge, docsToDrop, path, DefaultChunkFactor)
|
||||
if err != nil {
|
||||
s.unmarkIneligibleForRemoval(filename)
|
||||
return fmt.Errorf("merging failed: %v", err)
|
||||
}
|
||||
segment, err := zap.Open(path)
|
||||
if err != nil {
|
||||
s.unmarkIneligibleForRemoval(filename)
|
||||
return err
|
||||
var oldNewDocNums map[uint64][]uint64
|
||||
var segment segment.Segment
|
||||
if len(segmentsToMerge) > 0 {
|
||||
filename := zapFileName(newSegmentID)
|
||||
s.markIneligibleForRemoval(filename)
|
||||
path := s.path + string(os.PathSeparator) + filename
|
||||
newDocNums, err := zap.Merge(segmentsToMerge, docsToDrop, path, 1024)
|
||||
if err != nil {
|
||||
s.unmarkIneligibleForRemoval(filename)
|
||||
return fmt.Errorf("merging failed: %v", err)
|
||||
}
|
||||
segment, err = zap.Open(path)
|
||||
if err != nil {
|
||||
s.unmarkIneligibleForRemoval(filename)
|
||||
return err
|
||||
}
|
||||
oldNewDocNums = make(map[uint64][]uint64)
|
||||
for i, segNewDocNums := range newDocNums {
|
||||
oldNewDocNums[task.Segments[i].Id()] = segNewDocNums
|
||||
}
|
||||
}
|
||||
|
||||
sm := &segmentMerge{
|
||||
id: newSegmentID,
|
||||
old: oldMap,
|
||||
oldNewDocNums: make(map[uint64][]uint64),
|
||||
oldNewDocNums: oldNewDocNums,
|
||||
new: segment,
|
||||
notify: make(notificationChan),
|
||||
notify: make(chan *IndexSnapshot, 1),
|
||||
}
|
||||
notifications = append(notifications, sm.notify)
|
||||
for i, segNewDocNums := range newDocNums {
|
||||
sm.oldNewDocNums[task.Segments[i].Id()] = segNewDocNums
|
||||
}
|
||||
|
||||
// give it to the introducer
|
||||
select {
|
||||
|
@ -181,7 +197,10 @@ func (s *Scorch) planMergeAtSnapshot(ourSnapshot *IndexSnapshot,
|
|||
select {
|
||||
case <-s.closeCh:
|
||||
return nil
|
||||
case <-notification:
|
||||
case newSnapshot := <-notification:
|
||||
if newSnapshot != nil {
|
||||
_ = newSnapshot.DecRef()
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
@ -192,5 +211,72 @@ type segmentMerge struct {
|
|||
old map[uint64]*SegmentSnapshot
|
||||
oldNewDocNums map[uint64][]uint64
|
||||
new segment.Segment
|
||||
notify notificationChan
|
||||
notify chan *IndexSnapshot
|
||||
}
|
||||
|
||||
// perform a merging of the given SegmentBase instances into a new,
|
||||
// persisted segment, and synchronously introduce that new segment
|
||||
// into the root
|
||||
func (s *Scorch) mergeSegmentBases(snapshot *IndexSnapshot,
|
||||
sbs []*zap.SegmentBase, sbsDrops []*roaring.Bitmap, sbsIndexes []int,
|
||||
chunkFactor uint32) (uint64, *IndexSnapshot, uint64, error) {
|
||||
var br bytes.Buffer
|
||||
|
||||
cr := zap.NewCountHashWriter(&br)
|
||||
|
||||
newDocNums, numDocs, storedIndexOffset, fieldsIndexOffset,
|
||||
docValueOffset, dictLocs, fieldsInv, fieldsMap, err :=
|
||||
zap.MergeToWriter(sbs, sbsDrops, chunkFactor, cr)
|
||||
if err != nil {
|
||||
return 0, nil, 0, err
|
||||
}
|
||||
|
||||
sb, err := zap.InitSegmentBase(br.Bytes(), cr.Sum32(), chunkFactor,
|
||||
fieldsMap, fieldsInv, numDocs, storedIndexOffset, fieldsIndexOffset,
|
||||
docValueOffset, dictLocs)
|
||||
if err != nil {
|
||||
return 0, nil, 0, err
|
||||
}
|
||||
|
||||
newSegmentID := atomic.AddUint64(&s.nextSegmentID, 1)
|
||||
|
||||
filename := zapFileName(newSegmentID)
|
||||
path := s.path + string(os.PathSeparator) + filename
|
||||
err = zap.PersistSegmentBase(sb, path)
|
||||
if err != nil {
|
||||
return 0, nil, 0, err
|
||||
}
|
||||
|
||||
segment, err := zap.Open(path)
|
||||
if err != nil {
|
||||
return 0, nil, 0, err
|
||||
}
|
||||
|
||||
sm := &segmentMerge{
|
||||
id: newSegmentID,
|
||||
old: make(map[uint64]*SegmentSnapshot),
|
||||
oldNewDocNums: make(map[uint64][]uint64),
|
||||
new: segment,
|
||||
notify: make(chan *IndexSnapshot, 1),
|
||||
}
|
||||
|
||||
for i, idx := range sbsIndexes {
|
||||
ss := snapshot.segment[idx]
|
||||
sm.old[ss.id] = ss
|
||||
sm.oldNewDocNums[ss.id] = newDocNums[i]
|
||||
}
|
||||
|
||||
select { // send to introducer
|
||||
case <-s.closeCh:
|
||||
_ = segment.DecRef()
|
||||
return 0, nil, 0, nil // TODO: return ErrInterruptedClosed?
|
||||
case s.merges <- sm:
|
||||
}
|
||||
|
||||
select { // wait for introduction to complete
|
||||
case <-s.closeCh:
|
||||
return 0, nil, 0, nil // TODO: return ErrInterruptedClosed?
|
||||
case newSnapshot := <-sm.notify:
|
||||
return numDocs, newSnapshot, newSegmentID, nil
|
||||
}
|
||||
}
|
||||
|
|
|
@ -145,7 +145,100 @@ OUTER:
|
|||
}
|
||||
}
|
||||
|
||||
func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot) (err error) {
|
||||
func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot) error {
|
||||
persisted, err := s.persistSnapshotMaybeMerge(snapshot)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if persisted {
|
||||
return nil
|
||||
}
|
||||
|
||||
return s.persistSnapshotDirect(snapshot)
|
||||
}
|
||||
|
||||
// DefaultMinSegmentsForInMemoryMerge represents the default number of
|
||||
// in-memory zap segments that persistSnapshotMaybeMerge() needs to
|
||||
// see in an IndexSnapshot before it decides to merge and persist
|
||||
// those segments
|
||||
var DefaultMinSegmentsForInMemoryMerge = 2
|
||||
|
||||
// persistSnapshotMaybeMerge examines the snapshot and might merge and
|
||||
// persist the in-memory zap segments if there are enough of them
|
||||
func (s *Scorch) persistSnapshotMaybeMerge(snapshot *IndexSnapshot) (
|
||||
bool, error) {
|
||||
// collect the in-memory zap segments (SegmentBase instances)
|
||||
var sbs []*zap.SegmentBase
|
||||
var sbsDrops []*roaring.Bitmap
|
||||
var sbsIndexes []int
|
||||
|
||||
for i, segmentSnapshot := range snapshot.segment {
|
||||
if sb, ok := segmentSnapshot.segment.(*zap.SegmentBase); ok {
|
||||
sbs = append(sbs, sb)
|
||||
sbsDrops = append(sbsDrops, segmentSnapshot.deleted)
|
||||
sbsIndexes = append(sbsIndexes, i)
|
||||
}
|
||||
}
|
||||
|
||||
if len(sbs) < DefaultMinSegmentsForInMemoryMerge {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
_, newSnapshot, newSegmentID, err := s.mergeSegmentBases(
|
||||
snapshot, sbs, sbsDrops, sbsIndexes, DefaultChunkFactor)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if newSnapshot == nil {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
defer func() {
|
||||
_ = newSnapshot.DecRef()
|
||||
}()
|
||||
|
||||
mergedSegmentIDs := map[uint64]struct{}{}
|
||||
for _, idx := range sbsIndexes {
|
||||
mergedSegmentIDs[snapshot.segment[idx].id] = struct{}{}
|
||||
}
|
||||
|
||||
// construct a snapshot that's logically equivalent to the input
|
||||
// snapshot, but with merged segments replaced by the new segment
|
||||
equiv := &IndexSnapshot{
|
||||
parent: snapshot.parent,
|
||||
segment: make([]*SegmentSnapshot, 0, len(snapshot.segment)),
|
||||
internal: snapshot.internal,
|
||||
epoch: snapshot.epoch,
|
||||
}
|
||||
|
||||
// copy to the equiv the segments that weren't replaced
|
||||
for _, segment := range snapshot.segment {
|
||||
if _, wasMerged := mergedSegmentIDs[segment.id]; !wasMerged {
|
||||
equiv.segment = append(equiv.segment, segment)
|
||||
}
|
||||
}
|
||||
|
||||
// append to the equiv the new segment
|
||||
for _, segment := range newSnapshot.segment {
|
||||
if segment.id == newSegmentID {
|
||||
equiv.segment = append(equiv.segment, &SegmentSnapshot{
|
||||
id: newSegmentID,
|
||||
segment: segment.segment,
|
||||
deleted: nil, // nil since merging handled deletions
|
||||
})
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
err = s.persistSnapshotDirect(equiv)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func (s *Scorch) persistSnapshotDirect(snapshot *IndexSnapshot) (err error) {
|
||||
// start a write transaction
|
||||
tx, err := s.rootBolt.Begin(true)
|
||||
if err != nil {
|
||||
|
|
|
@ -633,12 +633,21 @@ func NewSegmentBase(memSegment *mem.Segment, chunkFactor uint32) (*SegmentBase,
|
|||
return nil, err
|
||||
}
|
||||
|
||||
return InitSegmentBase(br.Bytes(), cr.Sum32(), chunkFactor,
|
||||
memSegment.FieldsMap, memSegment.FieldsInv, numDocs,
|
||||
storedIndexOffset, fieldsIndexOffset, docValueOffset, dictLocs)
|
||||
}
|
||||
|
||||
func InitSegmentBase(mem []byte, memCRC uint32, chunkFactor uint32,
|
||||
fieldsMap map[string]uint16, fieldsInv []string, numDocs uint64,
|
||||
storedIndexOffset uint64, fieldsIndexOffset uint64, docValueOffset uint64,
|
||||
dictLocs []uint64) (*SegmentBase, error) {
|
||||
sb := &SegmentBase{
|
||||
mem: br.Bytes(),
|
||||
memCRC: cr.Sum32(),
|
||||
mem: mem,
|
||||
memCRC: memCRC,
|
||||
chunkFactor: chunkFactor,
|
||||
fieldsMap: memSegment.FieldsMap,
|
||||
fieldsInv: memSegment.FieldsInv,
|
||||
fieldsMap: fieldsMap,
|
||||
fieldsInv: fieldsInv,
|
||||
numDocs: numDocs,
|
||||
storedIndexOffset: storedIndexOffset,
|
||||
fieldsIndexOffset: fieldsIndexOffset,
|
||||
|
@ -647,7 +656,7 @@ func NewSegmentBase(memSegment *mem.Segment, chunkFactor uint32) (*SegmentBase,
|
|||
fieldDvIterMap: make(map[uint16]*docValueIterator),
|
||||
}
|
||||
|
||||
err = sb.loadDvIterators()
|
||||
err := sb.loadDvIterators()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -60,7 +60,7 @@ func Merge(segments []*Segment, drops []*roaring.Bitmap, path string,
|
|||
// wrap it for counting (tracking offsets)
|
||||
cr := NewCountHashWriter(br)
|
||||
|
||||
newDocNums, numDocs, storedIndexOffset, fieldsIndexOffset, docValueOffset, err :=
|
||||
newDocNums, numDocs, storedIndexOffset, fieldsIndexOffset, docValueOffset, _, _, _, err :=
|
||||
MergeToWriter(segmentBases, drops, chunkFactor, cr)
|
||||
if err != nil {
|
||||
cleanup()
|
||||
|
@ -99,26 +99,26 @@ func MergeToWriter(segments []*SegmentBase, drops []*roaring.Bitmap,
|
|||
chunkFactor uint32, cr *CountHashWriter) (
|
||||
newDocNums [][]uint64,
|
||||
numDocs, storedIndexOffset, fieldsIndexOffset, docValueOffset uint64,
|
||||
dictLocs []uint64, fieldsInv []string, fieldsMap map[string]uint16,
|
||||
err error) {
|
||||
docValueOffset = uint64(fieldNotUninverted)
|
||||
|
||||
var dictLocs []uint64
|
||||
|
||||
fieldsSame, fieldsInv := mergeFields(segments)
|
||||
fieldsMap := mapFields(fieldsInv)
|
||||
var fieldsSame bool
|
||||
fieldsSame, fieldsInv = mergeFields(segments)
|
||||
fieldsMap = mapFields(fieldsInv)
|
||||
|
||||
numDocs = computeNewDocCount(segments, drops)
|
||||
if numDocs > 0 {
|
||||
storedIndexOffset, newDocNums, err = mergeStoredAndRemap(segments, drops,
|
||||
fieldsMap, fieldsInv, fieldsSame, numDocs, cr)
|
||||
if err != nil {
|
||||
return nil, 0, 0, 0, 0, err
|
||||
return nil, 0, 0, 0, 0, nil, nil, nil, err
|
||||
}
|
||||
|
||||
dictLocs, docValueOffset, err = persistMergedRest(segments, drops, fieldsInv, fieldsMap,
|
||||
newDocNums, numDocs, chunkFactor, cr)
|
||||
if err != nil {
|
||||
return nil, 0, 0, 0, 0, err
|
||||
return nil, 0, 0, 0, 0, nil, nil, nil, err
|
||||
}
|
||||
} else {
|
||||
dictLocs = make([]uint64, len(fieldsInv))
|
||||
|
@ -126,17 +126,18 @@ func MergeToWriter(segments []*SegmentBase, drops []*roaring.Bitmap,
|
|||
|
||||
fieldsIndexOffset, err = persistFields(fieldsInv, cr, dictLocs)
|
||||
if err != nil {
|
||||
return nil, 0, 0, 0, 0, err
|
||||
return nil, 0, 0, 0, 0, nil, nil, nil, err
|
||||
}
|
||||
|
||||
return newDocNums, numDocs, storedIndexOffset, fieldsIndexOffset, docValueOffset, nil
|
||||
return newDocNums, numDocs, storedIndexOffset, fieldsIndexOffset, docValueOffset, dictLocs, fieldsInv, fieldsMap, nil
|
||||
}
|
||||
|
||||
// mapFields takes the fieldsInv list and builds the map
|
||||
// mapFields takes the fieldsInv list and returns a map of fieldName
|
||||
// to fieldID+1
|
||||
func mapFields(fields []string) map[string]uint16 {
|
||||
rv := make(map[string]uint16, len(fields))
|
||||
for i, fieldName := range fields {
|
||||
rv[fieldName] = uint16(i)
|
||||
rv[fieldName] = uint16(i) + 1
|
||||
}
|
||||
return rv
|
||||
}
|
||||
|
@ -338,7 +339,7 @@ func persistMergedRest(segments []*SegmentBase, dropsIn []*roaring.Bitmap,
|
|||
bufLoc = make([]uint64, 0, 5+len(loc.ArrayPositions()))
|
||||
}
|
||||
args := bufLoc[0:5]
|
||||
args[0] = uint64(fieldsMap[loc.Field()])
|
||||
args[0] = uint64(fieldsMap[loc.Field()] - 1)
|
||||
args[1] = loc.Pos()
|
||||
args[2] = loc.Start()
|
||||
args[3] = loc.End()
|
||||
|
@ -499,7 +500,7 @@ func mergeStoredAndRemap(segments []*SegmentBase, drops []*roaring.Bitmap,
|
|||
poss[i] = poss[i][:0]
|
||||
}
|
||||
err := segment.VisitDocument(docNum, func(field string, typ byte, value []byte, pos []uint64) bool {
|
||||
fieldID := int(fieldsMap[field])
|
||||
fieldID := int(fieldsMap[field]) - 1
|
||||
vals[fieldID] = append(vals[fieldID], value)
|
||||
typs[fieldID] = append(typs[fieldID], typ)
|
||||
poss[fieldID] = append(poss[fieldID], pos)
|
||||
|
@ -615,21 +616,21 @@ func mergeFields(segments []*SegmentBase) (bool, []string) {
|
|||
segment0Fields = segments[0].Fields()
|
||||
}
|
||||
|
||||
fieldsMap := map[string]struct{}{}
|
||||
fieldsExist := map[string]struct{}{}
|
||||
for _, segment := range segments {
|
||||
fields := segment.Fields()
|
||||
for fieldi, field := range fields {
|
||||
fieldsMap[field] = struct{}{}
|
||||
fieldsExist[field] = struct{}{}
|
||||
if len(segment0Fields) != len(fields) || segment0Fields[fieldi] != field {
|
||||
fieldsSame = false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
rv := make([]string, 0, len(fieldsMap))
|
||||
rv := make([]string, 0, len(fieldsExist))
|
||||
// ensure _id stays first
|
||||
rv = append(rv, "_id")
|
||||
for k := range fieldsMap {
|
||||
for k := range fieldsExist {
|
||||
if k != "_id" {
|
||||
rv = append(rv, k)
|
||||
}
|
||||
|
|
|
@ -149,10 +149,7 @@ func (s *Scorch) Rollback(to *RollbackPoint) error {
|
|||
|
||||
revert.snapshot = indexSnapshot
|
||||
revert.applied = make(chan error)
|
||||
|
||||
if !s.unsafeBatch {
|
||||
revert.persisted = make(chan error)
|
||||
}
|
||||
revert.persisted = make(chan error)
|
||||
|
||||
return nil
|
||||
})
|
||||
|
@ -172,9 +169,5 @@ func (s *Scorch) Rollback(to *RollbackPoint) error {
|
|||
return fmt.Errorf("Rollback: failed with err: %v", err)
|
||||
}
|
||||
|
||||
if revert.persisted != nil {
|
||||
err = <-revert.persisted
|
||||
}
|
||||
|
||||
return err
|
||||
return <-revert.persisted
|
||||
}
|
||||
|
|
|
@ -837,6 +837,11 @@ func (udc *UpsideDownCouch) Batch(batch *index.Batch) (err error) {
|
|||
docBackIndexRowErr = err
|
||||
return
|
||||
}
|
||||
defer func() {
|
||||
if cerr := kvreader.Close(); err == nil && cerr != nil {
|
||||
docBackIndexRowErr = cerr
|
||||
}
|
||||
}()
|
||||
|
||||
for docID, doc := range batch.IndexOps {
|
||||
backIndexRow, err := backIndexRowForDoc(kvreader, index.IndexInternalID(docID))
|
||||
|
@ -847,12 +852,6 @@ func (udc *UpsideDownCouch) Batch(batch *index.Batch) (err error) {
|
|||
|
||||
docBackIndexRowCh <- &docBackIndexRow{docID, doc, backIndexRow}
|
||||
}
|
||||
|
||||
err = kvreader.Close()
|
||||
if err != nil {
|
||||
docBackIndexRowErr = err
|
||||
return
|
||||
}
|
||||
}()
|
||||
|
||||
// wait for analysis result
|
||||
|
|
Loading…
Reference in New Issue