scorch zap.MergeToWriter() takes SegmentBases instead of Segments
This change turns zap.MergeToWriter() into a public func, so that it's now directly callable from outside packages (such as from scorch's top-level merger or persister). And, MergerToWriter() now takes input of SegmentBases instead of Segments, so that it can now work on either in-memory zap segments or file-based zap segments. This is yet another stepping stone towards in-memory merging of zap segments.
This commit is contained in:
parent
8c2520d55c
commit
a83ee0f364
|
@ -46,6 +46,11 @@ func Merge(segments []*Segment, drops []*roaring.Bitmap, path string,
|
||||||
_ = os.Remove(path)
|
_ = os.Remove(path)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
segmentBases := make([]*SegmentBase, len(segments))
|
||||||
|
for segmenti, segment := range segments {
|
||||||
|
segmentBases[segmenti] = &segment.SegmentBase
|
||||||
|
}
|
||||||
|
|
||||||
// buffer the output
|
// buffer the output
|
||||||
br := bufio.NewWriter(f)
|
br := bufio.NewWriter(f)
|
||||||
|
|
||||||
|
@ -53,7 +58,7 @@ func Merge(segments []*Segment, drops []*roaring.Bitmap, path string,
|
||||||
cr := NewCountHashWriter(br)
|
cr := NewCountHashWriter(br)
|
||||||
|
|
||||||
newDocNums, numDocs, storedIndexOffset, fieldsIndexOffset, docValueOffset, err :=
|
newDocNums, numDocs, storedIndexOffset, fieldsIndexOffset, docValueOffset, err :=
|
||||||
mergeToWriter(segments, drops, chunkFactor, cr)
|
MergeToWriter(segmentBases, drops, chunkFactor, cr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cleanup()
|
cleanup()
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -87,7 +92,7 @@ func Merge(segments []*Segment, drops []*roaring.Bitmap, path string,
|
||||||
return newDocNums, nil
|
return newDocNums, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func mergeToWriter(segments []*Segment, drops []*roaring.Bitmap,
|
func MergeToWriter(segments []*SegmentBase, drops []*roaring.Bitmap,
|
||||||
chunkFactor uint32, cr *CountHashWriter) (
|
chunkFactor uint32, cr *CountHashWriter) (
|
||||||
newDocNums [][]uint64,
|
newDocNums [][]uint64,
|
||||||
numDocs, storedIndexOffset, fieldsIndexOffset, docValueOffset uint64,
|
numDocs, storedIndexOffset, fieldsIndexOffset, docValueOffset uint64,
|
||||||
|
@ -135,10 +140,10 @@ func mapFields(fields []string) map[string]uint16 {
|
||||||
|
|
||||||
// computeNewDocCount determines how many documents will be in the newly
|
// computeNewDocCount determines how many documents will be in the newly
|
||||||
// merged segment when obsoleted docs are dropped
|
// merged segment when obsoleted docs are dropped
|
||||||
func computeNewDocCount(segments []*Segment, drops []*roaring.Bitmap) uint64 {
|
func computeNewDocCount(segments []*SegmentBase, drops []*roaring.Bitmap) uint64 {
|
||||||
var newDocCount uint64
|
var newDocCount uint64
|
||||||
for segI, segment := range segments {
|
for segI, segment := range segments {
|
||||||
newDocCount += segment.NumDocs()
|
newDocCount += segment.numDocs
|
||||||
if drops[segI] != nil {
|
if drops[segI] != nil {
|
||||||
newDocCount -= drops[segI].GetCardinality()
|
newDocCount -= drops[segI].GetCardinality()
|
||||||
}
|
}
|
||||||
|
@ -146,7 +151,7 @@ func computeNewDocCount(segments []*Segment, drops []*roaring.Bitmap) uint64 {
|
||||||
return newDocCount
|
return newDocCount
|
||||||
}
|
}
|
||||||
|
|
||||||
func persistMergedRest(segments []*Segment, drops []*roaring.Bitmap,
|
func persistMergedRest(segments []*SegmentBase, drops []*roaring.Bitmap,
|
||||||
fieldsInv []string, fieldsMap map[string]uint16, newDocNums [][]uint64,
|
fieldsInv []string, fieldsMap map[string]uint16, newDocNums [][]uint64,
|
||||||
newSegDocCount uint64, chunkFactor uint32,
|
newSegDocCount uint64, chunkFactor uint32,
|
||||||
w *CountHashWriter) ([]uint64, uint64, error) {
|
w *CountHashWriter) ([]uint64, uint64, error) {
|
||||||
|
@ -408,7 +413,7 @@ func persistMergedRest(segments []*Segment, drops []*roaring.Bitmap,
|
||||||
|
|
||||||
const docDropped = math.MaxUint64
|
const docDropped = math.MaxUint64
|
||||||
|
|
||||||
func mergeStoredAndRemap(segments []*Segment, drops []*roaring.Bitmap,
|
func mergeStoredAndRemap(segments []*SegmentBase, drops []*roaring.Bitmap,
|
||||||
fieldsMap map[string]uint16, fieldsInv []string, newSegDocCount uint64,
|
fieldsMap map[string]uint16, fieldsInv []string, newSegDocCount uint64,
|
||||||
w *CountHashWriter) (uint64, [][]uint64, error) {
|
w *CountHashWriter) (uint64, [][]uint64, error) {
|
||||||
var rv [][]uint64 // The remapped or newDocNums for each segment.
|
var rv [][]uint64 // The remapped or newDocNums for each segment.
|
||||||
|
@ -523,7 +528,7 @@ func mergeStoredAndRemap(segments []*Segment, drops []*roaring.Bitmap,
|
||||||
}
|
}
|
||||||
|
|
||||||
// mergeFields builds a unified list of fields used across all the input segments
|
// mergeFields builds a unified list of fields used across all the input segments
|
||||||
func mergeFields(segments []*Segment) []string {
|
func mergeFields(segments []*SegmentBase) []string {
|
||||||
fieldsMap := map[string]struct{}{}
|
fieldsMap := map[string]struct{}{}
|
||||||
for _, segment := range segments {
|
for _, segment := range segments {
|
||||||
fields := segment.Fields()
|
fields := segment.Fields()
|
||||||
|
|
Loading…
Reference in New Issue