Merge branch 'master' into minor_docvalue_space_savings
commit 53c3cab512
@@ -111,6 +111,11 @@ func (s *Scorch) parseMergePlannerOptions() (*mergeplan.MergePlanOptions,
		if err != nil {
			return &mergePlannerOptions, err
		}

		err = mergeplan.ValidateMergePlannerOptions(&mergePlannerOptions)
		if err != nil {
			return nil, err
		}
	}
	return &mergePlannerOptions, nil
}

@@ -18,6 +18,7 @@
package mergeplan

import (
	"errors"
	"fmt"
	"math"
	"sort"
@@ -115,7 +116,15 @@ func (o *MergePlanOptions) RaiseToFloorSegmentSize(s int64) int64 {
	return o.FloorSegmentSize
}

// Suggested default options.
// MaxSegmentSizeLimit represents the maximum size of a segment,
// this limit comes with hit-1 optimisation/max encoding limit uint31.
const MaxSegmentSizeLimit = 1<<31 - 1

// ErrMaxSegmentSizeTooLarge is returned when the size of the segment
// exceeds the MaxSegmentSizeLimit
var ErrMaxSegmentSizeTooLarge = errors.New("MaxSegmentSize exceeds the size limit")

// DefaultMergePlanOptions suggests the default options.
var DefaultMergePlanOptions = MergePlanOptions{
	MaxSegmentsPerTier:   10,
	MaxSegmentSize:       5000000,
@@ -367,3 +376,11 @@ func ToBarChart(prefix string, barMax int, segments []Segment, plan *MergePlan)

	return strings.Join(rv, "\n")
}

// ValidateMergePlannerOptions validates the merge planner options
func ValidateMergePlannerOptions(options *MergePlanOptions) error {
	if options.MaxSegmentSize > MaxSegmentSizeLimit {
		return ErrMaxSegmentSizeTooLarge
	}
	return nil
}

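The limit above exists because zap's 1-hit postings optimisation encodes a value in 31 bits, so MaxSegmentSize may not exceed 1<<31 - 1. A minimal caller-side sketch of guarding custom options with the new validator (illustrative values; the helper function is hypothetical):

	// Sketch only: rejecting an oversized MaxSegmentSize with the
	// validator introduced in this commit.
	func checkOptions() error {
		o := &mergeplan.MergePlanOptions{
			MaxSegmentsPerTier:   10,
			MaxSegmentSize:       1 << 31, // one past MaxSegmentSizeLimit (1<<31 - 1)
			TierGrowth:           10.0,
			SegmentsPerMergeTask: 10,
		}
		// Returns mergeplan.ErrMaxSegmentSizeTooLarge for this value.
		return mergeplan.ValidateMergePlannerOptions(o)
	}
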
@@ -17,10 +17,12 @@ package mergeplan
import (
	"encoding/json"
	"fmt"
	"math/rand"
	"os"
	"reflect"
	"sort"
	"testing"
	"time"
)

// Implements the Segment interface for testing,
@@ -401,6 +403,62 @@ func TestManySameSizedSegmentsWithDeletesBetweenMerges(t *testing.T) {
	}
}

func TestValidateMergePlannerOptions(t *testing.T) {
	o := &MergePlanOptions{
		MaxSegmentSize:       1 << 32,
		MaxSegmentsPerTier:   3,
		TierGrowth:           3.0,
		SegmentsPerMergeTask: 3,
	}
	err := ValidateMergePlannerOptions(o)
	if err != ErrMaxSegmentSizeTooLarge {
		t.Error("Validation expected to fail as the MaxSegmentSize exceeds limit")
	}
}

func TestPlanMaxSegmentSizeLimit(t *testing.T) {
	o := &MergePlanOptions{
		MaxSegmentSize:       20,
		MaxSegmentsPerTier:   5,
		TierGrowth:           3.0,
		SegmentsPerMergeTask: 5,
		FloorSegmentSize:     5,
	}
	segments := makeLinearSegments(20)

	s := rand.NewSource(time.Now().UnixNano())
	r := rand.New(s)

	max := 20
	min := 5
	randomInRange := func() int64 {
		return int64(r.Intn(max-min) + min)
	}
	for i := 1; i < 20; i++ {
		o.MaxSegmentSize = randomInRange()
		plans, err := Plan(segments, o)
		if err != nil {
			t.Errorf("Plan failed, err: %v", err)
		}
		if len(plans.Tasks) == 0 {
			t.Errorf("expected some plans with tasks")
		}

		for _, task := range plans.Tasks {
			var totalLiveSize int64
			for _, segs := range task.Segments {
				totalLiveSize += segs.LiveSize()

			}
			if totalLiveSize >= o.MaxSegmentSize {
				t.Errorf("merged segments size: %d exceeding the MaxSegmentSize"+
					"limit: %d", totalLiveSize, o.MaxSegmentSize)
			}
		}
	}

}

// ----------------------------------------

type testCyclesSpec struct {

@@ -633,14 +633,14 @@ func (s *Scorch) removeOldBoltSnapshots() (numRemoved int, err error) {
		return 0, err
	}

	if len(persistedEpochs) <= NumSnapshotsToKeep {
	if len(persistedEpochs) <= s.numSnapshotsToKeep {
		// we need to keep everything
		return 0, nil
	}

	// make a map of epochs to protect from deletion
	protectedEpochs := make(map[uint64]struct{}, NumSnapshotsToKeep)
	for _, epoch := range persistedEpochs[0:NumSnapshotsToKeep] {
	protectedEpochs := make(map[uint64]struct{}, s.numSnapshotsToKeep)
	for _, epoch := range persistedEpochs[0:s.numSnapshotsToKeep] {
		protectedEpochs[epoch] = struct{}{}
	}

@@ -28,7 +28,6 @@ import (
	"github.com/blevesearch/bleve/document"
	"github.com/blevesearch/bleve/index"
	"github.com/blevesearch/bleve/index/scorch/segment"
	"github.com/blevesearch/bleve/index/scorch/segment/mem"
	"github.com/blevesearch/bleve/index/scorch/segment/zap"
	"github.com/blevesearch/bleve/index/store"
	"github.com/blevesearch/bleve/registry"
@@ -58,6 +57,7 @@ type Scorch struct {
	nextSnapshotEpoch    uint64
	eligibleForRemoval   []uint64        // Index snapshot epochs that are safe to GC.
	ineligibleForRemoval map[string]bool // Filenames that should not be GC'ed yet.
	numSnapshotsToKeep   int

	closeCh       chan struct{}
	introductions chan *segmentIntroduction
@@ -191,6 +191,17 @@ func (s *Scorch) openBolt() error {
		}
	}

	s.numSnapshotsToKeep = NumSnapshotsToKeep
	if v, ok := s.config["numSnapshotsToKeep"]; ok {
		var t int
		if t, err = parseToInteger(v); err != nil {
			return fmt.Errorf("numSnapshotsToKeep parse err: %v", err)
		}
		if t > 0 {
			s.numSnapshotsToKeep = t
		}
	}

	return nil
}

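For context: openBolt falls back to the package-level NumSnapshotsToKeep and only overrides it for positive values; parseToInteger (added at the end of this file's diff) accepts int or float64, which covers JSON-decoded configs. A hedged sketch of supplying the knob through the config map that ends up as s.config (the map literal is illustrative, only the key comes from this commit):

	// Illustrative only: such a config map reaches s.config in openBolt.
	config := map[string]interface{}{
		"numSnapshotsToKeep": 5, // keep the five most recent persisted snapshots
	}
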
@@ -289,7 +300,7 @@ func (s *Scorch) Batch(batch *index.Batch) (err error) {

	var newSegment segment.Segment
	if len(analysisResults) > 0 {
		newSegment, err = zap.NewSegmentBase(mem.NewFromAnalyzedDocs(analysisResults), DefaultChunkFactor)
		newSegment, err = zap.AnalysisResultsToSegmentBase(analysisResults, DefaultChunkFactor)
		if err != nil {
			return err
		}
@@ -504,3 +515,15 @@ func (s *Scorch) unmarkIneligibleForRemoval(filename string) {
func init() {
	registry.RegisterIndexType(Name, NewScorch)
}

func parseToInteger(i interface{}) (int, error) {
	switch v := i.(type) {
	case float64:
		return int(v), nil
	case int:
		return v, nil

	default:
		return 0, fmt.Errorf("expects int or float64 value")
	}
}

@@ -16,16 +16,10 @@ package zap

import (
	"bufio"
	"bytes"
	"encoding/binary"
	"math"
	"os"
	"sort"

	"github.com/Smerity/govarint"
	"github.com/blevesearch/bleve/index/scorch/segment/mem"
	"github.com/couchbase/vellum"
	"github.com/golang/snappy"
)

const version uint32 = 6
@@ -82,186 +76,6 @@ func PersistSegmentBase(sb *SegmentBase, path string) error {
	return nil
}

// PersistSegment takes the in-memory segment and persists it to
// the specified path in the zap file format.
func PersistSegment(memSegment *mem.Segment, path string, chunkFactor uint32) error {
	flag := os.O_RDWR | os.O_CREATE

	f, err := os.OpenFile(path, flag, 0600)
	if err != nil {
		return err
	}

	cleanup := func() {
		_ = f.Close()
		_ = os.Remove(path)
	}

	// buffer the output
	br := bufio.NewWriter(f)

	// wrap it for counting (tracking offsets)
	cr := NewCountHashWriter(br)

	numDocs, storedIndexOffset, fieldsIndexOffset, docValueOffset, _, err :=
		persistBase(memSegment, cr, chunkFactor)
	if err != nil {
		cleanup()
		return err
	}

	err = persistFooter(numDocs, storedIndexOffset, fieldsIndexOffset, docValueOffset,
		chunkFactor, cr.Sum32(), cr)
	if err != nil {
		cleanup()
		return err
	}

	err = br.Flush()
	if err != nil {
		cleanup()
		return err
	}

	err = f.Sync()
	if err != nil {
		cleanup()
		return err
	}

	err = f.Close()
	if err != nil {
		cleanup()
		return err
	}

	return nil
}

func persistBase(memSegment *mem.Segment, cr *CountHashWriter, chunkFactor uint32) (
	numDocs, storedIndexOffset, fieldsIndexOffset, docValueOffset uint64,
	dictLocs []uint64, err error) {
	docValueOffset = uint64(fieldNotUninverted)

	if len(memSegment.Stored) > 0 {
		storedIndexOffset, err = persistStored(memSegment, cr)
		if err != nil {
			return 0, 0, 0, 0, nil, err
		}

		freqOffsets, locOffsets, err := persistPostingDetails(memSegment, cr, chunkFactor)
		if err != nil {
			return 0, 0, 0, 0, nil, err
		}

		postingsListLocs, err := persistPostingsLocs(memSegment, cr)
		if err != nil {
			return 0, 0, 0, 0, nil, err
		}

		postingsLocs, err := persistPostingsLists(memSegment, cr, postingsListLocs, freqOffsets, locOffsets)
		if err != nil {
			return 0, 0, 0, 0, nil, err
		}

		dictLocs, err = persistDictionary(memSegment, cr, postingsLocs)
		if err != nil {
			return 0, 0, 0, 0, nil, err
		}

		docValueOffset, err = persistFieldDocValues(memSegment, cr, chunkFactor)
		if err != nil {
			return 0, 0, 0, 0, nil, err
		}
	} else {
		dictLocs = make([]uint64, len(memSegment.FieldsInv))
	}

	fieldsIndexOffset, err = persistFields(memSegment.FieldsInv, cr, dictLocs)
	if err != nil {
		return 0, 0, 0, 0, nil, err
	}

	return uint64(len(memSegment.Stored)), storedIndexOffset, fieldsIndexOffset, docValueOffset,
		dictLocs, nil
}

func persistStored(memSegment *mem.Segment, w *CountHashWriter) (uint64, error) {
	var curr int
	var metaBuf bytes.Buffer
	var data, compressed []byte

	metaEncoder := govarint.NewU64Base128Encoder(&metaBuf)

	docNumOffsets := make(map[int]uint64, len(memSegment.Stored))

	for docNum, storedValues := range memSegment.Stored {
		if docNum != 0 {
			// reset buffer if necessary
			curr = 0
			metaBuf.Reset()
			data = data[:0]
			compressed = compressed[:0]
		}

		st := memSegment.StoredTypes[docNum]
		sp := memSegment.StoredPos[docNum]

		// encode fields in order
		for fieldID := range memSegment.FieldsInv {
			if storedFieldValues, ok := storedValues[uint16(fieldID)]; ok {
				stf := st[uint16(fieldID)]
				spf := sp[uint16(fieldID)]

				var err2 error
				curr, data, err2 = persistStoredFieldValues(fieldID,
					storedFieldValues, stf, spf, curr, metaEncoder, data)
				if err2 != nil {
					return 0, err2
				}
			}
		}

		metaEncoder.Close()
		metaBytes := metaBuf.Bytes()

		// compress the data
		compressed = snappy.Encode(compressed, data)

		// record where we're about to start writing
		docNumOffsets[docNum] = uint64(w.Count())

		// write out the meta len and compressed data len
		_, err := writeUvarints(w, uint64(len(metaBytes)), uint64(len(compressed)))
		if err != nil {
			return 0, err
		}

		// now write the meta
		_, err = w.Write(metaBytes)
		if err != nil {
			return 0, err
		}
		// now write the compressed data
		_, err = w.Write(compressed)
		if err != nil {
			return 0, err
		}
	}

	// return value is the start of the stored index
	rv := uint64(w.Count())
	// now write out the stored doc index
	for docNum := range memSegment.Stored {
		err := binary.Write(w, binary.BigEndian, docNumOffsets[docNum])
		if err != nil {
			return 0, err
		}
	}

	return rv, nil
}

func persistStoredFieldValues(fieldID int,
	storedFieldValues [][]byte, stf []byte, spf [][]uint64,
	curr int, metaEncoder *govarint.Base128Encoder, data []byte) (
@@ -307,308 +121,6 @@ func persistStoredFieldValues(fieldID int,
	return curr, data, nil
}

func persistPostingDetails(memSegment *mem.Segment, w *CountHashWriter, chunkFactor uint32) ([]uint64, []uint64, error) {
	freqOffsets := make([]uint64, 0, len(memSegment.Postings))
	tfEncoder := newChunkedIntCoder(uint64(chunkFactor), uint64(len(memSegment.Stored)-1))
	for postingID := range memSegment.Postings {
		if postingID != 0 {
			tfEncoder.Reset()
		}
		freqs := memSegment.Freqs[postingID]
		norms := memSegment.Norms[postingID]
		postingsListItr := memSegment.Postings[postingID].Iterator()
		var offset int
		for postingsListItr.HasNext() {
			docNum := uint64(postingsListItr.Next())

			// put freq & norm
			err := tfEncoder.Add(docNum, freqs[offset], uint64(math.Float32bits(norms[offset])))
			if err != nil {
				return nil, nil, err
			}

			offset++
		}

		// record where this postings freq info starts
		freqOffsets = append(freqOffsets, uint64(w.Count()))

		tfEncoder.Close()
		_, err := tfEncoder.Write(w)
		if err != nil {
			return nil, nil, err
		}
	}

	// now do it again for the locations
	locOffsets := make([]uint64, 0, len(memSegment.Postings))
	locEncoder := newChunkedIntCoder(uint64(chunkFactor), uint64(len(memSegment.Stored)-1))
	for postingID := range memSegment.Postings {
		if postingID != 0 {
			locEncoder.Reset()
		}
		freqs := memSegment.Freqs[postingID]
		locfields := memSegment.Locfields[postingID]
		locpos := memSegment.Locpos[postingID]
		locstarts := memSegment.Locstarts[postingID]
		locends := memSegment.Locends[postingID]
		locarraypos := memSegment.Locarraypos[postingID]
		postingsListItr := memSegment.Postings[postingID].Iterator()
		var offset int
		var locOffset int
		for postingsListItr.HasNext() {
			docNum := uint64(postingsListItr.Next())
			n := int(freqs[offset])
			for i := 0; i < n; i++ {
				if len(locfields) > 0 {
					err := locEncoder.Add(docNum, uint64(locfields[locOffset]),
						locpos[locOffset], locstarts[locOffset], locends[locOffset],
						uint64(len(locarraypos[locOffset])))
					if err != nil {
						return nil, nil, err
					}

					// put each array position
					err = locEncoder.Add(docNum, locarraypos[locOffset]...)
					if err != nil {
						return nil, nil, err
					}
				}
				locOffset++
			}
			offset++
		}

		// record where this postings loc info starts
		locOffsets = append(locOffsets, uint64(w.Count()))

		locEncoder.Close()
		_, err := locEncoder.Write(w)
		if err != nil {
			return nil, nil, err
		}
	}

	return freqOffsets, locOffsets, nil
}

func persistPostingsLocs(memSegment *mem.Segment, w *CountHashWriter) (rv []uint64, err error) {
	rv = make([]uint64, 0, len(memSegment.PostingsLocs))
	reuseBufVarint := make([]byte, binary.MaxVarintLen64)
	for postingID := range memSegment.PostingsLocs {
		// record where we start this posting loc
		rv = append(rv, uint64(w.Count()))
		// write out the length and bitmap
		_, err = writeRoaringWithLen(memSegment.PostingsLocs[postingID], w, reuseBufVarint)
		if err != nil {
			return nil, err
		}
	}
	return rv, nil
}

func persistPostingsLists(memSegment *mem.Segment, w *CountHashWriter,
	postingsListLocs, freqOffsets, locOffsets []uint64) (rv []uint64, err error) {
	rv = make([]uint64, 0, len(memSegment.Postings))
	reuseBufVarint := make([]byte, binary.MaxVarintLen64)
	for postingID := range memSegment.Postings {
		// record where we start this posting list
		rv = append(rv, uint64(w.Count()))

		// write out the term info, loc info, and loc posting list offset
		_, err = writeUvarints(w, freqOffsets[postingID],
			locOffsets[postingID], postingsListLocs[postingID])
		if err != nil {
			return nil, err
		}

		// write out the length and bitmap
		_, err = writeRoaringWithLen(memSegment.Postings[postingID], w, reuseBufVarint)
		if err != nil {
			return nil, err
		}
	}
	return rv, nil
}

func persistDictionary(memSegment *mem.Segment, w *CountHashWriter, postingsLocs []uint64) ([]uint64, error) {
	rv := make([]uint64, 0, len(memSegment.DictKeys))

	varintBuf := make([]byte, binary.MaxVarintLen64)

	var buffer bytes.Buffer
	builder, err := vellum.New(&buffer, nil)
	if err != nil {
		return nil, err
	}
	for fieldID, fieldTerms := range memSegment.DictKeys {

		dict := memSegment.Dicts[fieldID]
		// now walk the dictionary in order of fieldTerms (already sorted)
		for _, fieldTerm := range fieldTerms {
			postingID := dict[fieldTerm] - 1
			postingsAddr := postingsLocs[postingID]
			err = builder.Insert([]byte(fieldTerm), postingsAddr)
			if err != nil {
				return nil, err
			}
		}
		err = builder.Close()
		if err != nil {
			return nil, err
		}

		// record where this dictionary starts
		rv = append(rv, uint64(w.Count()))

		vellumData := buffer.Bytes()

		// write out the length of the vellum data
		n := binary.PutUvarint(varintBuf, uint64(len(vellumData)))
		_, err = w.Write(varintBuf[:n])
		if err != nil {
			return nil, err
		}

		// write this vellum to disk
		_, err = w.Write(vellumData)
		if err != nil {
			return nil, err
		}

		// reset buffer and vellum builder
		buffer.Reset()
		err = builder.Reset(&buffer)
		if err != nil {
			return nil, err
		}
	}

	return rv, nil
}

type docIDRange []uint64

func (a docIDRange) Len() int           { return len(a) }
func (a docIDRange) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a docIDRange) Less(i, j int) bool { return a[i] < a[j] }

func persistDocValues(memSegment *mem.Segment, w *CountHashWriter,
	chunkFactor uint32) (map[uint16]uint64, error) {
	fieldChunkOffsets := make(map[uint16]uint64, len(memSegment.FieldsInv))
	fdvEncoder := newChunkedContentCoder(uint64(chunkFactor), uint64(len(memSegment.Stored)-1))

	var postings *mem.PostingsList
	var postingsItr *mem.PostingsIterator

	for fieldID := range memSegment.DocValueFields {
		field := memSegment.FieldsInv[fieldID]
		docTermMap := make(map[uint64][]byte, 0)
		dict, err := memSegment.Dictionary(field)
		if err != nil {
			return nil, err
		}

		dictItr := dict.Iterator()
		next, err := dictItr.Next()
		for err == nil && next != nil {
			var err1 error
			postings, err1 = dict.(*mem.Dictionary).InitPostingsList(next.Term, nil, postings)
			if err1 != nil {
				return nil, err1
			}

			postingsItr = postings.InitIterator(postingsItr)
			nextPosting, err2 := postingsItr.Next()
			for err2 == nil && nextPosting != nil {
				docNum := nextPosting.Number()
				docTermMap[docNum] = append(append(docTermMap[docNum], []byte(next.Term)...), termSeparator)
				nextPosting, err2 = postingsItr.Next()
			}
			if err2 != nil {
				return nil, err2
			}

			next, err = dictItr.Next()
		}
		if err != nil {
			return nil, err
		}

		// sort wrt to docIDs
		docNumbers := make(docIDRange, 0, len(docTermMap))
		for k := range docTermMap {
			docNumbers = append(docNumbers, k)
		}
		sort.Sort(docNumbers)

		for _, docNum := range docNumbers {
			err = fdvEncoder.Add(docNum, docTermMap[docNum])
			if err != nil {
				return nil, err
			}
		}

		fieldChunkOffsets[fieldID] = uint64(w.Count())
		err = fdvEncoder.Close()
		if err != nil {
			return nil, err
		}
		// persist the doc value details for this field
		_, err = fdvEncoder.Write(w)
		if err != nil {
			return nil, err
		}
		// reseting encoder for the next field
		fdvEncoder.Reset()
	}

	return fieldChunkOffsets, nil
}

func persistFieldDocValues(memSegment *mem.Segment, w *CountHashWriter,
	chunkFactor uint32) (uint64, error) {
	fieldDvOffsets, err := persistDocValues(memSegment, w, chunkFactor)
	if err != nil {
		return 0, err
	}

	fieldDocValuesOffset := uint64(w.Count())
	buf := make([]byte, binary.MaxVarintLen64)
	offset := uint64(0)
	ok := true
	for fieldID := range memSegment.FieldsInv {
		// if the field isn't configured for docValue, then mark
		// the offset accordingly
		if offset, ok = fieldDvOffsets[uint16(fieldID)]; !ok {
			offset = fieldNotUninverted
		}
		n := binary.PutUvarint(buf, uint64(offset))
		_, err := w.Write(buf[:n])
		if err != nil {
			return 0, err
		}
	}

	return fieldDocValuesOffset, nil
}

func NewSegmentBase(memSegment *mem.Segment, chunkFactor uint32) (*SegmentBase, error) {
	var br bytes.Buffer

	cr := NewCountHashWriter(&br)

	numDocs, storedIndexOffset, fieldsIndexOffset, docValueOffset, dictLocs, err :=
		persistBase(memSegment, cr, chunkFactor)
	if err != nil {
		return nil, err
	}

	return InitSegmentBase(br.Bytes(), cr.Sum32(), chunkFactor,
		memSegment.FieldsMap, memSegment.FieldsInv, numDocs,
		storedIndexOffset, fieldsIndexOffset, docValueOffset, dictLocs)
}

func InitSegmentBase(mem []byte, memCRC uint32, chunkFactor uint32,
	fieldsMap map[string]uint16, fieldsInv []string, numDocs uint64,
	storedIndexOffset uint64, fieldsIndexOffset uint64, docValueOffset uint64,

@@ -21,20 +21,22 @@ import (
	"github.com/blevesearch/bleve/analysis"
	"github.com/blevesearch/bleve/document"
	"github.com/blevesearch/bleve/index"
	"github.com/blevesearch/bleve/index/scorch/segment/mem"
)

func TestBuild(t *testing.T) {
	_ = os.RemoveAll("/tmp/scorch.zap")

	memSegment := buildMemSegment()
	err := PersistSegment(memSegment, "/tmp/scorch.zap", 1024)
	sb, err := buildTestSegment()
	if err != nil {
		t.Fatal(err)
	}
	err = PersistSegmentBase(sb, "/tmp/scorch.zap")
	if err != nil {
		t.Fatal(err)
	}
}

func buildMemSegment() *mem.Segment {
func buildTestSegment() (*SegmentBase, error) {
	doc := &document.Document{
		ID: "a",
		Fields: []document.Field{
@@ -120,11 +122,22 @@ func buildMemSegment() *mem.Segment {
	}
	}

	return mem.NewFromAnalyzedDocs(results)
	return AnalysisResultsToSegmentBase(results, 1024)
}

func buildMemSegmentMulti() *mem.Segment {
func buildTestSegmentMulti() (*SegmentBase, error) {
	results := buildTestAnalysisResultsMulti()

	return AnalysisResultsToSegmentBase(results, 1024)
}

func buildTestSegmentMultiWithChunkFactor(chunkFactor uint32) (*SegmentBase, error) {
	results := buildTestAnalysisResultsMulti()

	return AnalysisResultsToSegmentBase(results, chunkFactor)
}

func buildTestAnalysisResultsMulti() []*index.AnalysisResult {
	doc := &document.Document{
		ID: "a",
		Fields: []document.Field{
@@ -282,13 +295,11 @@ func buildMemSegmentMulti() *mem.Segment {
	}
	}

	segment := mem.NewFromAnalyzedDocs(results)

	return segment
	return results
}

func buildMemSegmentWithDefaultFieldMapping() (*mem.Segment, []string) {

func buildTestSegmentWithDefaultFieldMapping(chunkFactor uint32) (
	*SegmentBase, []string, error) {
	doc := &document.Document{
		ID: "a",
		Fields: []document.Field{
@@ -371,5 +382,7 @@ func buildMemSegmentWithDefaultFieldMapping() (*mem.Segment, []string) {
	}
	}

	return mem.NewFromAnalyzedDocs(results), fields
	sb, err := AnalysisResultsToSegmentBase(results, chunkFactor)

	return sb, fields, err
}

@@ -154,9 +154,11 @@ func (c *chunkedContentCoder) Write(w io.Writer) (int, error) {
	if err != nil {
		return tw, err
	}
	// write out the chunk lens
	for _, chunkLen := range c.chunkLens {
		n := binary.PutUvarint(buf, uint64(chunkLen))

	chunkOffsets := modifyLengthsToEndOffsets(c.chunkLens)
	// write out the chunk offsets
	for _, chunkOffset := range chunkOffsets {
		n := binary.PutUvarint(buf, chunkOffset)
		nw, err = w.Write(buf[:n])
		tw += nw
		if err != nil {

@@ -46,7 +46,7 @@ func TestChunkContentCoder(t *testing.T) {
			[]byte("scorch"),
		},

		expected: string([]byte{0x02, 0x0b, 0x0b, 0x01, 0x00, 0x06, 0x06, 0x14,
		expected: string([]byte{0x02, 0x0b, 0x16, 0x01, 0x00, 0x06, 0x06, 0x14,
			0x75, 0x70, 0x73, 0x69, 0x64, 0x65, 0x01, 0x01, 0x06, 0x06,
			0x14, 0x73, 0x63, 0x6f, 0x72, 0x63, 0x68}),
		},

@@ -68,7 +68,19 @@ func (d *Dictionary) postingsListInit(rv *PostingsList, except *roaring.Bitmap)
	if rv == nil {
		rv = &PostingsList{}
	} else {
		postings := rv.postings
		if postings != nil {
			postings.Clear()
		}
		locBitmap := rv.locBitmap
		if locBitmap != nil {
			locBitmap.Clear()
		}

		*rv = PostingsList{} // clear the struct

		rv.postings = postings
		rv.locBitmap = locBitmap
	}
	rv.sb = d.sb
	rv.except = except

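The branch above is the usual Go reuse idiom: detach the allocated members, zero the struct wholesale, then reattach the cleared members so their capacity survives across searches. The same pattern in isolation (a generic sketch with hypothetical types, not zap's actual ones):

	type holder struct {
		buf  []byte
		misc int
	}

	// resetHolder clears h but keeps buf's allocation for reuse.
	func resetHolder(h *holder) {
		buf := h.buf[:0] // keep capacity
		*h = holder{}    // zero everything else
		h.buf = buf
	}
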
@@ -22,10 +22,9 @@ import (
	"github.com/blevesearch/bleve/analysis"
	"github.com/blevesearch/bleve/document"
	"github.com/blevesearch/bleve/index"
	"github.com/blevesearch/bleve/index/scorch/segment/mem"
)

func buildMemSegmentForDict() *mem.Segment {
func buildTestSegmentForDict() (*SegmentBase, error) {
	doc := &document.Document{
		ID: "a",
		Fields: []document.Field{
@@ -99,17 +98,15 @@ func buildMemSegmentForDict() *mem.Segment {
		},
	}

	segment := mem.NewFromAnalyzedDocs(results)

	return segment
	return AnalysisResultsToSegmentBase(results, 1024)
}

func TestDictionary(t *testing.T) {

	_ = os.RemoveAll("/tmp/scorch.zap")

	memSegment := buildMemSegmentForDict()
	err := PersistSegment(memSegment, "/tmp/scorch.zap", 1024)
	testSeg, _ := buildTestSegmentForDict()
	err := PersistSegmentBase(testSeg, "/tmp/scorch.zap")
	if err != nil {
		t.Fatalf("error persisting segment: %v", err)
	}

@@ -38,7 +38,7 @@ type docValueIterator struct {
	field          string
	curChunkNum    uint64
	numChunks      uint64
	chunkLens      []uint64
	chunkOffsets   []uint64
	dvDataLoc      uint64
	curChunkHeader []MetaData
	curChunkData   []byte // compressed data cache
@@ -47,7 +47,7 @@ type docValueIterator struct {
func (di *docValueIterator) size() int {
	return reflectStaticSizedocValueIterator + size.SizeOfPtr +
		len(di.field) +
		len(di.chunkLens)*size.SizeOfUint64 +
		len(di.chunkOffsets)*size.SizeOfUint64 +
		len(di.curChunkHeader)*reflectStaticSizeMetaData +
		len(di.curChunkData)
}
@@ -69,7 +69,7 @@ func (s *SegmentBase) loadFieldDocValueIterator(field string,
	}

	// read the number of chunks, chunk lengths
	var offset, clen uint64
	var offset, loc uint64
	numChunks, read := binary.Uvarint(s.mem[fieldDvLoc : fieldDvLoc+binary.MaxVarintLen64])
	if read <= 0 {
		return nil, fmt.Errorf("failed to read the field "+
@@ -78,16 +78,16 @@ func (s *SegmentBase) loadFieldDocValueIterator(field string,
	offset += uint64(read)

	fdvIter := &docValueIterator{
		curChunkNum: math.MaxUint64,
		field:       field,
		chunkLens:   make([]uint64, int(numChunks)),
		curChunkNum:  math.MaxUint64,
		field:        field,
		chunkOffsets: make([]uint64, int(numChunks)),
	}
	for i := 0; i < int(numChunks); i++ {
		clen, read = binary.Uvarint(s.mem[fieldDvLoc+offset : fieldDvLoc+offset+binary.MaxVarintLen64])
		loc, read = binary.Uvarint(s.mem[fieldDvLoc+offset : fieldDvLoc+offset+binary.MaxVarintLen64])
		if read <= 0 {
			return nil, fmt.Errorf("corrupted chunk length during segment load")
			return nil, fmt.Errorf("corrupted chunk offset during segment load")
		}
		fdvIter.chunkLens[i] = clen
		fdvIter.chunkOffsets[i] = loc
		offset += uint64(read)
	}

@@ -99,12 +99,11 @@ func (di *docValueIterator) loadDvChunk(chunkNumber,
	localDocNum uint64, s *SegmentBase) error {
	// advance to the chunk where the docValues
	// reside for the given docNum
	destChunkDataLoc := di.dvDataLoc
	for i := 0; i < int(chunkNumber); i++ {
		destChunkDataLoc += di.chunkLens[i]
	}
	destChunkDataLoc, curChunkEnd := di.dvDataLoc, di.dvDataLoc
	start, end := readChunkBoundary(int(chunkNumber), di.chunkOffsets)
	destChunkDataLoc += start
	curChunkEnd += end

	curChunkSize := di.chunkLens[chunkNumber]
	// read the number of docs reside in the chunk
	numDocs, read := binary.Uvarint(s.mem[destChunkDataLoc : destChunkDataLoc+binary.MaxVarintLen64])
	if read <= 0 {
@@ -122,7 +121,7 @@ func (di *docValueIterator) loadDvChunk(chunkNumber,
	}

	compressedDataLoc := chunkMetaLoc + offset
	dataLength := destChunkDataLoc + curChunkSize - compressedDataLoc
	dataLength := curChunkEnd - compressedDataLoc
	di.curChunkData = s.mem[compressedDataLoc : compressedDataLoc+dataLength]
	di.curChunkNum = chunkNumber
	return nil

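The payoff of storing end offsets shows up in the two hunks above: the deleted loop summed chunkLens[0:chunkNumber] on every chunk load, while readChunkBoundary (defined in the intcoder diff below) is just two array reads. Schematically (illustrative fragment, not part of the commit):

	// old: O(k) accumulation over lengths
	//   for i := 0; i < k; i++ { loc += chunkLens[i] }
	// new: O(1) lookup into monotonic end offsets
	start, end := readChunkBoundary(k, chunkOffsets)
	chunkLen := end - start // byte length of chunk k
	_ = chunkLen
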
@@ -111,10 +111,13 @@ func (c *chunkedIntCoder) Write(w io.Writer) (int, error) {
	}
	buf := c.buf

	// write out the number of chunks & each chunkLen
	n := binary.PutUvarint(buf, uint64(len(c.chunkLens)))
	for _, chunkLen := range c.chunkLens {
		n += binary.PutUvarint(buf[n:], uint64(chunkLen))
	// convert the chunk lengths into chunk offsets
	chunkOffsets := modifyLengthsToEndOffsets(c.chunkLens)

	// write out the number of chunks & each chunk offsets
	n := binary.PutUvarint(buf, uint64(len(chunkOffsets)))
	for _, chunkOffset := range chunkOffsets {
		n += binary.PutUvarint(buf[n:], chunkOffset)
	}

	tw, err := w.Write(buf[:n])
|
|||
func (c *chunkedIntCoder) FinalSize() int {
|
||||
return len(c.final)
|
||||
}
|
||||
|
||||
// modifyLengthsToEndOffsets converts the chunk length array
|
||||
// to a chunk offset array. The readChunkBoundary
|
||||
// will figure out the start and end of every chunk from
|
||||
// these offsets. Starting offset of i'th index is stored
|
||||
// in i-1'th position except for 0'th index and ending offset
|
||||
// is stored at i'th index position.
|
||||
// For 0'th element, starting position is always zero.
|
||||
// eg:
|
||||
// Lens -> 5 5 5 5 => 5 10 15 20
|
||||
// Lens -> 0 5 0 5 => 0 5 5 10
|
||||
// Lens -> 0 0 0 5 => 0 0 0 5
|
||||
// Lens -> 5 0 0 0 => 5 5 5 5
|
||||
// Lens -> 0 5 0 0 => 0 5 5 5
|
||||
// Lens -> 0 0 5 0 => 0 0 5 5
|
||||
func modifyLengthsToEndOffsets(lengths []uint64) []uint64 {
|
||||
var runningOffset uint64
|
||||
var index, i int
|
||||
for i = 1; i <= len(lengths); i++ {
|
||||
runningOffset += lengths[i-1]
|
||||
lengths[index] = runningOffset
|
||||
index++
|
||||
}
|
||||
return lengths
|
||||
}
|
||||
|
||||
func readChunkBoundary(chunk int, offsets []uint64) (uint64, uint64) {
|
||||
var start uint64
|
||||
if chunk > 0 {
|
||||
start = offsets[chunk-1]
|
||||
}
|
||||
return start, offsets[chunk]
|
||||
}
|
||||
|
|
|
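To make the table in the comment above concrete, a tiny round trip through both helpers (standalone fragment; note that modifyLengthsToEndOffsets rewrites its argument in place):

	lens := []uint64{2, 0, 3}
	offsets := modifyLengthsToEndOffsets(lens) // lens becomes [2 2 5]
	start, end := readChunkBoundary(1, offsets)
	// start == 2, end == 2, i.e. chunk 1 holds zero bytes
	_, _ = start, end
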
@@ -46,8 +46,8 @@ func TestChunkIntCoder(t *testing.T) {
			[]uint64{3},
			[]uint64{7},
		},
		// 2 chunks, chunk-0 length 1, chunk-1 length 1, value 3, value 7
		expected: []byte{0x2, 0x1, 0x1, 0x3, 0x7},
		// 2 chunks, chunk-0 offset 1, chunk-1 offset 2, value 3, value 7
		expected: []byte{0x2, 0x1, 0x2, 0x3, 0x7},
		},
	}

@@ -71,3 +71,199 @@ func TestChunkIntCoder(t *testing.T) {
		}
	}
}

func TestChunkLengthToOffsets(t *testing.T) {

	tests := []struct {
		lengths         []uint64
		expectedOffsets []uint64
	}{
		{
			lengths:         []uint64{5, 5, 5, 5, 5},
			expectedOffsets: []uint64{5, 10, 15, 20, 25},
		},
		{
			lengths:         []uint64{0, 5, 0, 5, 0},
			expectedOffsets: []uint64{0, 5, 5, 10, 10},
		},
		{
			lengths:         []uint64{0, 0, 0, 0, 5},
			expectedOffsets: []uint64{0, 0, 0, 0, 5},
		},
		{
			lengths:         []uint64{5, 0, 0, 0, 0},
			expectedOffsets: []uint64{5, 5, 5, 5, 5},
		},
		{
			lengths:         []uint64{0, 5, 0, 0, 0},
			expectedOffsets: []uint64{0, 5, 5, 5, 5},
		},
		{
			lengths:         []uint64{0, 0, 0, 5, 0},
			expectedOffsets: []uint64{0, 0, 0, 5, 5},
		},
		{
			lengths:         []uint64{0, 0, 0, 5, 5},
			expectedOffsets: []uint64{0, 0, 0, 5, 10},
		},
		{
			lengths:         []uint64{5, 5, 5, 0, 0},
			expectedOffsets: []uint64{5, 10, 15, 15, 15},
		},
		{
			lengths:         []uint64{5},
			expectedOffsets: []uint64{5},
		},
		{
			lengths:         []uint64{5, 5},
			expectedOffsets: []uint64{5, 10},
		},
	}

	for i, test := range tests {
		modifyLengthsToEndOffsets(test.lengths)
		if !reflect.DeepEqual(test.expectedOffsets, test.lengths) {
			t.Errorf("Test: %d failed, got %+v, expected %+v", i, test.lengths, test.expectedOffsets)
		}
	}
}

func TestChunkReadBoundaryFromOffsets(t *testing.T) {

	tests := []struct {
		chunkNumber   int
		offsets       []uint64
		expectedStart uint64
		expectedEnd   uint64
	}{
		{
			offsets:       []uint64{5, 10, 15, 20, 25},
			chunkNumber:   4,
			expectedStart: 20,
			expectedEnd:   25,
		},
		{
			offsets:       []uint64{5, 10, 15, 20, 25},
			chunkNumber:   0,
			expectedStart: 0,
			expectedEnd:   5,
		},
		{
			offsets:       []uint64{5, 10, 15, 20, 25},
			chunkNumber:   2,
			expectedStart: 10,
			expectedEnd:   15,
		},
		{
			offsets:       []uint64{0, 5, 5, 10, 10},
			chunkNumber:   4,
			expectedStart: 10,
			expectedEnd:   10,
		},
		{
			offsets:       []uint64{0, 5, 5, 10, 10},
			chunkNumber:   1,
			expectedStart: 0,
			expectedEnd:   5,
		},
		{
			offsets:       []uint64{5, 5, 5, 5, 5},
			chunkNumber:   0,
			expectedStart: 0,
			expectedEnd:   5,
		},
		{
			offsets:       []uint64{5, 5, 5, 5, 5},
			chunkNumber:   4,
			expectedStart: 5,
			expectedEnd:   5,
		},
		{
			offsets:       []uint64{5, 5, 5, 5, 5},
			chunkNumber:   1,
			expectedStart: 5,
			expectedEnd:   5,
		},
		{
			offsets:       []uint64{0, 5, 5, 5, 5},
			chunkNumber:   1,
			expectedStart: 0,
			expectedEnd:   5,
		},
		{
			offsets:       []uint64{0, 5, 5, 5, 5},
			chunkNumber:   0,
			expectedStart: 0,
			expectedEnd:   0,
		},
		{
			offsets:       []uint64{0, 0, 0, 5, 5},
			chunkNumber:   2,
			expectedStart: 0,
			expectedEnd:   0,
		},
		{
			offsets:       []uint64{0, 0, 0, 5, 5},
			chunkNumber:   1,
			expectedStart: 0,
			expectedEnd:   0,
		},
		{
			offsets:       []uint64{0, 0, 0, 0, 5},
			chunkNumber:   4,
			expectedStart: 0,
			expectedEnd:   5,
		},
		{
			offsets:       []uint64{0, 0, 0, 0, 5},
			chunkNumber:   2,
			expectedStart: 0,
			expectedEnd:   0,
		},
		{
			offsets:       []uint64{5, 10, 15, 15, 15},
			chunkNumber:   0,
			expectedStart: 0,
			expectedEnd:   5,
		},
		{
			offsets:       []uint64{5, 10, 15, 15, 15},
			chunkNumber:   1,
			expectedStart: 5,
			expectedEnd:   10,
		},
		{
			offsets:       []uint64{5, 10, 15, 15, 15},
			chunkNumber:   2,
			expectedStart: 10,
			expectedEnd:   15,
		},
		{
			offsets:       []uint64{5, 10, 15, 15, 15},
			chunkNumber:   3,
			expectedStart: 15,
			expectedEnd:   15,
		},
		{
			offsets:       []uint64{5, 10, 15, 15, 15},
			chunkNumber:   4,
			expectedStart: 15,
			expectedEnd:   15,
		},
		{
			offsets:       []uint64{5},
			chunkNumber:   0,
			expectedStart: 0,
			expectedEnd:   5,
		},
	}

	for i, test := range tests {
		s, e := readChunkBoundary(test.chunkNumber, test.offsets)
		if test.expectedStart != s || test.expectedEnd != e {
			t.Errorf("Test: %d failed for chunkNumber: %d got start: %d end: %d,"+
				" expected start: %d end: %d", i, test.chunkNumber, s, e,
				test.expectedStart, test.expectedEnd)
		}
	}
}

@@ -183,6 +183,9 @@ func persistMergedRest(segments []*SegmentBase, dropsIn []*roaring.Bitmap,
		return nil, 0, err
	}

	newRoaring := roaring.NewBitmap()
	newRoaringLocs := roaring.NewBitmap()

	// for each field
	for fieldID, fieldName := range fieldsInv {

@@ -222,8 +225,8 @@ func persistMergedRest(segments []*SegmentBase, dropsIn []*roaring.Bitmap,

		var prevTerm []byte

		newRoaring := roaring.NewBitmap()
		newRoaringLocs := roaring.NewBitmap()
		newRoaring.Clear()
		newRoaringLocs.Clear()

		var lastDocNum, lastFreq, lastNorm uint64

@@ -248,63 +251,22 @@ func persistMergedRest(segments []*SegmentBase, dropsIn []*roaring.Bitmap,
		tfEncoder.Close()
		locEncoder.Close()

		termCardinality := newRoaring.GetCardinality()
		postingsOffset, err := writePostings(
			newRoaring, newRoaringLocs, tfEncoder, locEncoder,
			use1HitEncoding, w, bufMaxVarintLen64)
		if err != nil {
			return err
		}

		encodeAs1Hit, docNum1Hit, normBits1Hit := use1HitEncoding(termCardinality)
		if encodeAs1Hit {
			err = newVellum.Insert(term, FSTValEncode1Hit(docNum1Hit, normBits1Hit))
			if err != nil {
				return err
			}
		} else if termCardinality > 0 {
			// this field/term has hits in the new segment
			freqOffset := uint64(w.Count())
			_, err := tfEncoder.Write(w)
			if err != nil {
				return err
			}
			locOffset := uint64(w.Count())
			_, err = locEncoder.Write(w)
			if err != nil {
				return err
			}
			postingLocOffset := uint64(w.Count())
			_, err = writeRoaringWithLen(newRoaringLocs, w, bufMaxVarintLen64)
			if err != nil {
				return err
			}
			postingOffset := uint64(w.Count())
			// write out the start of the term info
			n := binary.PutUvarint(bufMaxVarintLen64, freqOffset)
			_, err = w.Write(bufMaxVarintLen64[:n])
			if err != nil {
				return err
			}
			// write out the start of the loc info
			n = binary.PutUvarint(bufMaxVarintLen64, locOffset)
			_, err = w.Write(bufMaxVarintLen64[:n])
			if err != nil {
				return err
			}
			// write out the start of the posting locs
			n = binary.PutUvarint(bufMaxVarintLen64, postingLocOffset)
			_, err = w.Write(bufMaxVarintLen64[:n])
			if err != nil {
				return err
			}
			_, err = writeRoaringWithLen(newRoaring, w, bufMaxVarintLen64)
			if err != nil {
				return err
			}

			err = newVellum.Insert(term, postingOffset)
		if postingsOffset > 0 {
			err = newVellum.Insert(term, postingsOffset)
			if err != nil {
				return err
			}
		}

		newRoaring = roaring.NewBitmap()
		newRoaringLocs = roaring.NewBitmap()
		newRoaring.Clear()
		newRoaringLocs.Clear()

		tfEncoder.Reset()
		locEncoder.Reset()
@@ -460,6 +422,69 @@ func persistMergedRest(segments []*SegmentBase, dropsIn []*roaring.Bitmap,
	return rv, fieldDvLocsOffset, nil
}

func writePostings(postings, postingLocs *roaring.Bitmap,
	tfEncoder, locEncoder *chunkedIntCoder,
	use1HitEncoding func(uint64) (bool, uint64, uint64),
	w *CountHashWriter, bufMaxVarintLen64 []byte) (
	offset uint64, err error) {
	termCardinality := postings.GetCardinality()
	if termCardinality <= 0 {
		return 0, nil
	}

	if use1HitEncoding != nil {
		encodeAs1Hit, docNum1Hit, normBits1Hit := use1HitEncoding(termCardinality)
		if encodeAs1Hit {
			return FSTValEncode1Hit(docNum1Hit, normBits1Hit), nil
		}
	}

	tfOffset := uint64(w.Count())
	_, err = tfEncoder.Write(w)
	if err != nil {
		return 0, err
	}

	locOffset := uint64(w.Count())
	_, err = locEncoder.Write(w)
	if err != nil {
		return 0, err
	}

	postingLocsOffset := uint64(w.Count())
	_, err = writeRoaringWithLen(postingLocs, w, bufMaxVarintLen64)
	if err != nil {
		return 0, err
	}

	postingsOffset := uint64(w.Count())

	n := binary.PutUvarint(bufMaxVarintLen64, tfOffset)
	_, err = w.Write(bufMaxVarintLen64[:n])
	if err != nil {
		return 0, err
	}

	n = binary.PutUvarint(bufMaxVarintLen64, locOffset)
	_, err = w.Write(bufMaxVarintLen64[:n])
	if err != nil {
		return 0, err
	}

	n = binary.PutUvarint(bufMaxVarintLen64, postingLocsOffset)
	_, err = w.Write(bufMaxVarintLen64[:n])
	if err != nil {
		return 0, err
	}

	_, err = writeRoaringWithLen(postings, w, bufMaxVarintLen64)
	if err != nil {
		return 0, err
	}

	return postingsOffset, nil
}

func mergeStoredAndRemap(segments []*SegmentBase, drops []*roaring.Bitmap,
	fieldsMap map[string]uint16, fieldsInv []string, fieldsSame bool, newSegDocCount uint64,
	w *CountHashWriter) (uint64, [][]uint64, error) {

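As the earlier persistMergedRest hunk shows, the merge loop now delegates per-term serialization to this helper: a term is either encoded directly into the FST value (the 1-hit case, where the returned offset is the encoded value) or written out and referenced by its postings offset. The call shape, lifted from the merge loop above:

	postingsOffset, err := writePostings(
		newRoaring, newRoaringLocs, tfEncoder, locEncoder,
		use1HitEncoding, w, bufMaxVarintLen64)
	if err != nil {
		return err
	}
	if postingsOffset > 0 {
		err = newVellum.Insert(term, postingsOffset)
	}
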
@@ -26,7 +26,6 @@ import (
	"github.com/blevesearch/bleve/analysis"
	"github.com/blevesearch/bleve/document"
	"github.com/blevesearch/bleve/index"
	"github.com/blevesearch/bleve/index/scorch/segment/mem"
)

func TestMerge(t *testing.T) {
@@ -34,14 +33,14 @@ func TestMerge(t *testing.T) {
	_ = os.RemoveAll("/tmp/scorch2.zap")
	_ = os.RemoveAll("/tmp/scorch3.zap")

	memSegment := buildMemSegmentMulti()
	err := PersistSegment(memSegment, "/tmp/scorch.zap", 1024)
	testSeg, _ := buildTestSegmentMulti()
	err := PersistSegmentBase(testSeg, "/tmp/scorch.zap")
	if err != nil {
		t.Fatal(err)
	}

	memSegment2 := buildMemSegmentMulti2()
	err = PersistSegment(memSegment2, "/tmp/scorch2.zap", 1024)
	testSeg2, _ := buildTestSegmentMulti2()
	err = PersistSegmentBase(testSeg2, "/tmp/scorch2.zap")
	if err != nil {
		t.Fatal(err)
	}
@@ -121,8 +120,8 @@ func TestMergeWithEmptySegmentsFirst(t *testing.T) {
func testMergeWithEmptySegments(t *testing.T, before bool, numEmptySegments int) {
	_ = os.RemoveAll("/tmp/scorch.zap")

	memSegment := buildMemSegmentMulti()
	err := PersistSegment(memSegment, "/tmp/scorch.zap", 1024)
	testSeg, _ := buildTestSegmentMulti()
	err := PersistSegmentBase(testSeg, "/tmp/scorch.zap")
	if err != nil {
		t.Fatal(err)
	}
@@ -148,8 +147,8 @@ func testMergeWithEmptySegments(t *testing.T, before bool, numEmptySegments int)

		_ = os.RemoveAll("/tmp/" + fname)

		emptySegment := mem.NewFromAnalyzedDocs([]*index.AnalysisResult{})
		err = PersistSegment(emptySegment, "/tmp/"+fname, 1024)
		emptySegment, _ := AnalysisResultsToSegmentBase([]*index.AnalysisResult{}, 1024)
		err = PersistSegmentBase(emptySegment, "/tmp/"+fname)
		if err != nil {
			t.Fatal(err)
		}
@@ -462,8 +461,8 @@ func testMergeAndDrop(t *testing.T, docsToDrop []*roaring.Bitmap) {
	_ = os.RemoveAll("/tmp/scorch.zap")
	_ = os.RemoveAll("/tmp/scorch2.zap")

	memSegment := buildMemSegmentMulti()
	err := PersistSegment(memSegment, "/tmp/scorch.zap", 1024)
	testSeg, _ := buildTestSegmentMulti()
	err := PersistSegmentBase(testSeg, "/tmp/scorch.zap")
	if err != nil {
		t.Fatal(err)
	}
@@ -478,8 +477,8 @@ func testMergeAndDrop(t *testing.T, docsToDrop []*roaring.Bitmap) {
		}
	}()

	memSegment2 := buildMemSegmentMulti2()
	err = PersistSegment(memSegment2, "/tmp/scorch2.zap", 1024)
	testSeg2, _ := buildTestSegmentMulti2()
	err = PersistSegmentBase(testSeg2, "/tmp/scorch2.zap")
	if err != nil {
		t.Fatal(err)
	}
@@ -565,8 +564,8 @@ func testMergeWithUpdates(t *testing.T, segmentDocIds [][]string, docsToDrop []*

		_ = os.RemoveAll("/tmp/" + fname)

		memSegment := buildMemSegmentMultiHelper(docIds)
		err := PersistSegment(memSegment, "/tmp/"+fname, 1024)
		testSeg, _ := buildTestSegmentMultiHelper(docIds)
		err := PersistSegmentBase(testSeg, "/tmp/"+fname)
		if err != nil {
			t.Fatal(err)
		}
@@ -616,11 +615,11 @@ func testMergeAndDropSegments(t *testing.T, segsToMerge []*Segment, docsToDrop [
		testMergeWithSelf(t, segm.(*Segment), expectedNumDocs)
	}

func buildMemSegmentMulti2() *mem.Segment {
	return buildMemSegmentMultiHelper([]string{"c", "d"})
func buildTestSegmentMulti2() (*SegmentBase, error) {
	return buildTestSegmentMultiHelper([]string{"c", "d"})
}

func buildMemSegmentMultiHelper(docIds []string) *mem.Segment {
func buildTestSegmentMultiHelper(docIds []string) (*SegmentBase, error) {
	doc := &document.Document{
		ID: "c",
		Fields: []document.Field{
@@ -778,9 +777,7 @@ func buildMemSegmentMultiHelper(docIds []string) *mem.Segment {
	}
	}

	segment := mem.NewFromAnalyzedDocs(results)

	return segment
	return AnalysisResultsToSegmentBase(results, 1024)
}

func TestMergeBytesWritten(t *testing.T) {
@@ -788,14 +785,14 @@ func TestMergeBytesWritten(t *testing.T) {
	_ = os.RemoveAll("/tmp/scorch2.zap")
	_ = os.RemoveAll("/tmp/scorch3.zap")

	memSegment := buildMemSegmentMulti()
	err := PersistSegment(memSegment, "/tmp/scorch.zap", 1024)
	testSeg, _ := buildTestSegmentMulti()
	err := PersistSegmentBase(testSeg, "/tmp/scorch.zap")
	if err != nil {
		t.Fatal(err)
	}

	memSegment2 := buildMemSegmentMulti2()
	err = PersistSegment(memSegment2, "/tmp/scorch2.zap", 1024)
	testSeg2, _ := buildTestSegmentMulti2()
	err = PersistSegmentBase(testSeg2, "/tmp/scorch2.zap")
	if err != nil {
		t.Fatal(err)
	}

@@ -0,0 +1,770 @@
// Copyright (c) 2018 Couchbase, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package zap

import (
	"bytes"
	"encoding/binary"
	"math"
	"sort"
	"sync"

	"github.com/RoaringBitmap/roaring"
	"github.com/Smerity/govarint"
	"github.com/blevesearch/bleve/analysis"
	"github.com/blevesearch/bleve/document"
	"github.com/blevesearch/bleve/index"
	"github.com/couchbase/vellum"
	"github.com/golang/snappy"
)

// AnalysisResultsToSegmentBase produces an in-memory zap-encoded
// SegmentBase from analysis results
func AnalysisResultsToSegmentBase(results []*index.AnalysisResult,
	chunkFactor uint32) (*SegmentBase, error) {
	var br bytes.Buffer

	s := interimPool.Get().(*interim)

	s.results = results
	s.chunkFactor = chunkFactor
	s.w = NewCountHashWriter(&br)

	storedIndexOffset, fieldsIndexOffset, fdvIndexOffset, dictOffsets,
		err := s.convert()
	if err != nil {
		return nil, err
	}

	sb, err := InitSegmentBase(br.Bytes(), s.w.Sum32(), chunkFactor,
		s.FieldsMap, s.FieldsInv, uint64(len(results)),
		storedIndexOffset, fieldsIndexOffset, fdvIndexOffset, dictOffsets)

	if err == nil && s.reset() == nil {
		interimPool.Put(s)
	}

	return sb, err
}

var interimPool = sync.Pool{New: func() interface{} { return &interim{} }}
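The sync.Pool here amortizes the sizable interim buffers across batches: a converter is taken from the pool, used for one segment build, reset, and returned only when both the build and the reset succeed. Usage is entirely internal to AnalysisResultsToSegmentBase above; a stripped-down version of the same lifecycle (generic sketch, hypothetical names):

	// Generic pooled-worker sketch mirroring interimPool's lifecycle.
	var pool = sync.Pool{New: func() interface{} { return new(bytes.Buffer) }}

	func withBuffer(work func(*bytes.Buffer)) {
		b := pool.Get().(*bytes.Buffer)
		work(b)
		b.Reset() // only a clean object goes back
		pool.Put(b)
	}
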
||||
// interim holds temporary working data used while converting from
|
||||
// analysis results to a zap-encoded segment
|
||||
type interim struct {
|
||||
results []*index.AnalysisResult
|
||||
|
||||
chunkFactor uint32
|
||||
|
||||
w *CountHashWriter
|
||||
|
||||
// FieldsMap adds 1 to field id to avoid zero value issues
|
||||
// name -> field id + 1
|
||||
FieldsMap map[string]uint16
|
||||
|
||||
// FieldsInv is the inverse of FieldsMap
|
||||
// field id -> name
|
||||
FieldsInv []string
|
||||
|
||||
// Term dictionaries for each field
|
||||
// field id -> term -> postings list id + 1
|
||||
Dicts []map[string]uint64
|
||||
|
||||
// Terms for each field, where terms are sorted ascending
|
||||
// field id -> []term
|
||||
DictKeys [][]string
|
||||
|
||||
// Fields whose IncludeDocValues is true
|
||||
// field id -> bool
|
||||
IncludeDocValues []bool
|
||||
|
||||
// postings id -> bitmap of docNums
|
||||
Postings []*roaring.Bitmap
|
||||
|
||||
// postings id -> bitmap of docNums that have locations
|
||||
PostingsLocs []*roaring.Bitmap
|
||||
|
||||
// postings id -> freq/norm's, one for each docNum in postings
|
||||
FreqNorms [][]interimFreqNorm
|
||||
freqNormsBacking []interimFreqNorm
|
||||
|
||||
// postings id -> locs, one for each freq
|
||||
Locs [][]interimLoc
|
||||
locsBacking []interimLoc
|
||||
|
||||
numTermsPerPostingsList []int // key is postings list id
|
||||
numLocsPerPostingsList []int // key is postings list id
|
||||
|
||||
builder *vellum.Builder
|
||||
builderBuf bytes.Buffer
|
||||
|
||||
metaBuf bytes.Buffer
|
||||
|
||||
tmp0 []byte
|
||||
tmp1 []byte
|
||||
}
|
||||
|
||||
func (s *interim) reset() (err error) {
|
||||
s.results = nil
|
||||
s.chunkFactor = 0
|
||||
s.w = nil
|
||||
s.FieldsMap = nil
|
||||
s.FieldsInv = s.FieldsInv[:0]
|
||||
for i := range s.Dicts {
|
||||
s.Dicts[i] = nil
|
||||
}
|
||||
s.Dicts = s.Dicts[:0]
|
||||
for i := range s.DictKeys {
|
||||
s.DictKeys[i] = s.DictKeys[i][:0]
|
||||
}
|
||||
s.DictKeys = s.DictKeys[:0]
|
||||
for i := range s.IncludeDocValues {
|
||||
s.IncludeDocValues[i] = false
|
||||
}
|
||||
s.IncludeDocValues = s.IncludeDocValues[:0]
|
||||
for _, idn := range s.Postings {
|
||||
idn.Clear()
|
||||
}
|
||||
s.Postings = s.Postings[:0]
|
||||
for _, idn := range s.PostingsLocs {
|
||||
idn.Clear()
|
||||
}
|
||||
s.PostingsLocs = s.PostingsLocs[:0]
|
||||
s.FreqNorms = s.FreqNorms[:0]
|
||||
for i := range s.freqNormsBacking {
|
||||
s.freqNormsBacking[i] = interimFreqNorm{}
|
||||
}
|
||||
s.freqNormsBacking = s.freqNormsBacking[:0]
|
||||
s.Locs = s.Locs[:0]
|
||||
for i := range s.locsBacking {
|
||||
s.locsBacking[i] = interimLoc{}
|
||||
}
|
||||
s.locsBacking = s.locsBacking[:0]
|
||||
s.numTermsPerPostingsList = s.numTermsPerPostingsList[:0]
|
||||
s.numLocsPerPostingsList = s.numLocsPerPostingsList[:0]
|
||||
s.builderBuf.Reset()
|
||||
if s.builder != nil {
|
||||
err = s.builder.Reset(&s.builderBuf)
|
||||
}
|
||||
s.metaBuf.Reset()
|
||||
s.tmp0 = s.tmp0[:0]
|
||||
s.tmp1 = s.tmp1[:0]
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *interim) grabBuf(size int) []byte {
|
||||
buf := s.tmp0
|
||||
if cap(buf) < size {
|
||||
buf = make([]byte, size)
|
||||
s.tmp0 = buf
|
||||
}
|
||||
return buf[0:size]
|
||||
}
|
||||
|
||||
type interimStoredField struct {
|
||||
vals [][]byte
|
||||
typs []byte
|
||||
arrayposs [][]uint64 // array positions
|
||||
}
|
||||
|
||||
type interimFreqNorm struct {
|
||||
freq uint64
|
||||
norm float32
|
||||
}
|
||||
|
||||
type interimLoc struct {
|
||||
fieldID uint16
|
||||
pos uint64
|
||||
start uint64
|
||||
end uint64
|
||||
arrayposs []uint64
|
||||
}
|
||||
|
||||
func (s *interim) convert() (uint64, uint64, uint64, []uint64, error) {
|
||||
s.FieldsMap = map[string]uint16{}
|
||||
|
||||
s.getOrDefineField("_id") // _id field is fieldID 0
|
||||
|
||||
for _, result := range s.results {
|
||||
for _, field := range result.Document.CompositeFields {
|
||||
s.getOrDefineField(field.Name())
|
||||
}
|
||||
for _, field := range result.Document.Fields {
|
||||
s.getOrDefineField(field.Name())
|
||||
}
|
||||
}
|
||||
|
||||
sort.Strings(s.FieldsInv[1:]) // keep _id as first field
|
||||
|
||||
for fieldID, fieldName := range s.FieldsInv {
|
||||
s.FieldsMap[fieldName] = uint16(fieldID + 1)
|
||||
}
|
||||
|
||||
if cap(s.IncludeDocValues) >= len(s.FieldsInv) {
|
||||
s.IncludeDocValues = s.IncludeDocValues[:len(s.FieldsInv)]
|
||||
} else {
|
||||
s.IncludeDocValues = make([]bool, len(s.FieldsInv))
|
||||
}
|
||||
|
||||
s.prepareDicts()
|
||||
|
||||
for _, dict := range s.DictKeys {
|
||||
sort.Strings(dict)
|
||||
}
|
||||
|
||||
s.processDocuments()
|
||||
|
||||
storedIndexOffset, err := s.writeStoredFields()
|
||||
if err != nil {
|
||||
return 0, 0, 0, nil, err
|
||||
}
|
||||
|
||||
var fdvIndexOffset uint64
|
||||
var dictOffsets []uint64
|
||||
|
||||
if len(s.results) > 0 {
|
||||
fdvIndexOffset, dictOffsets, err = s.writeDicts()
|
||||
if err != nil {
|
||||
return 0, 0, 0, nil, err
|
||||
}
|
||||
} else {
|
||||
dictOffsets = make([]uint64, len(s.FieldsInv))
|
||||
}
|
||||
|
||||
fieldsIndexOffset, err := persistFields(s.FieldsInv, s.w, dictOffsets)
|
||||
if err != nil {
|
||||
return 0, 0, 0, nil, err
|
||||
}
|
||||
|
||||
return storedIndexOffset, fieldsIndexOffset, fdvIndexOffset, dictOffsets, nil
|
||||
}
|
||||
|
||||

func (s *interim) getOrDefineField(fieldName string) int {
    fieldIDPlus1, exists := s.FieldsMap[fieldName]
    if !exists {
        fieldIDPlus1 = uint16(len(s.FieldsInv) + 1)
        s.FieldsMap[fieldName] = fieldIDPlus1
        s.FieldsInv = append(s.FieldsInv, fieldName)

        s.Dicts = append(s.Dicts, make(map[string]uint64))

        n := len(s.DictKeys)
        if n < cap(s.DictKeys) {
            s.DictKeys = s.DictKeys[:n+1]
            s.DictKeys[n] = s.DictKeys[n][:0]
        } else {
            s.DictKeys = append(s.DictKeys, []string(nil))
        }
    }

    return int(fieldIDPlus1 - 1)
}
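
getOrDefineField stores every field ID with a +1 bias, so a map lookup that returns the zero value unambiguously means "not defined". A minimal standalone sketch of the same convention (the names here are illustrative, not from the diff):

package main

import "fmt"

func main() {
    fieldsMap := map[string]uint16{} // fieldName -> fieldID+1
    fieldsInv := []string{}          // fieldID -> fieldName

    define := func(name string) int {
        idPlus1, exists := fieldsMap[name]
        if !exists {
            idPlus1 = uint16(len(fieldsInv) + 1)
            fieldsMap[name] = idPlus1
            fieldsInv = append(fieldsInv, name)
        }
        return int(idPlus1 - 1)
    }

    fmt.Println(define("_id"), define("title"), define("_id")) // 0 1 0
    fmt.Println(fieldsMap["missing"])                          // 0 means "not defined"
}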

// fill Dicts and DictKeys from analysis results
func (s *interim) prepareDicts() {
    var pidNext int

    var totTFs int
    var totLocs int

    visitField := func(fieldID uint16, tfs analysis.TokenFrequencies) {
        dict := s.Dicts[fieldID]
        dictKeys := s.DictKeys[fieldID]

        for term, tf := range tfs {
            pidPlus1, exists := dict[term]
            if !exists {
                pidNext++
                pidPlus1 = uint64(pidNext)

                dict[term] = pidPlus1
                dictKeys = append(dictKeys, term)

                s.numTermsPerPostingsList = append(s.numTermsPerPostingsList, 0)
                s.numLocsPerPostingsList = append(s.numLocsPerPostingsList, 0)
            }

            pid := pidPlus1 - 1

            s.numTermsPerPostingsList[pid] += 1
            s.numLocsPerPostingsList[pid] += len(tf.Locations)

            totLocs += len(tf.Locations)
        }

        totTFs += len(tfs)

        s.DictKeys[fieldID] = dictKeys
    }

    for _, result := range s.results {
        // walk each composite field
        for _, field := range result.Document.CompositeFields {
            fieldID := uint16(s.getOrDefineField(field.Name()))
            _, tf := field.Analyze()
            visitField(fieldID, tf)
        }

        // walk each field
        for i, field := range result.Document.Fields {
            fieldID := uint16(s.getOrDefineField(field.Name()))
            tf := result.Analyzed[i]
            visitField(fieldID, tf)
        }
    }

    numPostingsLists := pidNext

    if cap(s.Postings) >= numPostingsLists {
        s.Postings = s.Postings[:numPostingsLists]
    } else {
        postings := make([]*roaring.Bitmap, numPostingsLists)
        copy(postings, s.Postings[:cap(s.Postings)])
        for i := 0; i < numPostingsLists; i++ {
            if postings[i] == nil {
                postings[i] = roaring.New()
            }
        }
        s.Postings = postings
    }

    if cap(s.PostingsLocs) >= numPostingsLists {
        s.PostingsLocs = s.PostingsLocs[:numPostingsLists]
    } else {
        postingsLocs := make([]*roaring.Bitmap, numPostingsLists)
        copy(postingsLocs, s.PostingsLocs[:cap(s.PostingsLocs)])
        for i := 0; i < numPostingsLists; i++ {
            if postingsLocs[i] == nil {
                postingsLocs[i] = roaring.New()
            }
        }
        s.PostingsLocs = postingsLocs
    }

    if cap(s.FreqNorms) >= numPostingsLists {
        s.FreqNorms = s.FreqNorms[:numPostingsLists]
    } else {
        s.FreqNorms = make([][]interimFreqNorm, numPostingsLists)
    }

    if cap(s.freqNormsBacking) >= totTFs {
        s.freqNormsBacking = s.freqNormsBacking[:totTFs]
    } else {
        s.freqNormsBacking = make([]interimFreqNorm, totTFs)
    }

    freqNormsBacking := s.freqNormsBacking
    for pid, numTerms := range s.numTermsPerPostingsList {
        s.FreqNorms[pid] = freqNormsBacking[0:0]
        freqNormsBacking = freqNormsBacking[numTerms:]
    }

    if cap(s.Locs) >= numPostingsLists {
        s.Locs = s.Locs[:numPostingsLists]
    } else {
        s.Locs = make([][]interimLoc, numPostingsLists)
    }

    if cap(s.locsBacking) >= totLocs {
        s.locsBacking = s.locsBacking[:totLocs]
    } else {
        s.locsBacking = make([]interimLoc, totLocs)
    }

    locsBacking := s.locsBacking
    for pid, numLocs := range s.numLocsPerPostingsList {
        s.Locs[pid] = locsBacking[0:0]
        locsBacking = locsBacking[numLocs:]
    }
}
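
The FreqNorms/Locs setup above is a slab-allocation pattern: one backing slice sized from the counted totals, carved into per-postings-list sub-slices of length zero, so the appends in processDocument never allocate. A minimal sketch of the trick with plain ints (the counts and values are made up):

package main

import "fmt"

func main() {
    counts := []int{3, 1, 2} // e.g. numTermsPerPostingsList
    total := 0
    for _, c := range counts {
        total += c
    }

    backing := make([]int, total) // one allocation for all lists
    lists := make([][]int, len(counts))
    for i, c := range counts {
        lists[i] = backing[0:0] // len 0; appends write into backing in place
        backing = backing[c:]   // next list starts after this one's region
    }

    // Each list must append at most its counted amount, as the real
    // code guarantees, since the sub-slices share one backing array.
    lists[0] = append(lists[0], 10, 11, 12)
    lists[2] = append(lists[2], 40, 41)
    fmt.Println(lists[0], lists[2]) // [10 11 12] [40 41]
}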

func (s *interim) processDocuments() {
    numFields := len(s.FieldsInv)
    reuseFieldLens := make([]int, numFields)
    reuseFieldTFs := make([]analysis.TokenFrequencies, numFields)

    for docNum, result := range s.results {
        for i := 0; i < numFields; i++ { // clear these for reuse
            reuseFieldLens[i] = 0
            reuseFieldTFs[i] = nil
        }

        s.processDocument(uint64(docNum), result,
            reuseFieldLens, reuseFieldTFs)
    }
}

func (s *interim) processDocument(docNum uint64,
    result *index.AnalysisResult,
    fieldLens []int, fieldTFs []analysis.TokenFrequencies) {
    visitField := func(fieldID uint16, fieldName string,
        ln int, tf analysis.TokenFrequencies) {
        fieldLens[fieldID] += ln

        existingFreqs := fieldTFs[fieldID]
        if existingFreqs != nil {
            existingFreqs.MergeAll(fieldName, tf)
        } else {
            fieldTFs[fieldID] = tf
        }
    }

    // walk each composite field
    for _, field := range result.Document.CompositeFields {
        fieldID := uint16(s.getOrDefineField(field.Name()))
        ln, tf := field.Analyze()
        visitField(fieldID, field.Name(), ln, tf)
    }

    // walk each field
    for i, field := range result.Document.Fields {
        fieldID := uint16(s.getOrDefineField(field.Name()))
        ln := result.Length[i]
        tf := result.Analyzed[i]
        visitField(fieldID, field.Name(), ln, tf)
    }

    // now that it's been rolled up into fieldTFs, walk that
    for fieldID, tfs := range fieldTFs {
        dict := s.Dicts[fieldID]
        norm := float32(1.0 / math.Sqrt(float64(fieldLens[fieldID])))

        for term, tf := range tfs {
            pid := dict[term] - 1
            bs := s.Postings[pid]
            bs.Add(uint32(docNum))

            s.FreqNorms[pid] = append(s.FreqNorms[pid],
                interimFreqNorm{
                    freq: uint64(tf.Frequency()),
                    norm: norm,
                })

            if len(tf.Locations) > 0 {
                locBS := s.PostingsLocs[pid]
                locBS.Add(uint32(docNum))

                locs := s.Locs[pid]

                for _, loc := range tf.Locations {
                    var locf = uint16(fieldID)
                    if loc.Field != "" {
                        locf = uint16(s.getOrDefineField(loc.Field))
                    }
                    var arrayposs []uint64
                    if len(loc.ArrayPositions) > 0 {
                        arrayposs = loc.ArrayPositions
                    }
                    locs = append(locs, interimLoc{
                        fieldID:   locf,
                        pos:       uint64(loc.Position),
                        start:     uint64(loc.Start),
                        end:       uint64(loc.End),
                        arrayposs: arrayposs,
                    })
                }

                s.Locs[pid] = locs
            }
        }
    }
}
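
The norm computed above is the usual 1/sqrt(fieldLength) length normalization, stored per (term, doc) as a float32. A quick check of the arithmetic and of the float32-bits round trip that writeDicts later relies on:

package main

import (
    "fmt"
    "math"
)

func main() {
    fieldLen := 4 // a field with 4 tokens
    norm := float32(1.0 / math.Sqrt(float64(fieldLen)))
    fmt.Println(norm) // 0.5

    // tfEncoder stores the norm as uint64(math.Float32bits(norm));
    // readers recover it with math.Float32frombits.
    bits := uint64(math.Float32bits(norm))
    fmt.Println(math.Float32frombits(uint32(bits))) // 0.5
}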

func (s *interim) writeStoredFields() (
    storedIndexOffset uint64, err error) {
    metaEncoder := govarint.NewU64Base128Encoder(&s.metaBuf)

    data, compressed := s.tmp0[:0], s.tmp1[:0]
    defer func() { s.tmp0, s.tmp1 = data, compressed }()

    // keyed by docNum
    docStoredOffsets := make([]uint64, len(s.results))

    // keyed by fieldID, for the current doc in the loop
    docStoredFields := map[uint16]interimStoredField{}

    for docNum, result := range s.results {
        for fieldID := range docStoredFields { // reset for next doc
            delete(docStoredFields, fieldID)
        }

        for _, field := range result.Document.Fields {
            fieldID := uint16(s.getOrDefineField(field.Name()))

            opts := field.Options()

            if opts.IsStored() {
                isf := docStoredFields[fieldID]
                isf.vals = append(isf.vals, field.Value())
                isf.typs = append(isf.typs, encodeFieldType(field))
                isf.arrayposs = append(isf.arrayposs, field.ArrayPositions())
                docStoredFields[fieldID] = isf
            }

            if opts.IncludeDocValues() {
                s.IncludeDocValues[fieldID] = true
            }
        }

        var curr int

        s.metaBuf.Reset()
        data = data[:0]
        compressed = compressed[:0]

        for fieldID := range s.FieldsInv {
            isf, exists := docStoredFields[uint16(fieldID)]
            if exists {
                curr, data, err = persistStoredFieldValues(
                    fieldID, isf.vals, isf.typs, isf.arrayposs,
                    curr, metaEncoder, data)
                if err != nil {
                    return 0, err
                }
            }
        }

        metaEncoder.Close()
        metaBytes := s.metaBuf.Bytes()

        compressed = snappy.Encode(compressed, data)

        docStoredOffsets[docNum] = uint64(s.w.Count())

        _, err := writeUvarints(s.w,
            uint64(len(metaBytes)),
            uint64(len(compressed)))
        if err != nil {
            return 0, err
        }

        _, err = s.w.Write(metaBytes)
        if err != nil {
            return 0, err
        }

        _, err = s.w.Write(compressed)
        if err != nil {
            return 0, err
        }
    }

    storedIndexOffset = uint64(s.w.Count())

    for _, docStoredOffset := range docStoredOffsets {
        err = binary.Write(s.w, binary.BigEndian, docStoredOffset)
        if err != nil {
            return 0, err
        }
    }

    return storedIndexOffset, nil
}
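
Per document, the stored-fields record written above is: uvarint(len(meta)), uvarint(len(compressed body)), the meta bytes, then the snappy-compressed data, with the per-doc offsets trailed afterward as big-endian uint64s. A minimal sketch of reading one such record back under that assumed layout (readStoredRecord is a hypothetical helper, not part of the diff; the snappy package is the github.com/golang/snappy library the code already imports):

package main

import (
    "encoding/binary"
    "fmt"

    "github.com/golang/snappy"
)

// readStoredRecord decodes one doc record laid out as written above:
// uvarint meta length, uvarint compressed length, meta bytes, snappy body.
func readStoredRecord(mem []byte) (meta, data []byte, err error) {
    metaLen, n1 := binary.Uvarint(mem)
    dataLen, n2 := binary.Uvarint(mem[n1:])
    p := n1 + n2
    meta = mem[p : p+int(metaLen)]
    data, err = snappy.Decode(nil, mem[p+int(metaLen):p+int(metaLen)+int(dataLen)])
    return meta, data, err
}

func main() {
    // Round-trip a fake record.
    meta := []byte{1, 2, 3}
    body := snappy.Encode(nil, []byte("hello stored fields"))
    var rec []byte
    var buf [binary.MaxVarintLen64]byte
    rec = append(rec, buf[:binary.PutUvarint(buf[:], uint64(len(meta)))]...)
    rec = append(rec, buf[:binary.PutUvarint(buf[:], uint64(len(body)))]...)
    rec = append(rec, meta...)
    rec = append(rec, body...)

    m, d, _ := readStoredRecord(rec)
    fmt.Println(m, string(d)) // [1 2 3] hello stored fields
}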

func (s *interim) writeDicts() (fdvIndexOffset uint64, dictOffsets []uint64, err error) {
    dictOffsets = make([]uint64, len(s.FieldsInv))

    fdvOffsets := make([]uint64, len(s.FieldsInv))

    buf := s.grabBuf(binary.MaxVarintLen64)

    tfEncoder := newChunkedIntCoder(uint64(s.chunkFactor), uint64(len(s.results)-1))
    locEncoder := newChunkedIntCoder(uint64(s.chunkFactor), uint64(len(s.results)-1))
    fdvEncoder := newChunkedContentCoder(uint64(s.chunkFactor), uint64(len(s.results)-1))

    var docTermMap [][]byte

    if s.builder == nil {
        s.builder, err = vellum.New(&s.builderBuf, nil)
        if err != nil {
            return 0, nil, err
        }
    }

    for fieldID, terms := range s.DictKeys {
        if cap(docTermMap) < len(s.results) {
            docTermMap = make([][]byte, len(s.results))
        } else {
            docTermMap = docTermMap[0:len(s.results)]
            for docNum := range docTermMap { // reset the docTermMap
                docTermMap[docNum] = docTermMap[docNum][:0]
            }
        }

        dict := s.Dicts[fieldID]

        for _, term := range terms { // terms are already sorted
            pid := dict[term] - 1

            postingsBS := s.Postings[pid]
            postingsLocsBS := s.PostingsLocs[pid]

            freqNorms := s.FreqNorms[pid]
            freqNormOffset := 0

            locs := s.Locs[pid]
            locOffset := 0

            postingsItr := postingsBS.Iterator()
            for postingsItr.HasNext() {
                docNum := uint64(postingsItr.Next())

                freqNorm := freqNorms[freqNormOffset]

                err = tfEncoder.Add(docNum, freqNorm.freq,
                    uint64(math.Float32bits(freqNorm.norm)))
                if err != nil {
                    return 0, nil, err
                }

                for i := uint64(0); i < freqNorm.freq; i++ {
                    if len(locs) > 0 {
                        loc := locs[locOffset]

                        err = locEncoder.Add(docNum, uint64(loc.fieldID),
                            loc.pos, loc.start, loc.end,
                            uint64(len(loc.arrayposs)))
                        if err != nil {
                            return 0, nil, err
                        }

                        err = locEncoder.Add(docNum, loc.arrayposs...)
                        if err != nil {
                            return 0, nil, err
                        }
                    }

                    locOffset++
                }

                freqNormOffset++

                docTermMap[docNum] = append(
                    append(docTermMap[docNum], term...),
                    termSeparator)
            }

            tfEncoder.Close()
            locEncoder.Close()

            postingsOffset, err := writePostings(
                postingsBS, postingsLocsBS, tfEncoder, locEncoder,
                nil, s.w, buf)
            if err != nil {
                return 0, nil, err
            }

            if postingsOffset > uint64(0) {
                err = s.builder.Insert([]byte(term), postingsOffset)
                if err != nil {
                    return 0, nil, err
                }
            }

            tfEncoder.Reset()
            locEncoder.Reset()
        }

        err = s.builder.Close()
        if err != nil {
            return 0, nil, err
        }

        // record where this dictionary starts
        dictOffsets[fieldID] = uint64(s.w.Count())

        vellumData := s.builderBuf.Bytes()

        // write out the length of the vellum data
        n := binary.PutUvarint(buf, uint64(len(vellumData)))
        _, err = s.w.Write(buf[:n])
        if err != nil {
            return 0, nil, err
        }

        // write this vellum to disk
        _, err = s.w.Write(vellumData)
        if err != nil {
            return 0, nil, err
        }

        // reset vellum for reuse
        s.builderBuf.Reset()

        err = s.builder.Reset(&s.builderBuf)
        if err != nil {
            return 0, nil, err
        }

        // write the field doc values
        if s.IncludeDocValues[fieldID] {
            for docNum, docTerms := range docTermMap {
                if len(docTerms) > 0 {
                    err = fdvEncoder.Add(uint64(docNum), docTerms)
                    if err != nil {
                        return 0, nil, err
                    }
                }
            }
            err = fdvEncoder.Close()
            if err != nil {
                return 0, nil, err
            }

            fdvOffsets[fieldID] = uint64(s.w.Count())

            _, err = fdvEncoder.Write(s.w)
            if err != nil {
                return 0, nil, err
            }

            fdvEncoder.Reset()
        } else {
            fdvOffsets[fieldID] = fieldNotUninverted
        }
    }

    fdvIndexOffset = uint64(s.w.Count())

    for _, fdvOffset := range fdvOffsets {
        n := binary.PutUvarint(buf, fdvOffset)
        _, err := s.w.Write(buf[:n])
        if err != nil {
            return 0, nil, err
        }
    }

    return fdvIndexOffset, dictOffsets, nil
}
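
Each field's dictionary is a vellum FST mapping term bytes to a postings offset, written to disk behind a uvarint length prefix. A minimal sketch of the builder/reader round trip with the same vellum API the code uses (the terms and offsets are made up); note the builder requires insertions in sorted key order, which is why writeDicts only walks DictKeys after convert has sorted them:

package main

import (
    "bytes"
    "fmt"

    "github.com/couchbase/vellum"
)

func main() {
    var buf bytes.Buffer
    b, _ := vellum.New(&buf, nil)

    // Keys must be inserted in sorted order, as in writeDicts.
    _ = b.Insert([]byte("apple"), 1024) // term -> postingsOffset
    _ = b.Insert([]byte("zebra"), 2048)
    _ = b.Close()

    fst, _ := vellum.Load(buf.Bytes())
    off, ok, _ := fst.Get([]byte("apple"))
    fmt.Println(off, ok) // 1024 true
}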

func encodeFieldType(f document.Field) byte {
    fieldType := byte('x')
    switch f.(type) {
    case *document.TextField:
        fieldType = 't'
    case *document.NumericField:
        fieldType = 'n'
    case *document.DateTimeField:
        fieldType = 'd'
    case *document.BooleanField:
        fieldType = 'b'
    case *document.GeoPointField:
        fieldType = 'g'
    case *document.CompositeField:
        fieldType = 'c'
    }
    return fieldType
}
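
A decoding counterpart is implied by this single-byte tagging but is not part of the diff; a hedged sketch of what it could look like:

package main

import "fmt"

// decodeFieldType is a hypothetical inverse of encodeFieldType;
// it is not part of this commit. 'x' marks an unknown field type.
func decodeFieldType(b byte) string {
    switch b {
    case 't':
        return "text"
    case 'n':
        return "numeric"
    case 'd':
        return "datetime"
    case 'b':
        return "boolean"
    case 'g':
        return "geopoint"
    case 'c':
        return "composite"
    }
    return "unknown"
}

func main() {
    fmt.Println(decodeFieldType('t'), decodeFieldType('x')) // text unknown
}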

@@ -92,6 +92,8 @@ func under32Bits(x uint64) bool {
    return x <= mask31Bits
}

const docNum1HitFinished = math.MaxUint64

// PostingsList is an in-memory representation of a postings list
type PostingsList struct {
    sb *SegmentBase

@@ -102,8 +104,9 @@ type PostingsList struct {
    postings *roaring.Bitmap
    except   *roaring.Bitmap

    // when postingsOffset == freqOffset == 0, then the postings list
    // represents a "1-hit" encoding, and has the following norm
    // when normBits1Hit != 0, then this postings list came from a
    // 1-hit encoding, and only the docNum1Hit & normBits1Hit apply
    docNum1Hit   uint64
    normBits1Hit uint64
}
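
The struct comment above is the heart of this change: when a term matches exactly one document, the (docNum, normBits) pair is packed directly into the single uint64 FST value, skipping a full postings list, which is why docNums must stay under 31 bits (see under32Bits and the MaxSegmentSizeLimit guard in the merge planner). FSTValEncode1Hit/FSTValDecode1Hit are not shown in these hunks; the sketch below is one plausible packing, not necessarily the one zap uses:

package main

import (
    "fmt"
    "math"
)

// Illustrative only: pack a float32's bits into the high bits and a
// sub-31-bit docNum into the low 31 bits of one uint64.
func encode1Hit(docNum uint64, norm float32) uint64 {
    return uint64(math.Float32bits(norm))<<31 | docNum
}

func decode1Hit(v uint64) (docNum uint64, norm float32) {
    return v & (1<<31 - 1), math.Float32frombits(uint32(v >> 31))
}

func main() {
    v := encode1Hit(12345, 0.5)
    d, n := decode1Hit(v)
    fmt.Println(d, n) // 12345 0.5
}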

@@ -117,6 +120,17 @@ func (p *PostingsList) Size() int {
    return sizeInBytes
}

func (p *PostingsList) OrInto(receiver *roaring.Bitmap) {
    if p.normBits1Hit != 0 {
        receiver.Add(uint32(p.docNum1Hit))
        return
    }

    if p.postings != nil {
        receiver.Or(p.postings)
    }
}
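
OrInto lets callers union a postings list into an accumulator bitmap without ever materializing a roaring bitmap for the 1-hit case. A small sketch of the same accumulation pattern with the roaring library directly (the values are made up, since constructing a real PostingsList needs segment internals):

package main

import (
    "fmt"

    "github.com/RoaringBitmap/roaring"
)

func main() {
    // DocNumbers-style accumulation: each postings list ORs itself
    // into one receiver; a 1-hit list just adds its single docNum.
    rv := roaring.New()

    general := roaring.BitmapOf(2, 5, 9) // a "general" postings bitmap
    rv.Or(general)                       // OrInto when postings != nil

    rv.Add(42) // OrInto for a 1-hit list

    fmt.Println(rv.ToArray()) // [2 5 9 42]
}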

// Iterator returns an iterator for this postings list
func (p *PostingsList) Iterator() segment.PostingsIterator {
    return p.iterator(nil)

@@ -152,39 +166,47 @@ func (p *PostingsList) iterator(rv *PostingsIterator) *PostingsIterator {
    }
    rv.postings = p

    if p.normBits1Hit != 0 {
        // "1-hit" encoding
        rv.docNum1Hit = p.docNum1Hit
        rv.normBits1Hit = p.normBits1Hit

        if p.except != nil && p.except.Contains(uint32(rv.docNum1Hit)) {
            rv.docNum1Hit = docNum1HitFinished
        }

        return rv
    }

    // "general" encoding, check if empty
    if p.postings == nil {
        return rv
    }

    if p.freqOffset > 0 && p.locOffset > 0 {
        // "general" encoding, so prepare the freq chunk details
        var n uint64
        var read int
        var numFreqChunks uint64
        numFreqChunks, read = binary.Uvarint(p.sb.mem[p.freqOffset+n : p.freqOffset+n+binary.MaxVarintLen64])
    // prepare the freq chunk details
    var n uint64
    var read int
    var numFreqChunks uint64
    numFreqChunks, read = binary.Uvarint(p.sb.mem[p.freqOffset+n : p.freqOffset+n+binary.MaxVarintLen64])
    n += uint64(read)
    rv.freqChunkOffsets = make([]uint64, int(numFreqChunks))
    for i := 0; i < int(numFreqChunks); i++ {
        rv.freqChunkOffsets[i], read = binary.Uvarint(p.sb.mem[p.freqOffset+n : p.freqOffset+n+binary.MaxVarintLen64])
        n += uint64(read)
        rv.freqChunkLens = make([]uint64, int(numFreqChunks))
        for i := 0; i < int(numFreqChunks); i++ {
            rv.freqChunkLens[i], read = binary.Uvarint(p.sb.mem[p.freqOffset+n : p.freqOffset+n+binary.MaxVarintLen64])
            n += uint64(read)
        }
        rv.freqChunkStart = p.freqOffset + n

        // prepare the loc chunk details
        n = 0
        var numLocChunks uint64
        numLocChunks, read = binary.Uvarint(p.sb.mem[p.locOffset+n : p.locOffset+n+binary.MaxVarintLen64])
        n += uint64(read)
        rv.locChunkLens = make([]uint64, int(numLocChunks))
        for i := 0; i < int(numLocChunks); i++ {
            rv.locChunkLens[i], read = binary.Uvarint(p.sb.mem[p.locOffset+n : p.locOffset+n+binary.MaxVarintLen64])
            n += uint64(read)
        }
        rv.locChunkStart = p.locOffset + n
    } else {
        // "1-hit" encoding
        rv.normBits1Hit = p.normBits1Hit
    }
    rv.freqChunkStart = p.freqOffset + n

    // prepare the loc chunk details
    n = 0
    var numLocChunks uint64
    numLocChunks, read = binary.Uvarint(p.sb.mem[p.locOffset+n : p.locOffset+n+binary.MaxVarintLen64])
    n += uint64(read)
    rv.locChunkOffsets = make([]uint64, int(numLocChunks))
    for i := 0; i < int(numLocChunks); i++ {
        rv.locChunkOffsets[i], read = binary.Uvarint(p.sb.mem[p.locOffset+n : p.locOffset+n+binary.MaxVarintLen64])
        n += uint64(read)
    }
    rv.locChunkStart = p.locOffset + n

    rv.locBitmap = p.locBitmap

@@ -201,18 +223,20 @@ func (p *PostingsList) iterator(rv *PostingsIterator) *PostingsIterator {

// Count returns the number of items on this postings list
func (p *PostingsList) Count() uint64 {
    if p.postings != nil {
        n := p.postings.GetCardinality()
        if p.except != nil {
            e := p.except.GetCardinality()
            if e > n {
                e = n
            }
            return n - e
        }
        return n
    var n uint64
    if p.normBits1Hit != 0 {
        n = 1
    } else if p.postings != nil {
        n = p.postings.GetCardinality()
    }
    return 0
    var e uint64
    if p.except != nil {
        e = p.except.GetCardinality()
    }
    if n <= e {
        return 0
    }
    return n - e
}
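
A quick worked check of the new Count arithmetic: a 1-hit list contributes n = 1, and the except bitmap (deletions) is subtracted with a floor of zero so the unsigned math never underflows:

package main

import "fmt"

func main() {
    // Mirror of the new Count logic: n items on the list,
    // e items excluded, floored at zero.
    count := func(n, e uint64) uint64 {
        if n <= e {
            return 0
        }
        return n - e
    }
    fmt.Println(count(3, 1)) // general list {2,5,9} minus except {5} -> 2
    fmt.Println(count(1, 0)) // a 1-hit list with nothing excepted -> 1
    fmt.Println(count(1, 2)) // floors at zero rather than underflowing
}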

func (rv *PostingsList) read(postingsOffset uint64, d *Dictionary) error {

@@ -242,7 +266,9 @@ func (rv *PostingsList) read(postingsOffset uint64, d *Dictionary) error {

    locRoaringBytes := d.sb.mem[locBitmapOffset+uint64(read) : locBitmapOffset+uint64(read)+locBitmapLen]

    rv.locBitmap = roaring.NewBitmap()
    if rv.locBitmap == nil {
        rv.locBitmap = roaring.NewBitmap()
    }
    _, err := rv.locBitmap.FromBuffer(locRoaringBytes)
    if err != nil {
        return fmt.Errorf("error loading roaring bitmap of locations with hits: %v", err)

@@ -254,7 +280,9 @@ func (rv *PostingsList) read(postingsOffset uint64, d *Dictionary) error {

    roaringBytes := d.sb.mem[postingsOffset+n : postingsOffset+n+postingsLen]

    rv.postings = roaring.NewBitmap()
    if rv.postings == nil {
        rv.postings = roaring.NewBitmap()
    }
    _, err = rv.postings.FromBuffer(roaringBytes)
    if err != nil {
        return fmt.Errorf("error loading roaring bitmap: %v", err)

@@ -263,19 +291,10 @@ func (rv *PostingsList) read(postingsOffset uint64, d *Dictionary) error {
    return nil
}

var emptyRoaring = roaring.NewBitmap()

func (rv *PostingsList) init1Hit(fstVal uint64) error {
    docNum, normBits := FSTValDecode1Hit(fstVal)

    rv.locBitmap = emptyRoaring

    rv.postings = roaring.NewBitmap()
    rv.postings.Add(uint32(docNum))

    // TODO: we can likely do better than allocating a roaring bitmap
    // with just 1 entry, but for now reuse existing machinery

    rv.docNum1Hit = docNum
    rv.normBits1Hit = normBits

    return nil

@@ -297,17 +316,18 @@ type PostingsIterator struct {
    locDecoder *govarint.Base128Decoder
    locReader  *bytes.Reader

    freqChunkLens  []uint64
    freqChunkStart uint64
    freqChunkOffsets []uint64
    freqChunkStart   uint64

    locChunkLens  []uint64
    locChunkStart uint64
    locChunkOffsets []uint64
    locChunkStart   uint64

    locBitmap *roaring.Bitmap

    next     Posting    // reused across Next() calls
    nextLocs []Location // reused across Next() calls

    docNum1Hit   uint64
    normBits1Hit uint64

    buf []byte

@@ -317,8 +337,8 @@ func (i *PostingsIterator) Size() int {
    sizeInBytes := reflectStaticSizePostingsIterator + size.SizeOfPtr +
        len(i.currChunkFreqNorm) +
        len(i.currChunkLoc) +
        len(i.freqChunkLens)*size.SizeOfUint64 +
        len(i.locChunkLens)*size.SizeOfUint64 +
        len(i.freqChunkOffsets)*size.SizeOfUint64 +
        len(i.locChunkOffsets)*size.SizeOfUint64 +
        i.next.Size()

    if i.locBitmap != nil {

@@ -333,16 +353,14 @@ func (i *PostingsIterator) Size() int {
}

func (i *PostingsIterator) loadChunk(chunk int) error {
    if chunk >= len(i.freqChunkLens) || chunk >= len(i.locChunkLens) {
        return fmt.Errorf("tried to load chunk that doesn't exist %d/(%d %d)", chunk, len(i.freqChunkLens), len(i.locChunkLens))
    if chunk >= len(i.freqChunkOffsets) || chunk >= len(i.locChunkOffsets) {
        return fmt.Errorf("tried to load chunk that doesn't exist %d/(%d %d)", chunk, len(i.freqChunkOffsets), len(i.locChunkOffsets))
    }

    // load freq chunk bytes
    start := i.freqChunkStart
    for j := 0; j < chunk; j++ {
        start += i.freqChunkLens[j]
    }
    end := start + i.freqChunkLens[chunk]
    end, start := i.freqChunkStart, i.freqChunkStart
    s, e := readChunkBoundary(chunk, i.freqChunkOffsets)
    start += s
    end += e
    i.currChunkFreqNorm = i.postings.sb.mem[start:end]
    if i.freqNormReader == nil {
        i.freqNormReader = bytes.NewReader(i.currChunkFreqNorm)

@@ -351,12 +369,10 @@ func (i *PostingsIterator) loadChunk(chunk int) error {
    i.freqNormReader.Reset(i.currChunkFreqNorm)
}

    // load loc chunk bytes
    start = i.locChunkStart
    for j := 0; j < chunk; j++ {
        start += i.locChunkLens[j]
    }
    end = start + i.locChunkLens[chunk]
    end, start = i.locChunkStart, i.locChunkStart
    s, e = readChunkBoundary(chunk, i.locChunkOffsets)
    start += s
    end += e
    i.currChunkLoc = i.postings.sb.mem[start:end]
    if i.locReader == nil {
        i.locReader = bytes.NewReader(i.currChunkLoc)
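
readChunkBoundary itself sits outside these hunks; given how it is used above (its results are added to the chunk region's start), it presumably returns the cumulative start and end offsets of chunk i, which is what replaces the old O(chunk) summation over per-chunk lengths. A hedged sketch consistent with that usage:

package main

import "fmt"

// Assumed shape of readChunkBoundary, not shown in this diff:
// offsets[i] is the cumulative end of chunk i, so chunk i spans
// [offsets[i-1], offsets[i]) relative to the chunk data start.
func readChunkBoundary(chunk int, offsets []uint64) (start, end uint64) {
    if chunk > 0 {
        start = offsets[chunk-1]
    }
    return start, offsets[chunk]
}

func main() {
    offsets := []uint64{7, 12, 30} // three chunks of lengths 7, 5, 18
    s, e := readChunkBoundary(1, offsets)
    fmt.Println(s, e) // 7 12
}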

@@ -424,6 +440,8 @@ func (i *PostingsIterator) readLocation(l *Location) error {
    l.end = end
    if numArrayPos > 0 {
        l.ap = make([]uint64, int(numArrayPos))
    } else {
        l.ap = l.ap[:0]
    }
}

@@ -460,7 +478,7 @@ func (i *PostingsIterator) Next() (segment.Posting, error) {
    }
    rv.norm = math.Float32frombits(uint32(normBits))

    if i.locBitmap.Contains(uint32(docNum)) {
    if i.locBitmap != nil && i.locBitmap.Contains(uint32(docNum)) {
        // read off 'freq' locations, into reused slices
        if cap(i.nextLocs) >= int(rv.freq) {
            i.nextLocs = i.nextLocs[0:rv.freq]

@@ -513,7 +531,7 @@ func (i *PostingsIterator) nextBytes() (
    endFreqNorm := len(i.currChunkFreqNorm) - i.freqNormReader.Len()
    bytesFreqNorm = i.currChunkFreqNorm[startFreqNorm:endFreqNorm]

    if i.locBitmap.Contains(uint32(docNum)) {
    if i.locBitmap != nil && i.locBitmap.Contains(uint32(docNum)) {
        startLoc := len(i.currChunkLoc) - i.locReader.Len()

        for j := uint64(0); j < freq; j++ {

@@ -533,6 +551,15 @@ func (i *PostingsIterator) nextBytes() (
// nextDocNum returns the next docNum on the postings list, and also
// sets up the currChunk / loc related fields of the iterator.
func (i *PostingsIterator) nextDocNum() (uint64, bool, error) {
    if i.normBits1Hit != 0 {
        if i.docNum1Hit == docNum1HitFinished {
            return 0, false, nil
        }
        docNum := i.docNum1Hit
        i.docNum1Hit = docNum1HitFinished // consume our 1-hit docNum
        return docNum, true, nil
    }

    if i.actual == nil || !i.actual.HasNext() {
        return 0, false, nil
    }

@@ -540,10 +567,6 @@ func (i *PostingsIterator) nextDocNum() (uint64, bool, error) {
    n := i.actual.Next()
    allN := i.all.Next()

    if i.normBits1Hit != 0 {
        return uint64(n), true, nil
    }

    nChunk := n / i.postings.sb.chunkFactor
    allNChunk := allN / i.postings.sb.chunkFactor

@@ -341,15 +341,13 @@ func (s *SegmentBase) DocNumbers(ids []string) (*roaring.Bitmap, error) {
        return nil, err
    }

    var postings *PostingsList
    var postingsList *PostingsList
    for _, id := range ids {
        postings, err = idDict.postingsList([]byte(id), nil, postings)
        postingsList, err = idDict.postingsList([]byte(id), nil, postingsList)
        if err != nil {
            return nil, err
        }
        if postings.postings != nil {
            rv.Or(postings.postings)
        }
        postingsList.OrInto(rv)
    }
}

@@ -28,8 +28,8 @@ import (
func TestOpen(t *testing.T) {
    _ = os.RemoveAll("/tmp/scorch.zap")

    memSegment := buildMemSegment()
    err := PersistSegment(memSegment, "/tmp/scorch.zap", 1024)
    testSeg, _ := buildTestSegment()
    err := PersistSegmentBase(testSeg, "/tmp/scorch.zap")
    if err != nil {
        t.Fatalf("error persisting segment: %v", err)
    }

@@ -328,8 +328,8 @@ func TestOpen(t *testing.T) {
func TestOpenMulti(t *testing.T) {
    _ = os.RemoveAll("/tmp/scorch.zap")

    memSegment := buildMemSegmentMulti()
    err := PersistSegment(memSegment, "/tmp/scorch.zap", 1024)
    testSeg, _ := buildTestSegmentMulti()
    err := PersistSegmentBase(testSeg, "/tmp/scorch.zap")
    if err != nil {
        t.Fatalf("error persisting segment: %v", err)
    }

@@ -428,8 +428,8 @@ func TestOpenMulti(t *testing.T) {
func TestOpenMultiWithTwoChunks(t *testing.T) {
    _ = os.RemoveAll("/tmp/scorch.zap")

    memSegment := buildMemSegmentMulti()
    err := PersistSegment(memSegment, "/tmp/scorch.zap", 1)
    testSeg, _ := buildTestSegmentMultiWithChunkFactor(1)
    err := PersistSegmentBase(testSeg, "/tmp/scorch.zap")
    if err != nil {
        t.Fatalf("error persisting segment: %v", err)
    }

@@ -523,8 +523,8 @@ func TestOpenMultiWithTwoChunks(t *testing.T) {
func TestSegmentVisitableDocValueFieldsList(t *testing.T) {
    _ = os.RemoveAll("/tmp/scorch.zap")

    memSegment := buildMemSegmentMulti()
    err := PersistSegment(memSegment, "/tmp/scorch.zap", 1)
    testSeg, _ := buildTestSegmentMultiWithChunkFactor(1)
    err := PersistSegmentBase(testSeg, "/tmp/scorch.zap")
    if err != nil {
        t.Fatalf("error persisting segment: %v", err)
    }

@@ -551,8 +551,8 @@ func TestSegmentVisitableDocValueFieldsList(t *testing.T) {
    }
    _ = os.RemoveAll("/tmp/scorch.zap")

    memSegment, expectedFields := buildMemSegmentWithDefaultFieldMapping()
    err = PersistSegment(memSegment, "/tmp/scorch.zap", 1)
    testSeg, expectedFields, _ := buildTestSegmentWithDefaultFieldMapping(1)
    err = PersistSegmentBase(testSeg, "/tmp/scorch.zap")
    if err != nil {
        t.Fatalf("error persisting segment: %v", err)
    }

@@ -534,7 +534,8 @@ func (i *indexImpl) SearchInContext(ctx context.Context, req *SearchRequest) (sr
    doc, err := indexReader.Document(hit.ID)
    if err == nil && doc != nil {
        if len(req.Fields) > 0 {
            for _, f := range req.Fields {
            fieldsToLoad := deDuplicate(req.Fields)
            for _, f := range fieldsToLoad {
                for _, docF := range doc.Fields {
                    if f == "*" || docF.Name() == f {
                        var value interface{}

@@ -830,3 +831,16 @@ func (f *indexImplFieldDict) Close() error {
    }
    return f.indexReader.Close()
}

// helper function to remove duplicate entries from slice of strings
func deDuplicate(fields []string) []string {
    entries := make(map[string]struct{})
    ret := []string{}
    for _, entry := range fields {
        if _, exists := entries[entry]; !exists {
            entries[entry] = struct{}{}
            ret = append(ret, entry)
        }
    }
    return ret
}
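
deDuplicate keeps first occurrences in order, so repeated request fields no longer cause duplicate field loads per hit. A quick standalone usage sketch (the function body is repeated from the diff above so the example runs on its own):

package main

import "fmt"

// deDuplicate as added in this diff, copied here for a runnable example.
func deDuplicate(fields []string) []string {
    entries := make(map[string]struct{})
    ret := []string{}
    for _, entry := range fields {
        if _, exists := entries[entry]; !exists {
            entries[entry] = struct{}{}
            ret = append(ret, entry)
        }
    }
    return ret
}

func main() {
    fields := []string{"name", "*", "name", "age", "*"}
    fmt.Println(deDuplicate(fields)) // [name * age]
}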