major refactoring of posting details
This commit is contained in:
parent
6e2207c445
commit
85e15628ee
@ -47,6 +47,9 @@ func PersistSegment(memSegment *mem.Segment, path string, chunkFactor uint32) (e
|
||||
cr := NewCountHashWriter(br)
|
||||
|
||||
var storedIndexOffset uint64
|
||||
var dictLocs []uint64
|
||||
if len(memSegment.Stored) > 0 {
|
||||
|
||||
storedIndexOffset, err = persistStored(memSegment, cr)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -70,12 +73,15 @@ func PersistSegment(memSegment *mem.Segment, path string, chunkFactor uint32) (e
|
||||
return err
|
||||
}
|
||||
|
||||
var dictLocs []uint64
|
||||
dictLocs, err = persistDictionary(memSegment, cr, postingsLocs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
} else {
|
||||
dictLocs = make([]uint64, len(memSegment.FieldsInv))
|
||||
}
|
||||
|
||||
var fieldIndexStart uint64
|
||||
fieldIndexStart, err = persistFields(memSegment.FieldsInv, cr, dictLocs)
|
||||
if err != nil {
|
||||
@ -215,40 +221,19 @@ func persistStored(memSegment *mem.Segment, w *CountHashWriter) (uint64, error)
|
||||
|
||||
func persistPostingDetails(memSegment *mem.Segment, w *CountHashWriter, chunkFactor uint32) ([]uint64, []uint64, error) {
|
||||
var freqOffsets, locOfffsets []uint64
|
||||
tfEncoder := newChunkedIntCoder(uint64(chunkFactor), uint64(len(memSegment.Stored)-1))
|
||||
for postingID := range memSegment.Postings {
|
||||
if postingID != 0 {
|
||||
tfEncoder.Reset()
|
||||
}
|
||||
postingsListItr := memSegment.Postings[postingID].Iterator()
|
||||
|
||||
total := uint64(len(memSegment.Stored))/uint64(chunkFactor) + 1
|
||||
|
||||
var freqNormBuf []byte
|
||||
var offset int
|
||||
|
||||
var encodingBuf bytes.Buffer
|
||||
encoder := govarint.NewU64Base128Encoder(&encodingBuf)
|
||||
|
||||
chunkLens := make([]uint64, total)
|
||||
var currChunk uint64
|
||||
for postingsListItr.HasNext() {
|
||||
docNum := postingsListItr.Next()
|
||||
chunk := uint64(docNum) / uint64(chunkFactor)
|
||||
|
||||
if chunk != currChunk {
|
||||
// starting a new chunk
|
||||
if encoder != nil {
|
||||
// close out last
|
||||
encoder.Close()
|
||||
encodingBytes := encodingBuf.Bytes()
|
||||
chunkLens[currChunk] = uint64(len(encodingBytes))
|
||||
freqNormBuf = append(freqNormBuf, encodingBytes...)
|
||||
encodingBuf.Reset()
|
||||
encoder = govarint.NewU64Base128Encoder(&encodingBuf)
|
||||
}
|
||||
|
||||
currChunk = chunk
|
||||
}
|
||||
docNum := uint64(postingsListItr.Next())
|
||||
|
||||
// put freq
|
||||
_, err := encoder.PutU64(memSegment.Freqs[postingID][offset])
|
||||
err := tfEncoder.Add(docNum, memSegment.Freqs[postingID][offset])
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
@ -256,7 +241,7 @@ func persistPostingDetails(memSegment *mem.Segment, w *CountHashWriter, chunkFac
|
||||
// put norm
|
||||
norm := memSegment.Norms[postingID][offset]
|
||||
normBits := math.Float32bits(norm)
|
||||
_, err = encoder.PutU32(normBits)
|
||||
err = tfEncoder.Add(docNum, uint64(normBits))
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
@ -264,35 +249,11 @@ func persistPostingDetails(memSegment *mem.Segment, w *CountHashWriter, chunkFac
|
||||
offset++
|
||||
}
|
||||
|
||||
// close out last chunk
|
||||
if encoder != nil {
|
||||
// fix me write freq/norms
|
||||
encoder.Close()
|
||||
encodingBytes := encodingBuf.Bytes()
|
||||
chunkLens[currChunk] = uint64(len(encodingBytes))
|
||||
freqNormBuf = append(freqNormBuf, encodingBytes...)
|
||||
}
|
||||
|
||||
// record where this postings freq info starts
|
||||
freqOffsets = append(freqOffsets, uint64(w.Count()))
|
||||
|
||||
buf := make([]byte, binary.MaxVarintLen64)
|
||||
// write out the number of chunks
|
||||
n := binary.PutUvarint(buf, uint64(total))
|
||||
_, err := w.Write(buf[:n])
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
// write out the chunk lens
|
||||
for _, chunkLen := range chunkLens {
|
||||
n := binary.PutUvarint(buf, uint64(chunkLen))
|
||||
_, err = w.Write(buf[:n])
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
}
|
||||
// write out the data
|
||||
_, err = w.Write(freqNormBuf)
|
||||
tfEncoder.Close()
|
||||
_, err := tfEncoder.Write(w)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
@ -300,61 +261,39 @@ func persistPostingDetails(memSegment *mem.Segment, w *CountHashWriter, chunkFac
|
||||
}
|
||||
|
||||
// now do it again for the locations
|
||||
locEncoder := newChunkedIntCoder(uint64(chunkFactor), uint64(len(memSegment.Stored)-1))
|
||||
for postingID := range memSegment.Postings {
|
||||
if postingID != 0 {
|
||||
locEncoder.Reset()
|
||||
}
|
||||
postingsListItr := memSegment.Postings[postingID].Iterator()
|
||||
|
||||
total := uint64(len(memSegment.Stored))/uint64(chunkFactor) + 1
|
||||
|
||||
var locBuf []byte
|
||||
var offset int
|
||||
var locOffset int
|
||||
|
||||
var encodingBuf bytes.Buffer
|
||||
encoder := govarint.NewU64Base128Encoder(&encodingBuf)
|
||||
|
||||
chunkLens := make([]uint64, total)
|
||||
var currChunk uint64
|
||||
for postingsListItr.HasNext() {
|
||||
docNum := postingsListItr.Next()
|
||||
chunk := uint64(docNum) / uint64(chunkFactor)
|
||||
|
||||
if chunk != currChunk {
|
||||
// starting a new chunk
|
||||
if encoder != nil {
|
||||
// close out last
|
||||
encoder.Close()
|
||||
encodingBytes := encodingBuf.Bytes()
|
||||
chunkLens[currChunk] = uint64(len(encodingBytes))
|
||||
locBuf = append(locBuf, encodingBytes...)
|
||||
encodingBuf.Reset()
|
||||
encoder = govarint.NewU64Base128Encoder(&encodingBuf)
|
||||
}
|
||||
currChunk = chunk
|
||||
}
|
||||
|
||||
docNum := uint64(postingsListItr.Next())
|
||||
for i := 0; i < int(memSegment.Freqs[postingID][offset]); i++ {
|
||||
|
||||
if len(memSegment.Locfields[postingID]) > 0 {
|
||||
// put field
|
||||
_, err := encoder.PutU64(uint64(memSegment.Locfields[postingID][locOffset]))
|
||||
err := locEncoder.Add(docNum, uint64(memSegment.Locfields[postingID][locOffset]))
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
// put pos
|
||||
_, err = encoder.PutU64(memSegment.Locpos[postingID][locOffset])
|
||||
|
||||
err = locEncoder.Add(docNum, memSegment.Locpos[postingID][locOffset])
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
// put start
|
||||
_, err = encoder.PutU64(memSegment.Locstarts[postingID][locOffset])
|
||||
err = locEncoder.Add(docNum, memSegment.Locstarts[postingID][locOffset])
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
// put end
|
||||
_, err = encoder.PutU64(memSegment.Locends[postingID][locOffset])
|
||||
err = locEncoder.Add(docNum, memSegment.Locends[postingID][locOffset])
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
@ -363,58 +302,31 @@ func persistPostingDetails(memSegment *mem.Segment, w *CountHashWriter, chunkFac
|
||||
num := len(memSegment.Locarraypos[postingID][locOffset])
|
||||
|
||||
// put the number of array positions to follow
|
||||
_, err = encoder.PutU64(uint64(num))
|
||||
err = locEncoder.Add(docNum, uint64(num))
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
// put each array position
|
||||
for j := 0; j < num; j++ {
|
||||
_, err = encoder.PutU64(memSegment.Locarraypos[postingID][locOffset][j])
|
||||
err = locEncoder.Add(docNum, memSegment.Locarraypos[postingID][locOffset][j])
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
locOffset++
|
||||
}
|
||||
offset++
|
||||
}
|
||||
|
||||
// close out last chunk
|
||||
if encoder != nil {
|
||||
// fix me write freq/norms
|
||||
encoder.Close()
|
||||
encodingBytes := encodingBuf.Bytes()
|
||||
chunkLens[currChunk] = uint64(len(encodingBytes))
|
||||
locBuf = append(locBuf, encodingBytes...)
|
||||
}
|
||||
|
||||
// record where this postings loc info starts
|
||||
locOfffsets = append(locOfffsets, uint64(w.Count()))
|
||||
|
||||
buf := make([]byte, binary.MaxVarintLen64)
|
||||
// write out the number of chunks
|
||||
n := binary.PutUvarint(buf, uint64(total))
|
||||
_, err := w.Write(buf[:n])
|
||||
locEncoder.Close()
|
||||
_, err := locEncoder.Write(w)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
// write out the chunk lens
|
||||
for _, chunkLen := range chunkLens {
|
||||
n := binary.PutUvarint(buf, uint64(chunkLen))
|
||||
_, err = w.Write(buf[:n])
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
}
|
||||
// write out the data
|
||||
_, err = w.Write(locBuf)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
}
|
||||
return freqOffsets, locOfffsets, nil
|
||||
}
|
||||
|
@ -39,7 +39,7 @@ func Merge(segments []*Segment, drops []*roaring.Bitmap, path string,
|
||||
|
||||
var newDocNums [][]uint64
|
||||
var storedIndexOffset uint64
|
||||
dictLocs := make([]uint64, len(fieldsInv))
|
||||
var dictLocs []uint64
|
||||
if newSegDocCount > 0 {
|
||||
storedIndexOffset, newDocNums, err = mergeStoredAndRemap(segments, drops,
|
||||
fieldsMap, fieldsInv, newSegDocCount, cr)
|
||||
@ -48,10 +48,12 @@ func Merge(segments []*Segment, drops []*roaring.Bitmap, path string,
|
||||
}
|
||||
|
||||
dictLocs, err = persistMergedRest(segments, drops, fieldsInv, fieldsMap,
|
||||
newDocNums, newSegDocCount, cr)
|
||||
newDocNums, newSegDocCount, chunkFactor, cr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
dictLocs = make([]uint64, len(fieldsInv))
|
||||
}
|
||||
|
||||
var fieldsIndexOffset uint64
|
||||
@ -108,7 +110,8 @@ func computeNewDocCount(segments []*Segment, drops []*roaring.Bitmap) uint64 {
|
||||
}
|
||||
|
||||
func persistMergedRest(segments []*Segment, drops []*roaring.Bitmap,
|
||||
fieldsInv []string, fieldsMap map[string]uint16, newDocNums [][]uint64, newSegDocCount uint64,
|
||||
fieldsInv []string, fieldsMap map[string]uint16, newDocNums [][]uint64,
|
||||
newSegDocCount uint64, chunkFactor uint32,
|
||||
w *CountHashWriter) ([]uint64, error) {
|
||||
|
||||
rv := make([]uint64, len(fieldsInv))
|
||||
@ -149,8 +152,8 @@ func persistMergedRest(segments []*Segment, drops []*roaring.Bitmap,
|
||||
return 0
|
||||
})
|
||||
|
||||
tfEncoder := newChunkedIntCoder(1024, newSegDocCount-1)
|
||||
locEncoder := newChunkedIntCoder(1024, newSegDocCount-1)
|
||||
tfEncoder := newChunkedIntCoder(uint64(chunkFactor), newSegDocCount-1)
|
||||
locEncoder := newChunkedIntCoder(uint64(chunkFactor), newSegDocCount-1)
|
||||
for err == nil {
|
||||
term, _ := mergeItr.Current()
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user