
Merge branch 'master' of https://github.com/blevesearch/bleve into loadchunk_minor

Sreekanth Sivasankaran 2018-03-13 11:59:29 +05:30
commit 5271b582bb
13 changed files with 1033 additions and 663 deletions

View File

@@ -633,14 +633,14 @@ func (s *Scorch) removeOldBoltSnapshots() (numRemoved int, err error) {
return 0, err
}
if len(persistedEpochs) <= NumSnapshotsToKeep {
if len(persistedEpochs) <= s.numSnapshotsToKeep {
// we need to keep everything
return 0, nil
}
// make a map of epochs to protect from deletion
protectedEpochs := make(map[uint64]struct{}, NumSnapshotsToKeep)
for _, epoch := range persistedEpochs[0:NumSnapshotsToKeep] {
protectedEpochs := make(map[uint64]struct{}, s.numSnapshotsToKeep)
for _, epoch := range persistedEpochs[0:s.numSnapshotsToKeep] {
protectedEpochs[epoch] = struct{}{}
}

View File

@@ -28,7 +28,6 @@ import (
"github.com/blevesearch/bleve/document"
"github.com/blevesearch/bleve/index"
"github.com/blevesearch/bleve/index/scorch/segment"
"github.com/blevesearch/bleve/index/scorch/segment/mem"
"github.com/blevesearch/bleve/index/scorch/segment/zap"
"github.com/blevesearch/bleve/index/store"
"github.com/blevesearch/bleve/registry"
@@ -58,6 +57,7 @@ type Scorch struct {
nextSnapshotEpoch uint64
eligibleForRemoval []uint64 // Index snapshot epochs that are safe to GC.
ineligibleForRemoval map[string]bool // Filenames that should not be GC'ed yet.
numSnapshotsToKeep int
closeCh chan struct{}
introductions chan *segmentIntroduction
@@ -191,6 +191,17 @@ func (s *Scorch) openBolt() error {
}
}
s.numSnapshotsToKeep = NumSnapshotsToKeep
if v, ok := s.config["numSnapshotsToKeep"]; ok {
var t int
if t, err = parseToInteger(v); err != nil {
return fmt.Errorf("numSnapshotsToKeep parse err: %v", err)
}
if t > 0 {
s.numSnapshotsToKeep = t
}
}
return nil
}
@@ -289,7 +300,7 @@ func (s *Scorch) Batch(batch *index.Batch) (err error) {
var newSegment segment.Segment
if len(analysisResults) > 0 {
newSegment, err = zap.NewSegmentBase(mem.NewFromAnalyzedDocs(analysisResults), DefaultChunkFactor)
newSegment, err = zap.AnalysisResultsToSegmentBase(analysisResults, DefaultChunkFactor)
if err != nil {
return err
}
@@ -504,3 +515,15 @@ func (s *Scorch) unmarkIneligibleForRemoval(filename string) {
func init() {
registry.RegisterIndexType(Name, NewScorch)
}
func parseToInteger(i interface{}) (int, error) {
switch v := i.(type) {
case float64:
return int(v), nil
case int:
return v, nil
default:
return 0, fmt.Errorf("expects int or float64 value")
}
}
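With this change the number of retained bolt snapshots is configurable per index rather than fixed at the package-level NumSnapshotsToKeep. A minimal sketch of how a caller could pass the new option through the index config (the "numSnapshotsToKeep" key comes from the hunk above; the bleve.NewUsing wiring around it is illustrative, not part of this commit):

package main

import (
	"github.com/blevesearch/bleve"
	"github.com/blevesearch/bleve/index/scorch"
)

// openWithRetention opens a scorch index that keeps 3 persisted
// snapshots. parseToInteger accepts int or float64, so configs
// decoded from JSON (where numbers arrive as float64) also work;
// values <= 0 fall back to the default.
func openWithRetention(path string) (bleve.Index, error) {
	cfg := map[string]interface{}{
		"numSnapshotsToKeep": 3,
	}
	return bleve.NewUsing(path, bleve.NewIndexMapping(),
		scorch.Name, scorch.Name, cfg)
}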

View File

@@ -16,16 +16,10 @@ package zap
import (
"bufio"
"bytes"
"encoding/binary"
"math"
"os"
"sort"
"github.com/Smerity/govarint"
"github.com/blevesearch/bleve/index/scorch/segment/mem"
"github.com/couchbase/vellum"
"github.com/golang/snappy"
)
const version uint32 = 4
@@ -82,186 +76,6 @@ func PersistSegmentBase(sb *SegmentBase, path string) error {
return nil
}
// PersistSegment takes the in-memory segment and persists it to
// the specified path in the zap file format.
func PersistSegment(memSegment *mem.Segment, path string, chunkFactor uint32) error {
flag := os.O_RDWR | os.O_CREATE
f, err := os.OpenFile(path, flag, 0600)
if err != nil {
return err
}
cleanup := func() {
_ = f.Close()
_ = os.Remove(path)
}
// buffer the output
br := bufio.NewWriter(f)
// wrap it for counting (tracking offsets)
cr := NewCountHashWriter(br)
numDocs, storedIndexOffset, fieldsIndexOffset, docValueOffset, _, err :=
persistBase(memSegment, cr, chunkFactor)
if err != nil {
cleanup()
return err
}
err = persistFooter(numDocs, storedIndexOffset, fieldsIndexOffset, docValueOffset,
chunkFactor, cr.Sum32(), cr)
if err != nil {
cleanup()
return err
}
err = br.Flush()
if err != nil {
cleanup()
return err
}
err = f.Sync()
if err != nil {
cleanup()
return err
}
err = f.Close()
if err != nil {
cleanup()
return err
}
return nil
}
func persistBase(memSegment *mem.Segment, cr *CountHashWriter, chunkFactor uint32) (
numDocs, storedIndexOffset, fieldsIndexOffset, docValueOffset uint64,
dictLocs []uint64, err error) {
docValueOffset = uint64(fieldNotUninverted)
if len(memSegment.Stored) > 0 {
storedIndexOffset, err = persistStored(memSegment, cr)
if err != nil {
return 0, 0, 0, 0, nil, err
}
freqOffsets, locOffsets, err := persistPostingDetails(memSegment, cr, chunkFactor)
if err != nil {
return 0, 0, 0, 0, nil, err
}
postingsListLocs, err := persistPostingsLocs(memSegment, cr)
if err != nil {
return 0, 0, 0, 0, nil, err
}
postingsLocs, err := persistPostingsLists(memSegment, cr, postingsListLocs, freqOffsets, locOffsets)
if err != nil {
return 0, 0, 0, 0, nil, err
}
dictLocs, err = persistDictionary(memSegment, cr, postingsLocs)
if err != nil {
return 0, 0, 0, 0, nil, err
}
docValueOffset, err = persistFieldDocValues(memSegment, cr, chunkFactor)
if err != nil {
return 0, 0, 0, 0, nil, err
}
} else {
dictLocs = make([]uint64, len(memSegment.FieldsInv))
}
fieldsIndexOffset, err = persistFields(memSegment.FieldsInv, cr, dictLocs)
if err != nil {
return 0, 0, 0, 0, nil, err
}
return uint64(len(memSegment.Stored)), storedIndexOffset, fieldsIndexOffset, docValueOffset,
dictLocs, nil
}
func persistStored(memSegment *mem.Segment, w *CountHashWriter) (uint64, error) {
var curr int
var metaBuf bytes.Buffer
var data, compressed []byte
metaEncoder := govarint.NewU64Base128Encoder(&metaBuf)
docNumOffsets := make(map[int]uint64, len(memSegment.Stored))
for docNum, storedValues := range memSegment.Stored {
if docNum != 0 {
// reset buffer if necessary
curr = 0
metaBuf.Reset()
data = data[:0]
compressed = compressed[:0]
}
st := memSegment.StoredTypes[docNum]
sp := memSegment.StoredPos[docNum]
// encode fields in order
for fieldID := range memSegment.FieldsInv {
if storedFieldValues, ok := storedValues[uint16(fieldID)]; ok {
stf := st[uint16(fieldID)]
spf := sp[uint16(fieldID)]
var err2 error
curr, data, err2 = persistStoredFieldValues(fieldID,
storedFieldValues, stf, spf, curr, metaEncoder, data)
if err2 != nil {
return 0, err2
}
}
}
metaEncoder.Close()
metaBytes := metaBuf.Bytes()
// compress the data
compressed = snappy.Encode(compressed, data)
// record where we're about to start writing
docNumOffsets[docNum] = uint64(w.Count())
// write out the meta len and compressed data len
_, err := writeUvarints(w, uint64(len(metaBytes)), uint64(len(compressed)))
if err != nil {
return 0, err
}
// now write the meta
_, err = w.Write(metaBytes)
if err != nil {
return 0, err
}
// now write the compressed data
_, err = w.Write(compressed)
if err != nil {
return 0, err
}
}
// return value is the start of the stored index
rv := uint64(w.Count())
// now write out the stored doc index
for docNum := range memSegment.Stored {
err := binary.Write(w, binary.BigEndian, docNumOffsets[docNum])
if err != nil {
return 0, err
}
}
return rv, nil
}
func persistStoredFieldValues(fieldID int,
storedFieldValues [][]byte, stf []byte, spf [][]uint64,
curr int, metaEncoder *govarint.Base128Encoder, data []byte) (
@@ -307,308 +121,6 @@ func persistStoredFieldValues(fieldID int,
return curr, data, nil
}
func persistPostingDetails(memSegment *mem.Segment, w *CountHashWriter, chunkFactor uint32) ([]uint64, []uint64, error) {
freqOffsets := make([]uint64, 0, len(memSegment.Postings))
tfEncoder := newChunkedIntCoder(uint64(chunkFactor), uint64(len(memSegment.Stored)-1))
for postingID := range memSegment.Postings {
if postingID != 0 {
tfEncoder.Reset()
}
freqs := memSegment.Freqs[postingID]
norms := memSegment.Norms[postingID]
postingsListItr := memSegment.Postings[postingID].Iterator()
var offset int
for postingsListItr.HasNext() {
docNum := uint64(postingsListItr.Next())
// put freq & norm
err := tfEncoder.Add(docNum, freqs[offset], uint64(math.Float32bits(norms[offset])))
if err != nil {
return nil, nil, err
}
offset++
}
// record where this postings freq info starts
freqOffsets = append(freqOffsets, uint64(w.Count()))
tfEncoder.Close()
_, err := tfEncoder.Write(w)
if err != nil {
return nil, nil, err
}
}
// now do it again for the locations
locOffsets := make([]uint64, 0, len(memSegment.Postings))
locEncoder := newChunkedIntCoder(uint64(chunkFactor), uint64(len(memSegment.Stored)-1))
for postingID := range memSegment.Postings {
if postingID != 0 {
locEncoder.Reset()
}
freqs := memSegment.Freqs[postingID]
locfields := memSegment.Locfields[postingID]
locpos := memSegment.Locpos[postingID]
locstarts := memSegment.Locstarts[postingID]
locends := memSegment.Locends[postingID]
locarraypos := memSegment.Locarraypos[postingID]
postingsListItr := memSegment.Postings[postingID].Iterator()
var offset int
var locOffset int
for postingsListItr.HasNext() {
docNum := uint64(postingsListItr.Next())
n := int(freqs[offset])
for i := 0; i < n; i++ {
if len(locfields) > 0 {
err := locEncoder.Add(docNum, uint64(locfields[locOffset]),
locpos[locOffset], locstarts[locOffset], locends[locOffset],
uint64(len(locarraypos[locOffset])))
if err != nil {
return nil, nil, err
}
// put each array position
err = locEncoder.Add(docNum, locarraypos[locOffset]...)
if err != nil {
return nil, nil, err
}
}
locOffset++
}
offset++
}
// record where this postings loc info starts
locOffsets = append(locOffsets, uint64(w.Count()))
locEncoder.Close()
_, err := locEncoder.Write(w)
if err != nil {
return nil, nil, err
}
}
return freqOffsets, locOffsets, nil
}
func persistPostingsLocs(memSegment *mem.Segment, w *CountHashWriter) (rv []uint64, err error) {
rv = make([]uint64, 0, len(memSegment.PostingsLocs))
reuseBufVarint := make([]byte, binary.MaxVarintLen64)
for postingID := range memSegment.PostingsLocs {
// record where we start this posting loc
rv = append(rv, uint64(w.Count()))
// write out the length and bitmap
_, err = writeRoaringWithLen(memSegment.PostingsLocs[postingID], w, reuseBufVarint)
if err != nil {
return nil, err
}
}
return rv, nil
}
func persistPostingsLists(memSegment *mem.Segment, w *CountHashWriter,
postingsListLocs, freqOffsets, locOffsets []uint64) (rv []uint64, err error) {
rv = make([]uint64, 0, len(memSegment.Postings))
reuseBufVarint := make([]byte, binary.MaxVarintLen64)
for postingID := range memSegment.Postings {
// record where we start this posting list
rv = append(rv, uint64(w.Count()))
// write out the term info, loc info, and loc posting list offset
_, err = writeUvarints(w, freqOffsets[postingID],
locOffsets[postingID], postingsListLocs[postingID])
if err != nil {
return nil, err
}
// write out the length and bitmap
_, err = writeRoaringWithLen(memSegment.Postings[postingID], w, reuseBufVarint)
if err != nil {
return nil, err
}
}
return rv, nil
}
func persistDictionary(memSegment *mem.Segment, w *CountHashWriter, postingsLocs []uint64) ([]uint64, error) {
rv := make([]uint64, 0, len(memSegment.DictKeys))
varintBuf := make([]byte, binary.MaxVarintLen64)
var buffer bytes.Buffer
builder, err := vellum.New(&buffer, nil)
if err != nil {
return nil, err
}
for fieldID, fieldTerms := range memSegment.DictKeys {
dict := memSegment.Dicts[fieldID]
// now walk the dictionary in order of fieldTerms (already sorted)
for _, fieldTerm := range fieldTerms {
postingID := dict[fieldTerm] - 1
postingsAddr := postingsLocs[postingID]
err = builder.Insert([]byte(fieldTerm), postingsAddr)
if err != nil {
return nil, err
}
}
err = builder.Close()
if err != nil {
return nil, err
}
// record where this dictionary starts
rv = append(rv, uint64(w.Count()))
vellumData := buffer.Bytes()
// write out the length of the vellum data
n := binary.PutUvarint(varintBuf, uint64(len(vellumData)))
_, err = w.Write(varintBuf[:n])
if err != nil {
return nil, err
}
// write this vellum to disk
_, err = w.Write(vellumData)
if err != nil {
return nil, err
}
// reset buffer and vellum builder
buffer.Reset()
err = builder.Reset(&buffer)
if err != nil {
return nil, err
}
}
return rv, nil
}
type docIDRange []uint64
func (a docIDRange) Len() int { return len(a) }
func (a docIDRange) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a docIDRange) Less(i, j int) bool { return a[i] < a[j] }
func persistDocValues(memSegment *mem.Segment, w *CountHashWriter,
chunkFactor uint32) (map[uint16]uint64, error) {
fieldChunkOffsets := make(map[uint16]uint64, len(memSegment.FieldsInv))
fdvEncoder := newChunkedContentCoder(uint64(chunkFactor), uint64(len(memSegment.Stored)-1))
var postings *mem.PostingsList
var postingsItr *mem.PostingsIterator
for fieldID := range memSegment.DocValueFields {
field := memSegment.FieldsInv[fieldID]
docTermMap := make(map[uint64][]byte, 0)
dict, err := memSegment.Dictionary(field)
if err != nil {
return nil, err
}
dictItr := dict.Iterator()
next, err := dictItr.Next()
for err == nil && next != nil {
var err1 error
postings, err1 = dict.(*mem.Dictionary).InitPostingsList(next.Term, nil, postings)
if err1 != nil {
return nil, err1
}
postingsItr = postings.InitIterator(postingsItr)
nextPosting, err2 := postingsItr.Next()
for err2 == nil && nextPosting != nil {
docNum := nextPosting.Number()
docTermMap[docNum] = append(append(docTermMap[docNum], []byte(next.Term)...), termSeparator)
nextPosting, err2 = postingsItr.Next()
}
if err2 != nil {
return nil, err2
}
next, err = dictItr.Next()
}
if err != nil {
return nil, err
}
// sort wrt docIDs
docNumbers := make(docIDRange, 0, len(docTermMap))
for k := range docTermMap {
docNumbers = append(docNumbers, k)
}
sort.Sort(docNumbers)
for _, docNum := range docNumbers {
err = fdvEncoder.Add(docNum, docTermMap[docNum])
if err != nil {
return nil, err
}
}
fieldChunkOffsets[fieldID] = uint64(w.Count())
err = fdvEncoder.Close()
if err != nil {
return nil, err
}
// persist the doc value details for this field
_, err = fdvEncoder.Write(w)
if err != nil {
return nil, err
}
// resetting encoder for the next field
fdvEncoder.Reset()
}
return fieldChunkOffsets, nil
}
func persistFieldDocValues(memSegment *mem.Segment, w *CountHashWriter,
chunkFactor uint32) (uint64, error) {
fieldDvOffsets, err := persistDocValues(memSegment, w, chunkFactor)
if err != nil {
return 0, err
}
fieldDocValuesOffset := uint64(w.Count())
buf := make([]byte, binary.MaxVarintLen64)
offset := uint64(0)
ok := true
for fieldID := range memSegment.FieldsInv {
// if the field isn't configured for docValue, then mark
// the offset accordingly
if offset, ok = fieldDvOffsets[uint16(fieldID)]; !ok {
offset = fieldNotUninverted
}
n := binary.PutUvarint(buf, uint64(offset))
_, err := w.Write(buf[:n])
if err != nil {
return 0, err
}
}
return fieldDocValuesOffset, nil
}
func NewSegmentBase(memSegment *mem.Segment, chunkFactor uint32) (*SegmentBase, error) {
var br bytes.Buffer
cr := NewCountHashWriter(&br)
numDocs, storedIndexOffset, fieldsIndexOffset, docValueOffset, dictLocs, err :=
persistBase(memSegment, cr, chunkFactor)
if err != nil {
return nil, err
}
return InitSegmentBase(br.Bytes(), cr.Sum32(), chunkFactor,
memSegment.FieldsMap, memSegment.FieldsInv, numDocs,
storedIndexOffset, fieldsIndexOffset, docValueOffset, dictLocs)
}
func InitSegmentBase(mem []byte, memCRC uint32, chunkFactor uint32,
fieldsMap map[string]uint16, fieldsInv []string, numDocs uint64,
storedIndexOffset uint64, fieldsIndexOffset uint64, docValueOffset uint64,
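With persistBase and PersistSegment removed, building and persisting are now two explicit steps: produce an in-memory SegmentBase first, then hand it to the surviving PersistSegmentBase. A short sketch of the replacement flow inside package zap, mirroring what the updated tests below do (chunk factor 1024 matches the tests; the helper name is ours):

// buildAndPersist is a hypothetical helper showing the two-step path
// that replaces the old one-shot PersistSegment.
func buildAndPersist(results []*index.AnalysisResult, path string) error {
	sb, err := AnalysisResultsToSegmentBase(results, 1024)
	if err != nil {
		return err
	}
	return PersistSegmentBase(sb, path)
}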

View File

@@ -21,20 +21,22 @@ import (
"github.com/blevesearch/bleve/analysis"
"github.com/blevesearch/bleve/document"
"github.com/blevesearch/bleve/index"
"github.com/blevesearch/bleve/index/scorch/segment/mem"
)
func TestBuild(t *testing.T) {
_ = os.RemoveAll("/tmp/scorch.zap")
memSegment := buildMemSegment()
err := PersistSegment(memSegment, "/tmp/scorch.zap", 1024)
sb, err := buildTestSegment()
if err != nil {
t.Fatal(err)
}
err = PersistSegmentBase(sb, "/tmp/scorch.zap")
if err != nil {
t.Fatal(err)
}
}
func buildMemSegment() *mem.Segment {
func buildTestSegment() (*SegmentBase, error) {
doc := &document.Document{
ID: "a",
Fields: []document.Field{
@@ -120,11 +122,22 @@ func buildMemSegment() *mem.Segment {
}
}
return mem.NewFromAnalyzedDocs(results)
return AnalysisResultsToSegmentBase(results, 1024)
}
func buildMemSegmentMulti() *mem.Segment {
func buildTestSegmentMulti() (*SegmentBase, error) {
results := buildTestAnalysisResultsMulti()
return AnalysisResultsToSegmentBase(results, 1024)
}
func buildTestSegmentMultiWithChunkFactor(chunkFactor uint32) (*SegmentBase, error) {
results := buildTestAnalysisResultsMulti()
return AnalysisResultsToSegmentBase(results, chunkFactor)
}
func buildTestAnalysisResultsMulti() []*index.AnalysisResult {
doc := &document.Document{
ID: "a",
Fields: []document.Field{
@@ -282,13 +295,11 @@ func buildMemSegmentMulti() *mem.Segment {
}
}
segment := mem.NewFromAnalyzedDocs(results)
return segment
return results
}
func buildMemSegmentWithDefaultFieldMapping() (*mem.Segment, []string) {
func buildTestSegmentWithDefaultFieldMapping(chunkFactor uint32) (
*SegmentBase, []string, error) {
doc := &document.Document{
ID: "a",
Fields: []document.Field{
@@ -371,5 +382,7 @@ func buildMemSegmentWithDefaultFieldMapping() (*mem.Segment, []string) {
}
}
return mem.NewFromAnalyzedDocs(results), fields
sb, err := AnalysisResultsToSegmentBase(results, chunkFactor)
return sb, fields, err
}

View File

@@ -68,7 +68,19 @@ func (d *Dictionary) postingsListInit(rv *PostingsList, except *roaring.Bitmap)
if rv == nil {
rv = &PostingsList{}
} else {
postings := rv.postings
if postings != nil {
postings.Clear()
}
locBitmap := rv.locBitmap
if locBitmap != nil {
locBitmap.Clear()
}
*rv = PostingsList{} // clear the struct
rv.postings = postings
rv.locBitmap = locBitmap
}
rv.sb = d.sb
rv.except = except
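The reset above deliberately keeps the two roaring bitmaps alive across reuse: they are detached, cleared, the struct is zeroed wholesale, and then they are reattached, so a caller that keeps passing the same PostingsList back in avoids reallocating them. The DocNumbers change later in this commit relies on exactly that; a distilled sketch of the caller-side pattern (a hypothetical helper, using OrInto, which is added further down in this diff):

// docNumsForTerms unions the postings of several terms, reusing one
// PostingsList (and its bitmaps) across every lookup.
func docNumsForTerms(dict *Dictionary, terms []string) (*roaring.Bitmap, error) {
	rv := roaring.NewBitmap()
	var pl *PostingsList
	var err error
	for _, term := range terms {
		pl, err = dict.postingsList([]byte(term), nil, pl)
		if err != nil {
			return nil, err
		}
		pl.OrInto(rv) // handles both general and 1-hit lists
	}
	return rv, nil
}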

View File

@@ -22,10 +22,9 @@ import (
"github.com/blevesearch/bleve/analysis"
"github.com/blevesearch/bleve/document"
"github.com/blevesearch/bleve/index"
"github.com/blevesearch/bleve/index/scorch/segment/mem"
)
func buildMemSegmentForDict() *mem.Segment {
func buildTestSegmentForDict() (*SegmentBase, error) {
doc := &document.Document{
ID: "a",
Fields: []document.Field{
@@ -99,17 +98,15 @@ func buildMemSegmentForDict() *mem.Segment {
},
}
segment := mem.NewFromAnalyzedDocs(results)
return segment
return AnalysisResultsToSegmentBase(results, 1024)
}
func TestDictionary(t *testing.T) {
_ = os.RemoveAll("/tmp/scorch.zap")
memSegment := buildMemSegmentForDict()
err := PersistSegment(memSegment, "/tmp/scorch.zap", 1024)
testSeg, _ := buildTestSegmentForDict()
err := PersistSegmentBase(testSeg, "/tmp/scorch.zap")
if err != nil {
t.Fatalf("error persisting segment: %v", err)
}

View File

@@ -183,6 +183,9 @@ func persistMergedRest(segments []*SegmentBase, dropsIn []*roaring.Bitmap,
return nil, 0, err
}
newRoaring := roaring.NewBitmap()
newRoaringLocs := roaring.NewBitmap()
// for each field
for fieldID, fieldName := range fieldsInv {
@@ -222,8 +225,8 @@ func persistMergedRest(segments []*SegmentBase, dropsIn []*roaring.Bitmap,
var prevTerm []byte
newRoaring := roaring.NewBitmap()
newRoaringLocs := roaring.NewBitmap()
newRoaring.Clear()
newRoaringLocs.Clear()
var lastDocNum, lastFreq, lastNorm uint64
@@ -248,63 +251,22 @@ func persistMergedRest(segments []*SegmentBase, dropsIn []*roaring.Bitmap,
tfEncoder.Close()
locEncoder.Close()
termCardinality := newRoaring.GetCardinality()
postingsOffset, err := writePostings(
newRoaring, newRoaringLocs, tfEncoder, locEncoder,
use1HitEncoding, w, bufMaxVarintLen64)
if err != nil {
return err
}
encodeAs1Hit, docNum1Hit, normBits1Hit := use1HitEncoding(termCardinality)
if encodeAs1Hit {
err = newVellum.Insert(term, FSTValEncode1Hit(docNum1Hit, normBits1Hit))
if err != nil {
return err
}
} else if termCardinality > 0 {
// this field/term has hits in the new segment
freqOffset := uint64(w.Count())
_, err := tfEncoder.Write(w)
if err != nil {
return err
}
locOffset := uint64(w.Count())
_, err = locEncoder.Write(w)
if err != nil {
return err
}
postingLocOffset := uint64(w.Count())
_, err = writeRoaringWithLen(newRoaringLocs, w, bufMaxVarintLen64)
if err != nil {
return err
}
postingOffset := uint64(w.Count())
// write out the start of the term info
n := binary.PutUvarint(bufMaxVarintLen64, freqOffset)
_, err = w.Write(bufMaxVarintLen64[:n])
if err != nil {
return err
}
// write out the start of the loc info
n = binary.PutUvarint(bufMaxVarintLen64, locOffset)
_, err = w.Write(bufMaxVarintLen64[:n])
if err != nil {
return err
}
// write out the start of the posting locs
n = binary.PutUvarint(bufMaxVarintLen64, postingLocOffset)
_, err = w.Write(bufMaxVarintLen64[:n])
if err != nil {
return err
}
_, err = writeRoaringWithLen(newRoaring, w, bufMaxVarintLen64)
if err != nil {
return err
}
err = newVellum.Insert(term, postingOffset)
if postingsOffset > 0 {
err = newVellum.Insert(term, postingsOffset)
if err != nil {
return err
}
}
newRoaring = roaring.NewBitmap()
newRoaringLocs = roaring.NewBitmap()
newRoaring.Clear()
newRoaringLocs.Clear()
tfEncoder.Reset()
locEncoder.Reset()
@@ -460,6 +422,69 @@ func persistMergedRest(segments []*SegmentBase, dropsIn []*roaring.Bitmap,
return rv, fieldDvLocsOffset, nil
}
func writePostings(postings, postingLocs *roaring.Bitmap,
tfEncoder, locEncoder *chunkedIntCoder,
use1HitEncoding func(uint64) (bool, uint64, uint64),
w *CountHashWriter, bufMaxVarintLen64 []byte) (
offset uint64, err error) {
termCardinality := postings.GetCardinality()
if termCardinality <= 0 {
return 0, nil
}
if use1HitEncoding != nil {
encodeAs1Hit, docNum1Hit, normBits1Hit := use1HitEncoding(termCardinality)
if encodeAs1Hit {
return FSTValEncode1Hit(docNum1Hit, normBits1Hit), nil
}
}
tfOffset := uint64(w.Count())
_, err = tfEncoder.Write(w)
if err != nil {
return 0, err
}
locOffset := uint64(w.Count())
_, err = locEncoder.Write(w)
if err != nil {
return 0, err
}
postingLocsOffset := uint64(w.Count())
_, err = writeRoaringWithLen(postingLocs, w, bufMaxVarintLen64)
if err != nil {
return 0, err
}
postingsOffset := uint64(w.Count())
n := binary.PutUvarint(bufMaxVarintLen64, tfOffset)
_, err = w.Write(bufMaxVarintLen64[:n])
if err != nil {
return 0, err
}
n = binary.PutUvarint(bufMaxVarintLen64, locOffset)
_, err = w.Write(bufMaxVarintLen64[:n])
if err != nil {
return 0, err
}
n = binary.PutUvarint(bufMaxVarintLen64, postingLocsOffset)
_, err = w.Write(bufMaxVarintLen64[:n])
if err != nil {
return 0, err
}
_, err = writeRoaringWithLen(postings, w, bufMaxVarintLen64)
if err != nil {
return 0, err
}
return postingsOffset, nil
}
func mergeStoredAndRemap(segments []*SegmentBase, drops []*roaring.Bitmap,
fieldsMap map[string]uint16, fieldsInv []string, fieldsSame bool, newSegDocCount uint64,
w *CountHashWriter) (uint64, [][]uint64, error) {
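writePostings factors out what persistMergedRest used to do inline, so the merger and the new in-memory builder share one encoder path. Its contract: a return of 0 means the term had no hits at all; if the optional use1HitEncoding callback accepts the term's cardinality, the returned value is the packed FSTValEncode1Hit value rather than a file offset; otherwise it is the offset of a general postings list. A hypothetical callback showing the expected shape (the real decision logic lives at the merge call site; new.go passes nil here, so freshly built segments never use the 1-hit encoding):

// use1HitWhenSingleton builds a use1HitEncoding callback that collapses
// a term to the "1-hit" encoding only when it matched exactly one
// document; docNum and normBits would come from the caller's state.
func use1HitWhenSingleton(docNum, normBits uint64) func(uint64) (bool, uint64, uint64) {
	return func(cardinality uint64) (bool, uint64, uint64) {
		if cardinality == 1 {
			return true, docNum, normBits
		}
		return false, 0, 0
	}
}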

View File

@@ -26,7 +26,6 @@ import (
"github.com/blevesearch/bleve/analysis"
"github.com/blevesearch/bleve/document"
"github.com/blevesearch/bleve/index"
"github.com/blevesearch/bleve/index/scorch/segment/mem"
)
func TestMerge(t *testing.T) {
@@ -34,14 +33,14 @@ func TestMerge(t *testing.T) {
_ = os.RemoveAll("/tmp/scorch2.zap")
_ = os.RemoveAll("/tmp/scorch3.zap")
memSegment := buildMemSegmentMulti()
err := PersistSegment(memSegment, "/tmp/scorch.zap", 1024)
testSeg, _ := buildTestSegmentMulti()
err := PersistSegmentBase(testSeg, "/tmp/scorch.zap")
if err != nil {
t.Fatal(err)
}
memSegment2 := buildMemSegmentMulti2()
err = PersistSegment(memSegment2, "/tmp/scorch2.zap", 1024)
testSeg2, _ := buildTestSegmentMulti2()
err = PersistSegmentBase(testSeg2, "/tmp/scorch2.zap")
if err != nil {
t.Fatal(err)
}
@@ -121,8 +120,8 @@ func TestMergeWithEmptySegmentsFirst(t *testing.T) {
func testMergeWithEmptySegments(t *testing.T, before bool, numEmptySegments int) {
_ = os.RemoveAll("/tmp/scorch.zap")
memSegment := buildMemSegmentMulti()
err := PersistSegment(memSegment, "/tmp/scorch.zap", 1024)
testSeg, _ := buildTestSegmentMulti()
err := PersistSegmentBase(testSeg, "/tmp/scorch.zap")
if err != nil {
t.Fatal(err)
}
@@ -148,8 +147,8 @@ func testMergeWithEmptySegments(t *testing.T, before bool, numEmptySegments int)
_ = os.RemoveAll("/tmp/" + fname)
emptySegment := mem.NewFromAnalyzedDocs([]*index.AnalysisResult{})
err = PersistSegment(emptySegment, "/tmp/"+fname, 1024)
emptySegment, _ := AnalysisResultsToSegmentBase([]*index.AnalysisResult{}, 1024)
err = PersistSegmentBase(emptySegment, "/tmp/"+fname)
if err != nil {
t.Fatal(err)
}
@@ -462,8 +461,8 @@ func testMergeAndDrop(t *testing.T, docsToDrop []*roaring.Bitmap) {
_ = os.RemoveAll("/tmp/scorch.zap")
_ = os.RemoveAll("/tmp/scorch2.zap")
memSegment := buildMemSegmentMulti()
err := PersistSegment(memSegment, "/tmp/scorch.zap", 1024)
testSeg, _ := buildTestSegmentMulti()
err := PersistSegmentBase(testSeg, "/tmp/scorch.zap")
if err != nil {
t.Fatal(err)
}
@@ -478,8 +477,8 @@ func testMergeAndDrop(t *testing.T, docsToDrop []*roaring.Bitmap) {
}
}()
memSegment2 := buildMemSegmentMulti2()
err = PersistSegment(memSegment2, "/tmp/scorch2.zap", 1024)
testSeg2, _ := buildTestSegmentMulti2()
err = PersistSegmentBase(testSeg2, "/tmp/scorch2.zap")
if err != nil {
t.Fatal(err)
}
@@ -565,8 +564,8 @@ func testMergeWithUpdates(t *testing.T, segmentDocIds [][]string, docsToDrop []*
_ = os.RemoveAll("/tmp/" + fname)
memSegment := buildMemSegmentMultiHelper(docIds)
err := PersistSegment(memSegment, "/tmp/"+fname, 1024)
testSeg, _ := buildTestSegmentMultiHelper(docIds)
err := PersistSegmentBase(testSeg, "/tmp/"+fname)
if err != nil {
t.Fatal(err)
}
@@ -616,11 +615,11 @@ func testMergeAndDropSegments(t *testing.T, segsToMerge []*Segment, docsToDrop [
testMergeWithSelf(t, segm.(*Segment), expectedNumDocs)
}
func buildMemSegmentMulti2() *mem.Segment {
return buildMemSegmentMultiHelper([]string{"c", "d"})
func buildTestSegmentMulti2() (*SegmentBase, error) {
return buildTestSegmentMultiHelper([]string{"c", "d"})
}
func buildMemSegmentMultiHelper(docIds []string) *mem.Segment {
func buildTestSegmentMultiHelper(docIds []string) (*SegmentBase, error) {
doc := &document.Document{
ID: "c",
Fields: []document.Field{
@@ -778,9 +777,7 @@ func buildMemSegmentMultiHelper(docIds []string) *mem.Segment {
}
}
segment := mem.NewFromAnalyzedDocs(results)
return segment
return AnalysisResultsToSegmentBase(results, 1024)
}
func TestMergeBytesWritten(t *testing.T) {
@@ -788,14 +785,14 @@
_ = os.RemoveAll("/tmp/scorch2.zap")
_ = os.RemoveAll("/tmp/scorch3.zap")
memSegment := buildMemSegmentMulti()
err := PersistSegment(memSegment, "/tmp/scorch.zap", 1024)
testSeg, _ := buildTestSegmentMulti()
err := PersistSegmentBase(testSeg, "/tmp/scorch.zap")
if err != nil {
t.Fatal(err)
}
memSegment2 := buildMemSegmentMulti2()
err = PersistSegment(memSegment2, "/tmp/scorch2.zap", 1024)
testSeg2, _ := buildTestSegmentMulti2()
err = PersistSegmentBase(testSeg2, "/tmp/scorch2.zap")
if err != nil {
t.Fatal(err)
}

View File

@@ -0,0 +1,760 @@
// Copyright (c) 2018 Couchbase, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package zap
import (
"bytes"
"encoding/binary"
"math"
"sort"
"sync"
"github.com/RoaringBitmap/roaring"
"github.com/Smerity/govarint"
"github.com/blevesearch/bleve/analysis"
"github.com/blevesearch/bleve/document"
"github.com/blevesearch/bleve/index"
"github.com/couchbase/vellum"
"github.com/golang/snappy"
)
// AnalysisResultsToSegmentBase produces an in-memory zap-encoded
// SegmentBase from analysis results
func AnalysisResultsToSegmentBase(results []*index.AnalysisResult,
chunkFactor uint32) (*SegmentBase, error) {
var br bytes.Buffer
s := interimPool.Get().(*interim)
s.results = results
s.chunkFactor = chunkFactor
s.w = NewCountHashWriter(&br)
storedIndexOffset, fieldsIndexOffset, fdvIndexOffset, dictOffsets,
err := s.convert()
if err != nil {
return nil, err
}
sb, err := InitSegmentBase(br.Bytes(), s.w.Sum32(), chunkFactor,
s.FieldsMap, s.FieldsInv, uint64(len(results)),
storedIndexOffset, fieldsIndexOffset, fdvIndexOffset, dictOffsets)
interimPool.Put(s.cleanse())
return sb, err
}
var interimPool = sync.Pool{New: func() interface{} { return &interim{} }}
// interim holds temporary working data used while converting from
// analysis results to a zap-encoded segment
type interim struct {
results []*index.AnalysisResult
chunkFactor uint32
w *CountHashWriter
// FieldsMap adds 1 to field id to avoid zero value issues
// name -> field id + 1
FieldsMap map[string]uint16
// FieldsInv is the inverse of FieldsMap
// field id -> name
FieldsInv []string
// Term dictionaries for each field
// field id -> term -> postings list id + 1
Dicts []map[string]uint64
// Terms for each field, where terms are sorted ascending
// field id -> []term
DictKeys [][]string
// Fields whose IncludeDocValues is true
// field id -> bool
IncludeDocValues []bool
// postings id -> bitmap of docNums
Postings []*roaring.Bitmap
// postings id -> bitmap of docNums that have locations
PostingsLocs []*roaring.Bitmap
// postings id -> freq/norm's, one for each docNum in postings
FreqNorms [][]interimFreqNorm
freqNormsBacking []interimFreqNorm
// postings id -> locs, one for each freq
Locs [][]interimLoc
locsBacking []interimLoc
numTermsPerPostingsList []int // key is postings list id
numLocsPerPostingsList []int // key is postings list id
buf0 bytes.Buffer
tmp0 []byte
tmp1 []byte
}
func (s *interim) cleanse() *interim {
s.results = nil
s.chunkFactor = 0
s.w = nil
s.FieldsMap = nil
s.FieldsInv = s.FieldsInv[:0]
for i := range s.Dicts {
s.Dicts[i] = nil
}
s.Dicts = s.Dicts[:0]
for i := range s.DictKeys {
s.DictKeys[i] = s.DictKeys[i][:0]
}
s.DictKeys = s.DictKeys[:0]
for i := range s.IncludeDocValues {
s.IncludeDocValues[i] = false
}
s.IncludeDocValues = s.IncludeDocValues[:0]
for _, idn := range s.Postings {
idn.Clear()
}
s.Postings = s.Postings[:0]
for _, idn := range s.PostingsLocs {
idn.Clear()
}
s.PostingsLocs = s.PostingsLocs[:0]
s.FreqNorms = s.FreqNorms[:0]
for i := range s.freqNormsBacking {
s.freqNormsBacking[i] = interimFreqNorm{}
}
s.freqNormsBacking = s.freqNormsBacking[:0]
s.Locs = s.Locs[:0]
for i := range s.locsBacking {
s.locsBacking[i] = interimLoc{}
}
s.locsBacking = s.locsBacking[:0]
s.numTermsPerPostingsList = s.numTermsPerPostingsList[:0]
s.numLocsPerPostingsList = s.numLocsPerPostingsList[:0]
s.buf0.Reset()
s.tmp0 = s.tmp0[:0]
s.tmp1 = s.tmp1[:0]
return s
}
func (s *interim) grabBuf(size int) []byte {
buf := s.tmp0
if cap(buf) < size {
buf = make([]byte, size)
s.tmp0 = buf
}
return buf[0:size]
}
type interimStoredField struct {
vals [][]byte
typs []byte
arrayposs [][]uint64 // array positions
}
type interimFreqNorm struct {
freq uint64
norm float32
}
type interimLoc struct {
fieldID uint16
pos uint64
start uint64
end uint64
arrayposs []uint64
}
func (s *interim) convert() (uint64, uint64, uint64, []uint64, error) {
s.FieldsMap = map[string]uint16{}
s.getOrDefineField("_id") // _id field is fieldID 0
for _, result := range s.results {
for _, field := range result.Document.CompositeFields {
s.getOrDefineField(field.Name())
}
for _, field := range result.Document.Fields {
s.getOrDefineField(field.Name())
}
}
sort.Strings(s.FieldsInv[1:]) // keep _id as first field
for fieldID, fieldName := range s.FieldsInv {
s.FieldsMap[fieldName] = uint16(fieldID + 1)
}
if cap(s.IncludeDocValues) >= len(s.FieldsInv) {
s.IncludeDocValues = s.IncludeDocValues[:len(s.FieldsInv)]
} else {
s.IncludeDocValues = make([]bool, len(s.FieldsInv))
}
s.prepareDicts()
for _, dict := range s.DictKeys {
sort.Strings(dict)
}
s.processDocuments()
storedIndexOffset, err := s.writeStoredFields()
if err != nil {
return 0, 0, 0, nil, err
}
var fdvIndexOffset uint64
var dictOffsets []uint64
if len(s.results) > 0 {
fdvIndexOffset, dictOffsets, err = s.writeDicts()
if err != nil {
return 0, 0, 0, nil, err
}
} else {
dictOffsets = make([]uint64, len(s.FieldsInv))
}
fieldsIndexOffset, err := persistFields(s.FieldsInv, s.w, dictOffsets)
if err != nil {
return 0, 0, 0, nil, err
}
return storedIndexOffset, fieldsIndexOffset, fdvIndexOffset, dictOffsets, nil
}
func (s *interim) getOrDefineField(fieldName string) int {
fieldIDPlus1, exists := s.FieldsMap[fieldName]
if !exists {
fieldIDPlus1 = uint16(len(s.FieldsInv) + 1)
s.FieldsMap[fieldName] = fieldIDPlus1
s.FieldsInv = append(s.FieldsInv, fieldName)
s.Dicts = append(s.Dicts, make(map[string]uint64))
n := len(s.DictKeys)
if n < cap(s.DictKeys) {
s.DictKeys = s.DictKeys[:n+1]
s.DictKeys[n] = s.DictKeys[n][:0]
} else {
s.DictKeys = append(s.DictKeys, []string(nil))
}
}
return int(fieldIDPlus1 - 1)
}
// fill Dicts and DictKeys from analysis results
func (s *interim) prepareDicts() {
var pidNext int
var totTFs int
var totLocs int
visitField := func(fieldID uint16, tfs analysis.TokenFrequencies) {
dict := s.Dicts[fieldID]
dictKeys := s.DictKeys[fieldID]
for term, tf := range tfs {
pidPlus1, exists := dict[term]
if !exists {
pidNext++
pidPlus1 = uint64(pidNext)
dict[term] = pidPlus1
dictKeys = append(dictKeys, term)
s.numTermsPerPostingsList = append(s.numTermsPerPostingsList, 0)
s.numLocsPerPostingsList = append(s.numLocsPerPostingsList, 0)
}
pid := pidPlus1 - 1
s.numTermsPerPostingsList[pid] += 1
s.numLocsPerPostingsList[pid] += len(tf.Locations)
totLocs += len(tf.Locations)
}
totTFs += len(tfs)
s.DictKeys[fieldID] = dictKeys
}
for _, result := range s.results {
// walk each composite field
for _, field := range result.Document.CompositeFields {
fieldID := uint16(s.getOrDefineField(field.Name()))
_, tf := field.Analyze()
visitField(fieldID, tf)
}
// walk each field
for i, field := range result.Document.Fields {
fieldID := uint16(s.getOrDefineField(field.Name()))
tf := result.Analyzed[i]
visitField(fieldID, tf)
}
}
numPostingsLists := pidNext
if cap(s.Postings) >= numPostingsLists {
s.Postings = s.Postings[:numPostingsLists]
} else {
postings := make([]*roaring.Bitmap, numPostingsLists)
copy(postings, s.Postings[:cap(s.Postings)])
for i := 0; i < numPostingsLists; i++ {
if postings[i] == nil {
postings[i] = roaring.New()
}
}
s.Postings = postings
}
if cap(s.PostingsLocs) >= numPostingsLists {
s.PostingsLocs = s.PostingsLocs[:numPostingsLists]
} else {
postingsLocs := make([]*roaring.Bitmap, numPostingsLists)
copy(postingsLocs, s.PostingsLocs[:cap(s.PostingsLocs)])
for i := 0; i < numPostingsLists; i++ {
if postingsLocs[i] == nil {
postingsLocs[i] = roaring.New()
}
}
s.PostingsLocs = postingsLocs
}
if cap(s.FreqNorms) >= numPostingsLists {
s.FreqNorms = s.FreqNorms[:numPostingsLists]
} else {
s.FreqNorms = make([][]interimFreqNorm, numPostingsLists)
}
if cap(s.freqNormsBacking) >= totTFs {
s.freqNormsBacking = s.freqNormsBacking[:totTFs]
} else {
s.freqNormsBacking = make([]interimFreqNorm, totTFs)
}
freqNormsBacking := s.freqNormsBacking
for pid, numTerms := range s.numTermsPerPostingsList {
s.FreqNorms[pid] = freqNormsBacking[0:0]
freqNormsBacking = freqNormsBacking[numTerms:]
}
if cap(s.Locs) >= numPostingsLists {
s.Locs = s.Locs[:numPostingsLists]
} else {
s.Locs = make([][]interimLoc, numPostingsLists)
}
if cap(s.locsBacking) >= totLocs {
s.locsBacking = s.locsBacking[:totLocs]
} else {
s.locsBacking = make([]interimLoc, totLocs)
}
locsBacking := s.locsBacking
for pid, numLocs := range s.numLocsPerPostingsList {
s.Locs[pid] = locsBacking[0:0]
locsBacking = locsBacking[numLocs:]
}
}
func (s *interim) processDocuments() {
numFields := len(s.FieldsInv)
reuseFieldLens := make([]int, numFields)
reuseFieldTFs := make([]analysis.TokenFrequencies, numFields)
for docNum, result := range s.results {
for i := 0; i < numFields; i++ { // clear these for reuse
reuseFieldLens[i] = 0
reuseFieldTFs[i] = nil
}
s.processDocument(uint64(docNum), result,
reuseFieldLens, reuseFieldTFs)
}
}
func (s *interim) processDocument(docNum uint64,
result *index.AnalysisResult,
fieldLens []int, fieldTFs []analysis.TokenFrequencies) {
visitField := func(fieldID uint16, fieldName string,
ln int, tf analysis.TokenFrequencies) {
fieldLens[fieldID] += ln
existingFreqs := fieldTFs[fieldID]
if existingFreqs != nil {
existingFreqs.MergeAll(fieldName, tf)
} else {
fieldTFs[fieldID] = tf
}
}
// walk each composite field
for _, field := range result.Document.CompositeFields {
fieldID := uint16(s.getOrDefineField(field.Name()))
ln, tf := field.Analyze()
visitField(fieldID, field.Name(), ln, tf)
}
// walk each field
for i, field := range result.Document.Fields {
fieldID := uint16(s.getOrDefineField(field.Name()))
ln := result.Length[i]
tf := result.Analyzed[i]
visitField(fieldID, field.Name(), ln, tf)
}
// now that it's been rolled up into fieldTFs, walk that
for fieldID, tfs := range fieldTFs {
dict := s.Dicts[fieldID]
norm := float32(1.0 / math.Sqrt(float64(fieldLens[fieldID])))
for term, tf := range tfs {
pid := dict[term] - 1
bs := s.Postings[pid]
bs.Add(uint32(docNum))
s.FreqNorms[pid] = append(s.FreqNorms[pid],
interimFreqNorm{
freq: uint64(tf.Frequency()),
norm: norm,
})
if len(tf.Locations) > 0 {
locBS := s.PostingsLocs[pid]
locBS.Add(uint32(docNum))
locs := s.Locs[pid]
for _, loc := range tf.Locations {
var locf = uint16(fieldID)
if loc.Field != "" {
locf = uint16(s.getOrDefineField(loc.Field))
}
var arrayposs []uint64
if len(loc.ArrayPositions) > 0 {
arrayposs = loc.ArrayPositions
}
locs = append(locs, interimLoc{
fieldID: locf,
pos: uint64(loc.Position),
start: uint64(loc.Start),
end: uint64(loc.End),
arrayposs: arrayposs,
})
}
s.Locs[pid] = locs
}
}
}
}
func (s *interim) writeStoredFields() (
storedIndexOffset uint64, err error) {
metaBuf := &s.buf0
metaEncoder := govarint.NewU64Base128Encoder(metaBuf)
data, compressed := s.tmp0[:0], s.tmp1[:0]
defer func() { s.tmp0, s.tmp1 = data, compressed }()
// keyed by docNum
docStoredOffsets := make([]uint64, len(s.results))
// keyed by fieldID, for the current doc in the loop
docStoredFields := map[uint16]interimStoredField{}
for docNum, result := range s.results {
for fieldID := range docStoredFields { // reset for next doc
delete(docStoredFields, fieldID)
}
for _, field := range result.Document.Fields {
fieldID := uint16(s.getOrDefineField(field.Name()))
opts := field.Options()
if opts.IsStored() {
isf := docStoredFields[fieldID]
isf.vals = append(isf.vals, field.Value())
isf.typs = append(isf.typs, encodeFieldType(field))
isf.arrayposs = append(isf.arrayposs, field.ArrayPositions())
docStoredFields[fieldID] = isf
}
if opts.IncludeDocValues() {
s.IncludeDocValues[fieldID] = true
}
}
var curr int
metaBuf.Reset()
data = data[:0]
compressed = compressed[:0]
for fieldID := range s.FieldsInv {
isf, exists := docStoredFields[uint16(fieldID)]
if exists {
curr, data, err = persistStoredFieldValues(
fieldID, isf.vals, isf.typs, isf.arrayposs,
curr, metaEncoder, data)
if err != nil {
return 0, err
}
}
}
metaEncoder.Close()
metaBytes := metaBuf.Bytes()
compressed = snappy.Encode(compressed, data)
docStoredOffsets[docNum] = uint64(s.w.Count())
_, err := writeUvarints(s.w,
uint64(len(metaBytes)),
uint64(len(compressed)))
if err != nil {
return 0, err
}
_, err = s.w.Write(metaBytes)
if err != nil {
return 0, err
}
_, err = s.w.Write(compressed)
if err != nil {
return 0, err
}
}
storedIndexOffset = uint64(s.w.Count())
for _, docStoredOffset := range docStoredOffsets {
err = binary.Write(s.w, binary.BigEndian, docStoredOffset)
if err != nil {
return 0, err
}
}
return storedIndexOffset, nil
}
func (s *interim) writeDicts() (uint64, []uint64, error) {
dictOffsets := make([]uint64, len(s.FieldsInv))
fdvOffsets := make([]uint64, len(s.FieldsInv))
buf := s.grabBuf(binary.MaxVarintLen64)
tfEncoder := newChunkedIntCoder(uint64(s.chunkFactor), uint64(len(s.results)-1))
locEncoder := newChunkedIntCoder(uint64(s.chunkFactor), uint64(len(s.results)-1))
fdvEncoder := newChunkedContentCoder(uint64(s.chunkFactor), uint64(len(s.results)-1))
var docTermMap [][]byte
s.buf0.Reset()
builder, err := vellum.New(&s.buf0, nil)
if err != nil {
return 0, nil, err
}
for fieldID, terms := range s.DictKeys {
if cap(docTermMap) < len(s.results) {
docTermMap = make([][]byte, len(s.results))
} else {
docTermMap = docTermMap[0:len(s.results)]
for docNum := range docTermMap { // reset the docTermMap
docTermMap[docNum] = docTermMap[docNum][:0]
}
}
dict := s.Dicts[fieldID]
for _, term := range terms { // terms are already sorted
pid := dict[term] - 1
postingsBS := s.Postings[pid]
postingsLocsBS := s.PostingsLocs[pid]
freqNorms := s.FreqNorms[pid]
freqNormOffset := 0
locs := s.Locs[pid]
locOffset := 0
postingsItr := postingsBS.Iterator()
for postingsItr.HasNext() {
docNum := uint64(postingsItr.Next())
freqNorm := freqNorms[freqNormOffset]
err = tfEncoder.Add(docNum, freqNorm.freq,
uint64(math.Float32bits(freqNorm.norm)))
if err != nil {
return 0, nil, err
}
for i := uint64(0); i < freqNorm.freq; i++ {
if len(locs) > 0 {
loc := locs[locOffset]
err = locEncoder.Add(docNum, uint64(loc.fieldID),
loc.pos, loc.start, loc.end,
uint64(len(loc.arrayposs)))
if err != nil {
return 0, nil, err
}
err = locEncoder.Add(docNum, loc.arrayposs...)
if err != nil {
return 0, nil, err
}
}
locOffset++
}
freqNormOffset++
docTermMap[docNum] = append(
append(docTermMap[docNum], term...),
termSeparator)
}
tfEncoder.Close()
locEncoder.Close()
postingsOffset, err := writePostings(
postingsBS, postingsLocsBS, tfEncoder, locEncoder,
nil, s.w, buf)
if err != nil {
return 0, nil, err
}
if postingsOffset > uint64(0) {
err = builder.Insert([]byte(term), postingsOffset)
if err != nil {
return 0, nil, err
}
}
tfEncoder.Reset()
locEncoder.Reset()
}
err = builder.Close()
if err != nil {
return 0, nil, err
}
// record where this dictionary starts
dictOffsets[fieldID] = uint64(s.w.Count())
vellumData := s.buf0.Bytes()
// write out the length of the vellum data
n := binary.PutUvarint(buf, uint64(len(vellumData)))
_, err = s.w.Write(buf[:n])
if err != nil {
return 0, nil, err
}
// write this vellum to disk
_, err = s.w.Write(vellumData)
if err != nil {
return 0, nil, err
}
// reset vellum for reuse
s.buf0.Reset()
err = builder.Reset(&s.buf0)
if err != nil {
return 0, nil, err
}
// write the field doc values
if s.IncludeDocValues[fieldID] {
for docNum, docTerms := range docTermMap {
if len(docTerms) > 0 {
err = fdvEncoder.Add(uint64(docNum), docTerms)
if err != nil {
return 0, nil, err
}
}
}
err = fdvEncoder.Close()
if err != nil {
return 0, nil, err
}
fdvOffsets[fieldID] = uint64(s.w.Count())
_, err = fdvEncoder.Write(s.w)
if err != nil {
return 0, nil, err
}
fdvEncoder.Reset()
} else {
fdvOffsets[fieldID] = fieldNotUninverted
}
}
fdvIndexOffset := uint64(s.w.Count())
for _, fdvOffset := range fdvOffsets {
n := binary.PutUvarint(buf, fdvOffset)
_, err := s.w.Write(buf[:n])
if err != nil {
return 0, nil, err
}
}
return fdvIndexOffset, dictOffsets, nil
}
func encodeFieldType(f document.Field) byte {
fieldType := byte('x')
switch f.(type) {
case *document.TextField:
fieldType = 't'
case *document.NumericField:
fieldType = 'n'
case *document.DateTimeField:
fieldType = 'd'
case *document.BooleanField:
fieldType = 'b'
case *document.GeoPointField:
fieldType = 'g'
case *document.CompositeField:
fieldType = 'c'
}
return fieldType
}
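One allocation pattern in this new file is worth spelling out: prepareDicts first counts terms and locations per postings list (numTermsPerPostingsList, numLocsPerPostingsList), then makes a single backing allocation and carves zero-length sub-slices out of it, so the appends in processDocument fill pre-reserved regions without ever growing a slice. A reduced sketch of the idiom (carve is a hypothetical helper; in the real code the counts come from the prepare pass):

// carve hands out zero-length sub-slices over one backing array.
// Each bucket's capacity runs to the end of the backing array, so
// correctness relies on appending exactly counts[i] items to bucket
// i, which the counting pass guarantees.
func carve(counts []int) [][]interimFreqNorm {
	total := 0
	for _, n := range counts {
		total += n
	}
	backing := make([]interimFreqNorm, total)
	buckets := make([][]interimFreqNorm, len(counts))
	for i, n := range counts {
		buckets[i] = backing[0:0] // len 0; appends write in place
		backing = backing[n:]     // skip past bucket i's region
	}
	return buckets
}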

View File

@@ -92,6 +92,8 @@ func under32Bits(x uint64) bool {
return x <= mask31Bits
}
const docNum1HitFinished = math.MaxUint64
// PostingsList is an in-memory representation of a postings list
type PostingsList struct {
sb *SegmentBase
@@ -102,8 +104,9 @@ type PostingsList struct {
postings *roaring.Bitmap
except *roaring.Bitmap
// when postingsOffset == freqOffset == 0, then the postings list
// represents a "1-hit" encoding, and has the following norm
// when normBits1Hit != 0, then this postings list came from a
// 1-hit encoding, and only the docNum1Hit & normBits1Hit apply
docNum1Hit uint64
normBits1Hit uint64
}
@@ -117,6 +120,17 @@ func (p *PostingsList) Size() int {
return sizeInBytes
}
func (p *PostingsList) OrInto(receiver *roaring.Bitmap) {
if p.normBits1Hit != 0 {
receiver.Add(uint32(p.docNum1Hit))
return
}
if p.postings != nil {
receiver.Or(p.postings)
}
}
// Iterator returns an iterator for this postings list
func (p *PostingsList) Iterator() segment.PostingsIterator {
return p.iterator(nil)
@@ -152,39 +166,47 @@ func (p *PostingsList) iterator(rv *PostingsIterator) *PostingsIterator {
}
rv.postings = p
if p.normBits1Hit != 0 {
// "1-hit" encoding
rv.docNum1Hit = p.docNum1Hit
rv.normBits1Hit = p.normBits1Hit
if p.except != nil && p.except.Contains(uint32(rv.docNum1Hit)) {
rv.docNum1Hit = docNum1HitFinished
}
return rv
}
// "general" encoding, check if empty
if p.postings == nil {
return rv
}
if p.freqOffset > 0 && p.locOffset > 0 {
// "general" encoding, so prepare the freq chunk details
var n uint64
var read int
var numFreqChunks uint64
numFreqChunks, read = binary.Uvarint(p.sb.mem[p.freqOffset+n : p.freqOffset+n+binary.MaxVarintLen64])
// prepare the freq chunk details
var n uint64
var read int
var numFreqChunks uint64
numFreqChunks, read = binary.Uvarint(p.sb.mem[p.freqOffset+n : p.freqOffset+n+binary.MaxVarintLen64])
n += uint64(read)
rv.freqChunkLens = make([]uint64, int(numFreqChunks))
for i := 0; i < int(numFreqChunks); i++ {
rv.freqChunkLens[i], read = binary.Uvarint(p.sb.mem[p.freqOffset+n : p.freqOffset+n+binary.MaxVarintLen64])
n += uint64(read)
rv.freqChunkOffsets = make([]uint64, int(numFreqChunks))
for i := 0; i < int(numFreqChunks); i++ {
rv.freqChunkOffsets[i], read = binary.Uvarint(p.sb.mem[p.freqOffset+n : p.freqOffset+n+binary.MaxVarintLen64])
n += uint64(read)
}
rv.freqChunkStart = p.freqOffset + n
// prepare the loc chunk details
n = 0
var numLocChunks uint64
numLocChunks, read = binary.Uvarint(p.sb.mem[p.locOffset+n : p.locOffset+n+binary.MaxVarintLen64])
n += uint64(read)
rv.locChunkOffsets = make([]uint64, int(numLocChunks))
for i := 0; i < int(numLocChunks); i++ {
rv.locChunkOffsets[i], read = binary.Uvarint(p.sb.mem[p.locOffset+n : p.locOffset+n+binary.MaxVarintLen64])
n += uint64(read)
}
rv.locChunkStart = p.locOffset + n
} else {
// "1-hit" encoding
rv.normBits1Hit = p.normBits1Hit
}
rv.freqChunkStart = p.freqOffset + n
// prepare the loc chunk details
n = 0
var numLocChunks uint64
numLocChunks, read = binary.Uvarint(p.sb.mem[p.locOffset+n : p.locOffset+n+binary.MaxVarintLen64])
n += uint64(read)
rv.locChunkLens = make([]uint64, int(numLocChunks))
for i := 0; i < int(numLocChunks); i++ {
rv.locChunkLens[i], read = binary.Uvarint(p.sb.mem[p.locOffset+n : p.locOffset+n+binary.MaxVarintLen64])
n += uint64(read)
}
rv.locChunkStart = p.locOffset + n
rv.locBitmap = p.locBitmap
@@ -201,18 +223,20 @@ func (p *PostingsList) iterator(rv *PostingsIterator) *PostingsIterator {
// Count returns the number of items on this postings list
func (p *PostingsList) Count() uint64 {
if p.postings != nil {
n := p.postings.GetCardinality()
if p.except != nil {
e := p.except.GetCardinality()
if e > n {
e = n
}
return n - e
}
return n
var n uint64
if p.normBits1Hit != 0 {
n = 1
} else if p.postings != nil {
n = p.postings.GetCardinality()
}
return 0
var e uint64
if p.except != nil {
e = p.except.GetCardinality()
}
if n <= e {
return 0
}
return n - e
}
func (rv *PostingsList) read(postingsOffset uint64, d *Dictionary) error {
@@ -242,7 +266,9 @@ func (rv *PostingsList) read(postingsOffset uint64, d *Dictionary) error {
locRoaringBytes := d.sb.mem[locBitmapOffset+uint64(read) : locBitmapOffset+uint64(read)+locBitmapLen]
rv.locBitmap = roaring.NewBitmap()
if rv.locBitmap == nil {
rv.locBitmap = roaring.NewBitmap()
}
_, err := rv.locBitmap.FromBuffer(locRoaringBytes)
if err != nil {
return fmt.Errorf("error loading roaring bitmap of locations with hits: %v", err)
@@ -254,7 +280,9 @@ func (rv *PostingsList) read(postingsOffset uint64, d *Dictionary) error {
roaringBytes := d.sb.mem[postingsOffset+n : postingsOffset+n+postingsLen]
rv.postings = roaring.NewBitmap()
if rv.postings == nil {
rv.postings = roaring.NewBitmap()
}
_, err = rv.postings.FromBuffer(roaringBytes)
if err != nil {
return fmt.Errorf("error loading roaring bitmap: %v", err)
@@ -263,19 +291,10 @@ func (rv *PostingsList) read(postingsOffset uint64, d *Dictionary) error {
return nil
}
var emptyRoaring = roaring.NewBitmap()
func (rv *PostingsList) init1Hit(fstVal uint64) error {
docNum, normBits := FSTValDecode1Hit(fstVal)
rv.locBitmap = emptyRoaring
rv.postings = roaring.NewBitmap()
rv.postings.Add(uint32(docNum))
// TODO: we can likely do better than allocating a roaring bitmap
// with just 1 entry, but for now reuse existing machinery
rv.docNum1Hit = docNum
rv.normBits1Hit = normBits
return nil
@@ -308,6 +327,7 @@ type PostingsIterator struct {
next Posting // reused across Next() calls
nextLocs []Location // reused across Next() calls
docNum1Hit uint64
normBits1Hit uint64
buf []byte
@@ -456,7 +476,7 @@ func (i *PostingsIterator) Next() (segment.Posting, error) {
}
rv.norm = math.Float32frombits(uint32(normBits))
if i.locBitmap.Contains(uint32(docNum)) {
if i.locBitmap != nil && i.locBitmap.Contains(uint32(docNum)) {
// read off 'freq' locations, into reused slices
if cap(i.nextLocs) >= int(rv.freq) {
i.nextLocs = i.nextLocs[0:rv.freq]
@@ -509,7 +529,7 @@ func (i *PostingsIterator) nextBytes() (
endFreqNorm := len(i.currChunkFreqNorm) - i.freqNormReader.Len()
bytesFreqNorm = i.currChunkFreqNorm[startFreqNorm:endFreqNorm]
if i.locBitmap.Contains(uint32(docNum)) {
if i.locBitmap != nil && i.locBitmap.Contains(uint32(docNum)) {
startLoc := len(i.currChunkLoc) - i.locReader.Len()
for j := uint64(0); j < freq; j++ {
@@ -529,6 +549,15 @@
// nextDocNum returns the next docNum on the postings list, and also
// sets up the currChunk / loc related fields of the iterator.
func (i *PostingsIterator) nextDocNum() (uint64, bool, error) {
if i.normBits1Hit != 0 {
if i.docNum1Hit == docNum1HitFinished {
return 0, false, nil
}
docNum := i.docNum1Hit
i.docNum1Hit = docNum1HitFinished // consume our 1-hit docNum
return docNum, true, nil
}
if i.actual == nil || !i.actual.HasNext() {
return 0, false, nil
}
@@ -536,10 +565,6 @@ func (i *PostingsIterator) nextDocNum() (uint64, bool, error) {
n := i.actual.Next()
allN := i.all.Next()
if i.normBits1Hit != 0 {
return uint64(n), true, nil
}
nChunk := n / i.postings.sb.chunkFactor
allNChunk := allN / i.postings.sb.chunkFactor
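The thread running through this file: a 1-hit postings list no longer materializes a one-entry roaring bitmap. init1Hit just records docNum1Hit and normBits1Hit, the iterator consumes its single docNum via the docNum1HitFinished sentinel, Count and OrInto branch on normBits1Hit, and the iterator's loc checks now tolerate a nil locBitmap. The docNum and norm travel packed inside the term's FST value; the authoritative layout is FSTValEncode1Hit/FSTValDecode1Hit elsewhere in the package, but schematically it is consistent with the mask31Bits/under32Bits guards above (an assumed sketch, not a quotation of that code):

// Schematic 1-hit packing: a type marker in the high bits, the norm
// bits in the middle 31 bits, the docNum in the low 31 bits.
const fstValEncoding1HitSketch = uint64(1) << 60 // assumed marker

func encode1HitSketch(docNum, normBits uint64) uint64 {
	return fstValEncoding1HitSketch | normBits<<31 | docNum
}

func decode1HitSketch(v uint64) (docNum, normBits uint64) {
	return v & mask31Bits, (v >> 31) & mask31Bits
}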

View File

@@ -341,15 +341,13 @@ func (s *SegmentBase) DocNumbers(ids []string) (*roaring.Bitmap, error) {
return nil, err
}
var postings *PostingsList
var postingsList *PostingsList
for _, id := range ids {
postings, err = idDict.postingsList([]byte(id), nil, postings)
postingsList, err = idDict.postingsList([]byte(id), nil, postingsList)
if err != nil {
return nil, err
}
if postings.postings != nil {
rv.Or(postings.postings)
}
postingsList.OrInto(rv)
}
}

View File

@@ -28,8 +28,8 @@ import (
func TestOpen(t *testing.T) {
_ = os.RemoveAll("/tmp/scorch.zap")
memSegment := buildMemSegment()
err := PersistSegment(memSegment, "/tmp/scorch.zap", 1024)
testSeg, _ := buildTestSegment()
err := PersistSegmentBase(testSeg, "/tmp/scorch.zap")
if err != nil {
t.Fatalf("error persisting segment: %v", err)
}
@@ -328,8 +328,8 @@ func TestOpen(t *testing.T) {
func TestOpenMulti(t *testing.T) {
_ = os.RemoveAll("/tmp/scorch.zap")
memSegment := buildMemSegmentMulti()
err := PersistSegment(memSegment, "/tmp/scorch.zap", 1024)
testSeg, _ := buildTestSegmentMulti()
err := PersistSegmentBase(testSeg, "/tmp/scorch.zap")
if err != nil {
t.Fatalf("error persisting segment: %v", err)
}
@@ -428,8 +428,8 @@ func TestOpenMulti(t *testing.T) {
func TestOpenMultiWithTwoChunks(t *testing.T) {
_ = os.RemoveAll("/tmp/scorch.zap")
memSegment := buildMemSegmentMulti()
err := PersistSegment(memSegment, "/tmp/scorch.zap", 1)
testSeg, _ := buildTestSegmentMultiWithChunkFactor(1)
err := PersistSegmentBase(testSeg, "/tmp/scorch.zap")
if err != nil {
t.Fatalf("error persisting segment: %v", err)
}
@@ -523,8 +523,8 @@ func TestOpenMultiWithTwoChunks(t *testing.T) {
func TestSegmentVisitableDocValueFieldsList(t *testing.T) {
_ = os.RemoveAll("/tmp/scorch.zap")
memSegment := buildMemSegmentMulti()
err := PersistSegment(memSegment, "/tmp/scorch.zap", 1)
testSeg, _ := buildTestSegmentMultiWithChunkFactor(1)
err := PersistSegmentBase(testSeg, "/tmp/scorch.zap")
if err != nil {
t.Fatalf("error persisting segment: %v", err)
}
@@ -551,8 +551,8 @@ func TestSegmentVisitableDocValueFieldsList(t *testing.T) {
}
_ = os.RemoveAll("/tmp/scorch.zap")
memSegment, expectedFields := buildMemSegmentWithDefaultFieldMapping()
err = PersistSegment(memSegment, "/tmp/scorch.zap", 1)
testSeg, expectedFields, _ := buildTestSegmentWithDefaultFieldMapping(1)
err = PersistSegmentBase(testSeg, "/tmp/scorch.zap")
if err != nil {
t.Fatalf("error persisting segment: %v", err)
}

vendor/manifest (vendored): 8 lines changed
View File

@@ -74,6 +74,14 @@
"branch": "master",
"notests": true
},
{
"importpath": "github.com/RoaringBitmap/roaring",
"repository": "https://github.com/RoaringBitmap/roaring",
"vcs": "",
"revision": "01d244c43a7e8d1191a4f369f5908ea9eb9bc9ac",
"branch": "master",
"notests": true
},
{
"importpath": "github.com/seiflotfy/cuckoofilter",
"repository": "https://github.com/seiflotfy/cuckoofilter",