0
0
Fork 0

scorch zap merge file cleanup on error, and some minor prealloc's

This commit is contained in:
Steve Yen 2018-01-24 09:22:10 -08:00
parent 29d526a7c2
commit d389e2bb40
1 changed files with 21 additions and 8 deletions

View File

@ -29,10 +29,10 @@ import (
"github.com/golang/snappy"
)
// Merge takes a slice of zap segments, bit masks describing which documents
// from the may be dropped, and creates a new segment containing the remaining
// data. This new segment is built at the specified path, with the provided
// chunkFactor.
// Merge takes a slice of zap segments and bit masks describing which
// documents may be dropped, and creates a new segment containing the
// remaining data. This new segment is built at the specified path,
// with the provided chunkFactor.
func Merge(segments []*Segment, drops []*roaring.Bitmap, path string,
chunkFactor uint32) ([][]uint64, error) {
flag := os.O_RDWR | os.O_CREATE
@ -42,6 +42,11 @@ func Merge(segments []*Segment, drops []*roaring.Bitmap, path string,
return nil, err
}
cleanup := func() {
_ = f.Close()
_ = os.Remove(path)
}
// buffer the output
br := bufio.NewWriter(f)
@ -50,22 +55,25 @@ func Merge(segments []*Segment, drops []*roaring.Bitmap, path string,
fieldsInv := mergeFields(segments)
fieldsMap := mapFields(fieldsInv)
newSegDocCount := computeNewDocCount(segments, drops)
var newDocNums [][]uint64
var storedIndexOffset uint64
fieldDvLocsOffset := uint64(fieldNotUninverted)
var dictLocs []uint64
newSegDocCount := computeNewDocCount(segments, drops)
if newSegDocCount > 0 {
storedIndexOffset, newDocNums, err = mergeStoredAndRemap(segments, drops,
fieldsMap, fieldsInv, newSegDocCount, cr)
if err != nil {
cleanup()
return nil, err
}
dictLocs, fieldDvLocsOffset, err = persistMergedRest(segments, drops, fieldsInv, fieldsMap,
newDocNums, newSegDocCount, chunkFactor, cr)
if err != nil {
cleanup()
return nil, err
}
} else {
@ -74,27 +82,32 @@ func Merge(segments []*Segment, drops []*roaring.Bitmap, path string,
fieldsIndexOffset, err := persistFields(fieldsInv, cr, dictLocs)
if err != nil {
cleanup()
return nil, err
}
err = persistFooter(newSegDocCount, storedIndexOffset,
fieldsIndexOffset, fieldDvLocsOffset, chunkFactor, cr.Sum32(), cr)
if err != nil {
cleanup()
return nil, err
}
err = br.Flush()
if err != nil {
cleanup()
return nil, err
}
err = f.Sync()
if err != nil {
cleanup()
return nil, err
}
err = f.Close()
if err != nil {
cleanup()
return nil, err
}
@ -103,7 +116,7 @@ func Merge(segments []*Segment, drops []*roaring.Bitmap, path string,
// mapFields takes the fieldsInv list and builds the map
func mapFields(fields []string) map[string]uint16 {
rv := make(map[string]uint16)
rv := make(map[string]uint16, len(fields))
for i, fieldName := range fields {
rv[fieldName] = uint16(i)
}
@ -327,7 +340,7 @@ func persistMergedRest(segments []*Segment, drops []*roaring.Bitmap,
rv1[fieldID] = dictOffset
// update the doc value
var docNumbers docIDRange
docNumbers := make(docIDRange, 0, len(docTermMap))
for k := range docTermMap {
docNumbers = append(docNumbers, k)
}
@ -353,7 +366,7 @@ func persistMergedRest(segments []*Segment, drops []*roaring.Bitmap,
}
fieldDvLocsOffset = uint64(w.Count())
buf := make([]byte, binary.MaxVarintLen64)
buf := bufMaxVarintLen64
for _, offset := range fieldDvLocs {
n := binary.PutUvarint(buf, uint64(offset))
_, err := w.Write(buf[:n])