0
0
Fork 0

Merge pull request #817 from steveyen/zap-no-longer-uses-mem-segment

scorch zap no longer uses mem segment
This commit is contained in:
Steve Yen 2018-03-12 07:54:10 -07:00 committed by GitHub
commit 6df6a036d8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 792 additions and 593 deletions

View File

@ -28,7 +28,6 @@ import (
"github.com/blevesearch/bleve/document"
"github.com/blevesearch/bleve/index"
"github.com/blevesearch/bleve/index/scorch/segment"
"github.com/blevesearch/bleve/index/scorch/segment/mem"
"github.com/blevesearch/bleve/index/scorch/segment/zap"
"github.com/blevesearch/bleve/index/store"
"github.com/blevesearch/bleve/registry"
@ -289,7 +288,7 @@ func (s *Scorch) Batch(batch *index.Batch) (err error) {
var newSegment segment.Segment
if len(analysisResults) > 0 {
newSegment, err = zap.NewSegmentBase(mem.NewFromAnalyzedDocs(analysisResults), DefaultChunkFactor)
newSegment, err = zap.AnalysisResultsToSegmentBase(analysisResults, DefaultChunkFactor)
if err != nil {
return err
}

View File

@ -16,16 +16,10 @@ package zap
import (
"bufio"
"bytes"
"encoding/binary"
"math"
"os"
"sort"
"github.com/Smerity/govarint"
"github.com/blevesearch/bleve/index/scorch/segment/mem"
"github.com/couchbase/vellum"
"github.com/golang/snappy"
)
const version uint32 = 4
@ -82,186 +76,6 @@ func PersistSegmentBase(sb *SegmentBase, path string) error {
return nil
}
// PersistSegment takes the in-memory segment and persists it to
// the specified path in the zap file format.
func PersistSegment(memSegment *mem.Segment, path string, chunkFactor uint32) error {
flag := os.O_RDWR | os.O_CREATE
f, err := os.OpenFile(path, flag, 0600)
if err != nil {
return err
}
cleanup := func() {
_ = f.Close()
_ = os.Remove(path)
}
// buffer the output
br := bufio.NewWriter(f)
// wrap it for counting (tracking offsets)
cr := NewCountHashWriter(br)
numDocs, storedIndexOffset, fieldsIndexOffset, docValueOffset, _, err :=
persistBase(memSegment, cr, chunkFactor)
if err != nil {
cleanup()
return err
}
err = persistFooter(numDocs, storedIndexOffset, fieldsIndexOffset, docValueOffset,
chunkFactor, cr.Sum32(), cr)
if err != nil {
cleanup()
return err
}
err = br.Flush()
if err != nil {
cleanup()
return err
}
err = f.Sync()
if err != nil {
cleanup()
return err
}
err = f.Close()
if err != nil {
cleanup()
return err
}
return nil
}
func persistBase(memSegment *mem.Segment, cr *CountHashWriter, chunkFactor uint32) (
numDocs, storedIndexOffset, fieldsIndexOffset, docValueOffset uint64,
dictLocs []uint64, err error) {
docValueOffset = uint64(fieldNotUninverted)
if len(memSegment.Stored) > 0 {
storedIndexOffset, err = persistStored(memSegment, cr)
if err != nil {
return 0, 0, 0, 0, nil, err
}
freqOffsets, locOffsets, err := persistPostingDetails(memSegment, cr, chunkFactor)
if err != nil {
return 0, 0, 0, 0, nil, err
}
postingsListLocs, err := persistPostingsLocs(memSegment, cr)
if err != nil {
return 0, 0, 0, 0, nil, err
}
postingsLocs, err := persistPostingsLists(memSegment, cr, postingsListLocs, freqOffsets, locOffsets)
if err != nil {
return 0, 0, 0, 0, nil, err
}
dictLocs, err = persistDictionary(memSegment, cr, postingsLocs)
if err != nil {
return 0, 0, 0, 0, nil, err
}
docValueOffset, err = persistFieldDocValues(memSegment, cr, chunkFactor)
if err != nil {
return 0, 0, 0, 0, nil, err
}
} else {
dictLocs = make([]uint64, len(memSegment.FieldsInv))
}
fieldsIndexOffset, err = persistFields(memSegment.FieldsInv, cr, dictLocs)
if err != nil {
return 0, 0, 0, 0, nil, err
}
return uint64(len(memSegment.Stored)), storedIndexOffset, fieldsIndexOffset, docValueOffset,
dictLocs, nil
}
func persistStored(memSegment *mem.Segment, w *CountHashWriter) (uint64, error) {
var curr int
var metaBuf bytes.Buffer
var data, compressed []byte
metaEncoder := govarint.NewU64Base128Encoder(&metaBuf)
docNumOffsets := make(map[int]uint64, len(memSegment.Stored))
for docNum, storedValues := range memSegment.Stored {
if docNum != 0 {
// reset buffer if necessary
curr = 0
metaBuf.Reset()
data = data[:0]
compressed = compressed[:0]
}
st := memSegment.StoredTypes[docNum]
sp := memSegment.StoredPos[docNum]
// encode fields in order
for fieldID := range memSegment.FieldsInv {
if storedFieldValues, ok := storedValues[uint16(fieldID)]; ok {
stf := st[uint16(fieldID)]
spf := sp[uint16(fieldID)]
var err2 error
curr, data, err2 = persistStoredFieldValues(fieldID,
storedFieldValues, stf, spf, curr, metaEncoder, data)
if err2 != nil {
return 0, err2
}
}
}
metaEncoder.Close()
metaBytes := metaBuf.Bytes()
// compress the data
compressed = snappy.Encode(compressed, data)
// record where we're about to start writing
docNumOffsets[docNum] = uint64(w.Count())
// write out the meta len and compressed data len
_, err := writeUvarints(w, uint64(len(metaBytes)), uint64(len(compressed)))
if err != nil {
return 0, err
}
// now write the meta
_, err = w.Write(metaBytes)
if err != nil {
return 0, err
}
// now write the compressed data
_, err = w.Write(compressed)
if err != nil {
return 0, err
}
}
// return value is the start of the stored index
rv := uint64(w.Count())
// now write out the stored doc index
for docNum := range memSegment.Stored {
err := binary.Write(w, binary.BigEndian, docNumOffsets[docNum])
if err != nil {
return 0, err
}
}
return rv, nil
}
func persistStoredFieldValues(fieldID int,
storedFieldValues [][]byte, stf []byte, spf [][]uint64,
curr int, metaEncoder *govarint.Base128Encoder, data []byte) (
@ -307,308 +121,6 @@ func persistStoredFieldValues(fieldID int,
return curr, data, nil
}
func persistPostingDetails(memSegment *mem.Segment, w *CountHashWriter, chunkFactor uint32) ([]uint64, []uint64, error) {
freqOffsets := make([]uint64, 0, len(memSegment.Postings))
tfEncoder := newChunkedIntCoder(uint64(chunkFactor), uint64(len(memSegment.Stored)-1))
for postingID := range memSegment.Postings {
if postingID != 0 {
tfEncoder.Reset()
}
freqs := memSegment.Freqs[postingID]
norms := memSegment.Norms[postingID]
postingsListItr := memSegment.Postings[postingID].Iterator()
var offset int
for postingsListItr.HasNext() {
docNum := uint64(postingsListItr.Next())
// put freq & norm
err := tfEncoder.Add(docNum, freqs[offset], uint64(math.Float32bits(norms[offset])))
if err != nil {
return nil, nil, err
}
offset++
}
// record where this postings freq info starts
freqOffsets = append(freqOffsets, uint64(w.Count()))
tfEncoder.Close()
_, err := tfEncoder.Write(w)
if err != nil {
return nil, nil, err
}
}
// now do it again for the locations
locOffsets := make([]uint64, 0, len(memSegment.Postings))
locEncoder := newChunkedIntCoder(uint64(chunkFactor), uint64(len(memSegment.Stored)-1))
for postingID := range memSegment.Postings {
if postingID != 0 {
locEncoder.Reset()
}
freqs := memSegment.Freqs[postingID]
locfields := memSegment.Locfields[postingID]
locpos := memSegment.Locpos[postingID]
locstarts := memSegment.Locstarts[postingID]
locends := memSegment.Locends[postingID]
locarraypos := memSegment.Locarraypos[postingID]
postingsListItr := memSegment.Postings[postingID].Iterator()
var offset int
var locOffset int
for postingsListItr.HasNext() {
docNum := uint64(postingsListItr.Next())
n := int(freqs[offset])
for i := 0; i < n; i++ {
if len(locfields) > 0 {
err := locEncoder.Add(docNum, uint64(locfields[locOffset]),
locpos[locOffset], locstarts[locOffset], locends[locOffset],
uint64(len(locarraypos[locOffset])))
if err != nil {
return nil, nil, err
}
// put each array position
err = locEncoder.Add(docNum, locarraypos[locOffset]...)
if err != nil {
return nil, nil, err
}
}
locOffset++
}
offset++
}
// record where this postings loc info starts
locOffsets = append(locOffsets, uint64(w.Count()))
locEncoder.Close()
_, err := locEncoder.Write(w)
if err != nil {
return nil, nil, err
}
}
return freqOffsets, locOffsets, nil
}
func persistPostingsLocs(memSegment *mem.Segment, w *CountHashWriter) (rv []uint64, err error) {
rv = make([]uint64, 0, len(memSegment.PostingsLocs))
reuseBufVarint := make([]byte, binary.MaxVarintLen64)
for postingID := range memSegment.PostingsLocs {
// record where we start this posting loc
rv = append(rv, uint64(w.Count()))
// write out the length and bitmap
_, err = writeRoaringWithLen(memSegment.PostingsLocs[postingID], w, reuseBufVarint)
if err != nil {
return nil, err
}
}
return rv, nil
}
func persistPostingsLists(memSegment *mem.Segment, w *CountHashWriter,
postingsListLocs, freqOffsets, locOffsets []uint64) (rv []uint64, err error) {
rv = make([]uint64, 0, len(memSegment.Postings))
reuseBufVarint := make([]byte, binary.MaxVarintLen64)
for postingID := range memSegment.Postings {
// record where we start this posting list
rv = append(rv, uint64(w.Count()))
// write out the term info, loc info, and loc posting list offset
_, err = writeUvarints(w, freqOffsets[postingID],
locOffsets[postingID], postingsListLocs[postingID])
if err != nil {
return nil, err
}
// write out the length and bitmap
_, err = writeRoaringWithLen(memSegment.Postings[postingID], w, reuseBufVarint)
if err != nil {
return nil, err
}
}
return rv, nil
}
func persistDictionary(memSegment *mem.Segment, w *CountHashWriter, postingsLocs []uint64) ([]uint64, error) {
rv := make([]uint64, 0, len(memSegment.DictKeys))
varintBuf := make([]byte, binary.MaxVarintLen64)
var buffer bytes.Buffer
builder, err := vellum.New(&buffer, nil)
if err != nil {
return nil, err
}
for fieldID, fieldTerms := range memSegment.DictKeys {
dict := memSegment.Dicts[fieldID]
// now walk the dictionary in order of fieldTerms (already sorted)
for _, fieldTerm := range fieldTerms {
postingID := dict[fieldTerm] - 1
postingsAddr := postingsLocs[postingID]
err = builder.Insert([]byte(fieldTerm), postingsAddr)
if err != nil {
return nil, err
}
}
err = builder.Close()
if err != nil {
return nil, err
}
// record where this dictionary starts
rv = append(rv, uint64(w.Count()))
vellumData := buffer.Bytes()
// write out the length of the vellum data
n := binary.PutUvarint(varintBuf, uint64(len(vellumData)))
_, err = w.Write(varintBuf[:n])
if err != nil {
return nil, err
}
// write this vellum to disk
_, err = w.Write(vellumData)
if err != nil {
return nil, err
}
// reset buffer and vellum builder
buffer.Reset()
err = builder.Reset(&buffer)
if err != nil {
return nil, err
}
}
return rv, nil
}
type docIDRange []uint64
func (a docIDRange) Len() int { return len(a) }
func (a docIDRange) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a docIDRange) Less(i, j int) bool { return a[i] < a[j] }
func persistDocValues(memSegment *mem.Segment, w *CountHashWriter,
chunkFactor uint32) (map[uint16]uint64, error) {
fieldChunkOffsets := make(map[uint16]uint64, len(memSegment.FieldsInv))
fdvEncoder := newChunkedContentCoder(uint64(chunkFactor), uint64(len(memSegment.Stored)-1))
var postings *mem.PostingsList
var postingsItr *mem.PostingsIterator
for fieldID := range memSegment.DocValueFields {
field := memSegment.FieldsInv[fieldID]
docTermMap := make(map[uint64][]byte, 0)
dict, err := memSegment.Dictionary(field)
if err != nil {
return nil, err
}
dictItr := dict.Iterator()
next, err := dictItr.Next()
for err == nil && next != nil {
var err1 error
postings, err1 = dict.(*mem.Dictionary).InitPostingsList(next.Term, nil, postings)
if err1 != nil {
return nil, err1
}
postingsItr = postings.InitIterator(postingsItr)
nextPosting, err2 := postingsItr.Next()
for err2 == nil && nextPosting != nil {
docNum := nextPosting.Number()
docTermMap[docNum] = append(append(docTermMap[docNum], []byte(next.Term)...), termSeparator)
nextPosting, err2 = postingsItr.Next()
}
if err2 != nil {
return nil, err2
}
next, err = dictItr.Next()
}
if err != nil {
return nil, err
}
// sort wrt to docIDs
docNumbers := make(docIDRange, 0, len(docTermMap))
for k := range docTermMap {
docNumbers = append(docNumbers, k)
}
sort.Sort(docNumbers)
for _, docNum := range docNumbers {
err = fdvEncoder.Add(docNum, docTermMap[docNum])
if err != nil {
return nil, err
}
}
fieldChunkOffsets[fieldID] = uint64(w.Count())
err = fdvEncoder.Close()
if err != nil {
return nil, err
}
// persist the doc value details for this field
_, err = fdvEncoder.Write(w)
if err != nil {
return nil, err
}
// reseting encoder for the next field
fdvEncoder.Reset()
}
return fieldChunkOffsets, nil
}
func persistFieldDocValues(memSegment *mem.Segment, w *CountHashWriter,
chunkFactor uint32) (uint64, error) {
fieldDvOffsets, err := persistDocValues(memSegment, w, chunkFactor)
if err != nil {
return 0, err
}
fieldDocValuesOffset := uint64(w.Count())
buf := make([]byte, binary.MaxVarintLen64)
offset := uint64(0)
ok := true
for fieldID := range memSegment.FieldsInv {
// if the field isn't configured for docValue, then mark
// the offset accordingly
if offset, ok = fieldDvOffsets[uint16(fieldID)]; !ok {
offset = fieldNotUninverted
}
n := binary.PutUvarint(buf, uint64(offset))
_, err := w.Write(buf[:n])
if err != nil {
return 0, err
}
}
return fieldDocValuesOffset, nil
}
func NewSegmentBase(memSegment *mem.Segment, chunkFactor uint32) (*SegmentBase, error) {
var br bytes.Buffer
cr := NewCountHashWriter(&br)
numDocs, storedIndexOffset, fieldsIndexOffset, docValueOffset, dictLocs, err :=
persistBase(memSegment, cr, chunkFactor)
if err != nil {
return nil, err
}
return InitSegmentBase(br.Bytes(), cr.Sum32(), chunkFactor,
memSegment.FieldsMap, memSegment.FieldsInv, numDocs,
storedIndexOffset, fieldsIndexOffset, docValueOffset, dictLocs)
}
func InitSegmentBase(mem []byte, memCRC uint32, chunkFactor uint32,
fieldsMap map[string]uint16, fieldsInv []string, numDocs uint64,
storedIndexOffset uint64, fieldsIndexOffset uint64, docValueOffset uint64,

View File

@ -21,20 +21,22 @@ import (
"github.com/blevesearch/bleve/analysis"
"github.com/blevesearch/bleve/document"
"github.com/blevesearch/bleve/index"
"github.com/blevesearch/bleve/index/scorch/segment/mem"
)
func TestBuild(t *testing.T) {
_ = os.RemoveAll("/tmp/scorch.zap")
memSegment := buildMemSegment()
err := PersistSegment(memSegment, "/tmp/scorch.zap", 1024)
sb, err := buildTestSegment()
if err != nil {
t.Fatal(err)
}
err = PersistSegmentBase(sb, "/tmp/scorch.zap")
if err != nil {
t.Fatal(err)
}
}
func buildMemSegment() *mem.Segment {
func buildTestSegment() (*SegmentBase, error) {
doc := &document.Document{
ID: "a",
Fields: []document.Field{
@ -120,11 +122,22 @@ func buildMemSegment() *mem.Segment {
}
}
return mem.NewFromAnalyzedDocs(results)
return AnalysisResultsToSegmentBase(results, 1024)
}
func buildMemSegmentMulti() *mem.Segment {
func buildTestSegmentMulti() (*SegmentBase, error) {
results := buildTestAnalysisResultsMulti()
return AnalysisResultsToSegmentBase(results, 1024)
}
func buildTestSegmentMultiWithChunkFactor(chunkFactor uint32) (*SegmentBase, error) {
results := buildTestAnalysisResultsMulti()
return AnalysisResultsToSegmentBase(results, chunkFactor)
}
func buildTestAnalysisResultsMulti() []*index.AnalysisResult {
doc := &document.Document{
ID: "a",
Fields: []document.Field{
@ -282,13 +295,11 @@ func buildMemSegmentMulti() *mem.Segment {
}
}
segment := mem.NewFromAnalyzedDocs(results)
return segment
return results
}
func buildMemSegmentWithDefaultFieldMapping() (*mem.Segment, []string) {
func buildTestSegmentWithDefaultFieldMapping(chunkFactor uint32) (
*SegmentBase, []string, error) {
doc := &document.Document{
ID: "a",
Fields: []document.Field{
@ -371,5 +382,7 @@ func buildMemSegmentWithDefaultFieldMapping() (*mem.Segment, []string) {
}
}
return mem.NewFromAnalyzedDocs(results), fields
sb, err := AnalysisResultsToSegmentBase(results, chunkFactor)
return sb, fields, err
}

View File

@ -22,10 +22,9 @@ import (
"github.com/blevesearch/bleve/analysis"
"github.com/blevesearch/bleve/document"
"github.com/blevesearch/bleve/index"
"github.com/blevesearch/bleve/index/scorch/segment/mem"
)
func buildMemSegmentForDict() *mem.Segment {
func buildTestSegmentForDict() (*SegmentBase, error) {
doc := &document.Document{
ID: "a",
Fields: []document.Field{
@ -99,17 +98,15 @@ func buildMemSegmentForDict() *mem.Segment {
},
}
segment := mem.NewFromAnalyzedDocs(results)
return segment
return AnalysisResultsToSegmentBase(results, 1024)
}
func TestDictionary(t *testing.T) {
_ = os.RemoveAll("/tmp/scorch.zap")
memSegment := buildMemSegmentForDict()
err := PersistSegment(memSegment, "/tmp/scorch.zap", 1024)
testSeg, _ := buildTestSegmentForDict()
err := PersistSegmentBase(testSeg, "/tmp/scorch.zap")
if err != nil {
t.Fatalf("error persisting segment: %v", err)
}

View File

@ -248,56 +248,15 @@ func persistMergedRest(segments []*SegmentBase, dropsIn []*roaring.Bitmap,
tfEncoder.Close()
locEncoder.Close()
termCardinality := newRoaring.GetCardinality()
postingsOffset, err := writePostings(
newRoaring, newRoaringLocs, tfEncoder, locEncoder,
use1HitEncoding, w, bufMaxVarintLen64)
if err != nil {
return err
}
encodeAs1Hit, docNum1Hit, normBits1Hit := use1HitEncoding(termCardinality)
if encodeAs1Hit {
err = newVellum.Insert(term, FSTValEncode1Hit(docNum1Hit, normBits1Hit))
if err != nil {
return err
}
} else if termCardinality > 0 {
// this field/term has hits in the new segment
freqOffset := uint64(w.Count())
_, err := tfEncoder.Write(w)
if err != nil {
return err
}
locOffset := uint64(w.Count())
_, err = locEncoder.Write(w)
if err != nil {
return err
}
postingLocOffset := uint64(w.Count())
_, err = writeRoaringWithLen(newRoaringLocs, w, bufMaxVarintLen64)
if err != nil {
return err
}
postingOffset := uint64(w.Count())
// write out the start of the term info
n := binary.PutUvarint(bufMaxVarintLen64, freqOffset)
_, err = w.Write(bufMaxVarintLen64[:n])
if err != nil {
return err
}
// write out the start of the loc info
n = binary.PutUvarint(bufMaxVarintLen64, locOffset)
_, err = w.Write(bufMaxVarintLen64[:n])
if err != nil {
return err
}
// write out the start of the posting locs
n = binary.PutUvarint(bufMaxVarintLen64, postingLocOffset)
_, err = w.Write(bufMaxVarintLen64[:n])
if err != nil {
return err
}
_, err = writeRoaringWithLen(newRoaring, w, bufMaxVarintLen64)
if err != nil {
return err
}
err = newVellum.Insert(term, postingOffset)
if postingsOffset > 0 {
err = newVellum.Insert(term, postingsOffset)
if err != nil {
return err
}
@ -460,6 +419,69 @@ func persistMergedRest(segments []*SegmentBase, dropsIn []*roaring.Bitmap,
return rv, fieldDvLocsOffset, nil
}
func writePostings(postings, postingLocs *roaring.Bitmap,
tfEncoder, locEncoder *chunkedIntCoder,
use1HitEncoding func(uint64) (bool, uint64, uint64),
w *CountHashWriter, bufMaxVarintLen64 []byte) (
offset uint64, err error) {
termCardinality := postings.GetCardinality()
if termCardinality <= 0 {
return 0, nil
}
if use1HitEncoding != nil {
encodeAs1Hit, docNum1Hit, normBits1Hit := use1HitEncoding(termCardinality)
if encodeAs1Hit {
return FSTValEncode1Hit(docNum1Hit, normBits1Hit), nil
}
}
tfOffset := uint64(w.Count())
_, err = tfEncoder.Write(w)
if err != nil {
return 0, err
}
locOffset := uint64(w.Count())
_, err = locEncoder.Write(w)
if err != nil {
return 0, err
}
postingLocsOffset := uint64(w.Count())
_, err = writeRoaringWithLen(postingLocs, w, bufMaxVarintLen64)
if err != nil {
return 0, err
}
postingsOffset := uint64(w.Count())
n := binary.PutUvarint(bufMaxVarintLen64, tfOffset)
_, err = w.Write(bufMaxVarintLen64[:n])
if err != nil {
return 0, err
}
n = binary.PutUvarint(bufMaxVarintLen64, locOffset)
_, err = w.Write(bufMaxVarintLen64[:n])
if err != nil {
return 0, err
}
n = binary.PutUvarint(bufMaxVarintLen64, postingLocsOffset)
_, err = w.Write(bufMaxVarintLen64[:n])
if err != nil {
return 0, err
}
_, err = writeRoaringWithLen(postings, w, bufMaxVarintLen64)
if err != nil {
return 0, err
}
return postingsOffset, nil
}
func mergeStoredAndRemap(segments []*SegmentBase, drops []*roaring.Bitmap,
fieldsMap map[string]uint16, fieldsInv []string, fieldsSame bool, newSegDocCount uint64,
w *CountHashWriter) (uint64, [][]uint64, error) {

View File

@ -26,7 +26,6 @@ import (
"github.com/blevesearch/bleve/analysis"
"github.com/blevesearch/bleve/document"
"github.com/blevesearch/bleve/index"
"github.com/blevesearch/bleve/index/scorch/segment/mem"
)
func TestMerge(t *testing.T) {
@ -34,14 +33,14 @@ func TestMerge(t *testing.T) {
_ = os.RemoveAll("/tmp/scorch2.zap")
_ = os.RemoveAll("/tmp/scorch3.zap")
memSegment := buildMemSegmentMulti()
err := PersistSegment(memSegment, "/tmp/scorch.zap", 1024)
testSeg, _ := buildTestSegmentMulti()
err := PersistSegmentBase(testSeg, "/tmp/scorch.zap")
if err != nil {
t.Fatal(err)
}
memSegment2 := buildMemSegmentMulti2()
err = PersistSegment(memSegment2, "/tmp/scorch2.zap", 1024)
testSeg2, _ := buildTestSegmentMulti2()
err = PersistSegmentBase(testSeg2, "/tmp/scorch2.zap")
if err != nil {
t.Fatal(err)
}
@ -121,8 +120,8 @@ func TestMergeWithEmptySegmentsFirst(t *testing.T) {
func testMergeWithEmptySegments(t *testing.T, before bool, numEmptySegments int) {
_ = os.RemoveAll("/tmp/scorch.zap")
memSegment := buildMemSegmentMulti()
err := PersistSegment(memSegment, "/tmp/scorch.zap", 1024)
testSeg, _ := buildTestSegmentMulti()
err := PersistSegmentBase(testSeg, "/tmp/scorch.zap")
if err != nil {
t.Fatal(err)
}
@ -148,8 +147,8 @@ func testMergeWithEmptySegments(t *testing.T, before bool, numEmptySegments int)
_ = os.RemoveAll("/tmp/" + fname)
emptySegment := mem.NewFromAnalyzedDocs([]*index.AnalysisResult{})
err = PersistSegment(emptySegment, "/tmp/"+fname, 1024)
emptySegment, _ := AnalysisResultsToSegmentBase([]*index.AnalysisResult{}, 1024)
err = PersistSegmentBase(emptySegment, "/tmp/"+fname)
if err != nil {
t.Fatal(err)
}
@ -462,8 +461,8 @@ func testMergeAndDrop(t *testing.T, docsToDrop []*roaring.Bitmap) {
_ = os.RemoveAll("/tmp/scorch.zap")
_ = os.RemoveAll("/tmp/scorch2.zap")
memSegment := buildMemSegmentMulti()
err := PersistSegment(memSegment, "/tmp/scorch.zap", 1024)
testSeg, _ := buildTestSegmentMulti()
err := PersistSegmentBase(testSeg, "/tmp/scorch.zap")
if err != nil {
t.Fatal(err)
}
@ -478,8 +477,8 @@ func testMergeAndDrop(t *testing.T, docsToDrop []*roaring.Bitmap) {
}
}()
memSegment2 := buildMemSegmentMulti2()
err = PersistSegment(memSegment2, "/tmp/scorch2.zap", 1024)
testSeg2, _ := buildTestSegmentMulti2()
err = PersistSegmentBase(testSeg2, "/tmp/scorch2.zap")
if err != nil {
t.Fatal(err)
}
@ -565,8 +564,8 @@ func testMergeWithUpdates(t *testing.T, segmentDocIds [][]string, docsToDrop []*
_ = os.RemoveAll("/tmp/" + fname)
memSegment := buildMemSegmentMultiHelper(docIds)
err := PersistSegment(memSegment, "/tmp/"+fname, 1024)
testSeg, _ := buildTestSegmentMultiHelper(docIds)
err := PersistSegmentBase(testSeg, "/tmp/"+fname)
if err != nil {
t.Fatal(err)
}
@ -616,11 +615,11 @@ func testMergeAndDropSegments(t *testing.T, segsToMerge []*Segment, docsToDrop [
testMergeWithSelf(t, segm.(*Segment), expectedNumDocs)
}
func buildMemSegmentMulti2() *mem.Segment {
return buildMemSegmentMultiHelper([]string{"c", "d"})
func buildTestSegmentMulti2() (*SegmentBase, error) {
return buildTestSegmentMultiHelper([]string{"c", "d"})
}
func buildMemSegmentMultiHelper(docIds []string) *mem.Segment {
func buildTestSegmentMultiHelper(docIds []string) (*SegmentBase, error) {
doc := &document.Document{
ID: "c",
Fields: []document.Field{
@ -778,9 +777,7 @@ func buildMemSegmentMultiHelper(docIds []string) *mem.Segment {
}
}
segment := mem.NewFromAnalyzedDocs(results)
return segment
return AnalysisResultsToSegmentBase(results, 1024)
}
func TestMergeBytesWritten(t *testing.T) {
@ -788,14 +785,14 @@ func TestMergeBytesWritten(t *testing.T) {
_ = os.RemoveAll("/tmp/scorch2.zap")
_ = os.RemoveAll("/tmp/scorch3.zap")
memSegment := buildMemSegmentMulti()
err := PersistSegment(memSegment, "/tmp/scorch.zap", 1024)
testSeg, _ := buildTestSegmentMulti()
err := PersistSegmentBase(testSeg, "/tmp/scorch.zap")
if err != nil {
t.Fatal(err)
}
memSegment2 := buildMemSegmentMulti2()
err = PersistSegment(memSegment2, "/tmp/scorch2.zap", 1024)
testSeg2, _ := buildTestSegmentMulti2()
err = PersistSegmentBase(testSeg2, "/tmp/scorch2.zap")
if err != nil {
t.Fatal(err)
}

View File

@ -0,0 +1,659 @@
// Copyright (c) 2018 Couchbase, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package zap
import (
"bytes"
"encoding/binary"
"math"
"sort"
"github.com/RoaringBitmap/roaring"
"github.com/Smerity/govarint"
"github.com/blevesearch/bleve/analysis"
"github.com/blevesearch/bleve/document"
"github.com/blevesearch/bleve/index"
"github.com/couchbase/vellum"
"github.com/golang/snappy"
)
// AnalysisResultsToSegmentBase produces an in-memory zap-encoded
// SegmentBase from analysis results
func AnalysisResultsToSegmentBase(results []*index.AnalysisResult,
chunkFactor uint32) (*SegmentBase, error) {
var br bytes.Buffer
s := interim{
results: results,
chunkFactor: chunkFactor,
w: NewCountHashWriter(&br),
FieldsMap: map[string]uint16{},
}
storedIndexOffset, fieldsIndexOffset, fdvIndexOffset, dictOffsets,
err := s.convert()
if err != nil {
return nil, err
}
sb, err := InitSegmentBase(br.Bytes(), s.w.Sum32(), chunkFactor,
s.FieldsMap, s.FieldsInv, uint64(len(results)),
storedIndexOffset, fieldsIndexOffset, fdvIndexOffset, dictOffsets)
return sb, err
}
// interim holds temporary working data used while converting from
// analysis results to a zap-encoded segment
type interim struct {
results []*index.AnalysisResult
chunkFactor uint32
w *CountHashWriter
// FieldsMap adds 1 to field id to avoid zero value issues
// name -> field id + 1
FieldsMap map[string]uint16
// FieldsInv is the inverse of FieldsMap
// field id -> name
FieldsInv []string
// Term dictionaries for each field
// field id -> term -> postings list id + 1
Dicts []map[string]uint64
// Terms for each field, where terms are sorted ascending
// field id -> []term
DictKeys [][]string
// Fields whose IncludeDocValues is true
// field id -> bool
IncludeDocValues []bool
// postings id -> bitmap of docNums
Postings []*roaring.Bitmap
// postings id -> bitmap of docNums that have locations
PostingsLocs []*roaring.Bitmap
// postings id -> freq/norm's, one for each docNum in postings
FreqNorms [][]interimFreqNorm
// postings id -> locs, one for each freq
Locs [][]interimLoc
buf0 bytes.Buffer
tmp0 []byte
tmp1 []byte
}
func (s *interim) grabBuf(size int) []byte {
buf := s.tmp0
if cap(buf) < size {
buf = make([]byte, size)
s.tmp0 = buf
}
return buf[0:size]
}
type interimStoredField struct {
vals [][]byte
typs []byte
arrayposs [][]uint64 // array positions
}
type interimFreqNorm struct {
freq uint64
norm float32
}
type interimLoc struct {
fieldID uint16
pos uint64
start uint64
end uint64
arrayposs []uint64
}
func (s *interim) convert() (uint64, uint64, uint64, []uint64, error) {
s.getOrDefineField("_id") // _id field is fieldID 0
for _, result := range s.results {
for _, field := range result.Document.CompositeFields {
s.getOrDefineField(field.Name())
}
for _, field := range result.Document.Fields {
s.getOrDefineField(field.Name())
}
}
sort.Strings(s.FieldsInv[1:]) // keep _id as first field
s.FieldsMap = make(map[string]uint16, len(s.FieldsInv))
for fieldID, fieldName := range s.FieldsInv {
s.FieldsMap[fieldName] = uint16(fieldID + 1)
}
s.IncludeDocValues = make([]bool, len(s.FieldsInv))
s.prepareDicts()
for _, dict := range s.DictKeys {
sort.Strings(dict)
}
s.processDocuments()
storedIndexOffset, err := s.writeStoredFields()
if err != nil {
return 0, 0, 0, nil, err
}
var fdvIndexOffset uint64
var dictOffsets []uint64
if len(s.results) > 0 {
fdvIndexOffset, dictOffsets, err = s.writeDicts()
if err != nil {
return 0, 0, 0, nil, err
}
} else {
dictOffsets = make([]uint64, len(s.FieldsInv))
}
fieldsIndexOffset, err := persistFields(s.FieldsInv, s.w, dictOffsets)
if err != nil {
return 0, 0, 0, nil, err
}
return storedIndexOffset, fieldsIndexOffset, fdvIndexOffset, dictOffsets, nil
}
func (s *interim) getOrDefineField(fieldName string) int {
fieldIDPlus1, exists := s.FieldsMap[fieldName]
if !exists {
fieldIDPlus1 = uint16(len(s.FieldsInv) + 1)
s.FieldsMap[fieldName] = fieldIDPlus1
s.FieldsInv = append(s.FieldsInv, fieldName)
s.Dicts = append(s.Dicts, make(map[string]uint64))
s.DictKeys = append(s.DictKeys, make([]string, 0))
}
return int(fieldIDPlus1 - 1)
}
// fill Dicts and DictKeys from analysis results
func (s *interim) prepareDicts() {
var pidNext int
numTermsPerPostingsList := make([]int, 0, 64) // key is postings list id
numLocsPerPostingsList := make([]int, 0, 64) // key is postings list id
var totTFs int
var totLocs int
visitField := func(fieldID uint16, tfs analysis.TokenFrequencies) {
dict := s.Dicts[fieldID]
dictKeys := s.DictKeys[fieldID]
for term, tf := range tfs {
pidPlus1, exists := dict[term]
if !exists {
pidNext++
pidPlus1 = uint64(pidNext)
dict[term] = pidPlus1
dictKeys = append(dictKeys, term)
numTermsPerPostingsList = append(numTermsPerPostingsList, 0)
numLocsPerPostingsList = append(numLocsPerPostingsList, 0)
}
pid := pidPlus1 - 1
numTermsPerPostingsList[pid] += 1
numLocsPerPostingsList[pid] += len(tf.Locations)
totLocs += len(tf.Locations)
}
totTFs += len(tfs)
s.DictKeys[fieldID] = dictKeys
}
for _, result := range s.results {
// walk each composite field
for _, field := range result.Document.CompositeFields {
fieldID := uint16(s.getOrDefineField(field.Name()))
_, tf := field.Analyze()
visitField(fieldID, tf)
}
// walk each field
for i, field := range result.Document.Fields {
fieldID := uint16(s.getOrDefineField(field.Name()))
tf := result.Analyzed[i]
visitField(fieldID, tf)
}
}
numPostingsLists := pidNext
s.Postings = make([]*roaring.Bitmap, numPostingsLists)
for i := 0; i < numPostingsLists; i++ {
s.Postings[i] = roaring.New()
}
s.PostingsLocs = make([]*roaring.Bitmap, numPostingsLists)
for i := 0; i < numPostingsLists; i++ {
s.PostingsLocs[i] = roaring.New()
}
s.FreqNorms = make([][]interimFreqNorm, numPostingsLists)
freqNormsBacking := make([]interimFreqNorm, totTFs)
for pid, numTerms := range numTermsPerPostingsList {
s.FreqNorms[pid] = freqNormsBacking[0:0]
freqNormsBacking = freqNormsBacking[numTerms:]
}
s.Locs = make([][]interimLoc, numPostingsLists)
locsBacking := make([]interimLoc, totLocs)
for pid, numLocs := range numLocsPerPostingsList {
s.Locs[pid] = locsBacking[0:0]
locsBacking = locsBacking[numLocs:]
}
}
func (s *interim) processDocuments() {
numFields := len(s.FieldsInv)
reuseFieldLens := make([]int, numFields)
reuseFieldTFs := make([]analysis.TokenFrequencies, numFields)
for docNum, result := range s.results {
for i := 0; i < numFields; i++ { // clear these for reuse
reuseFieldLens[i] = 0
reuseFieldTFs[i] = nil
}
s.processDocument(uint64(docNum), result,
reuseFieldLens, reuseFieldTFs)
}
}
func (s *interim) processDocument(docNum uint64,
result *index.AnalysisResult,
fieldLens []int, fieldTFs []analysis.TokenFrequencies) {
visitField := func(fieldID uint16, fieldName string,
ln int, tf analysis.TokenFrequencies) {
fieldLens[fieldID] += ln
existingFreqs := fieldTFs[fieldID]
if existingFreqs != nil {
existingFreqs.MergeAll(fieldName, tf)
} else {
fieldTFs[fieldID] = tf
}
}
// walk each composite field
for _, field := range result.Document.CompositeFields {
fieldID := uint16(s.getOrDefineField(field.Name()))
ln, tf := field.Analyze()
visitField(fieldID, field.Name(), ln, tf)
}
// walk each field
for i, field := range result.Document.Fields {
fieldID := uint16(s.getOrDefineField(field.Name()))
ln := result.Length[i]
tf := result.Analyzed[i]
visitField(fieldID, field.Name(), ln, tf)
}
// now that it's been rolled up into fieldTFs, walk that
for fieldID, tfs := range fieldTFs {
dict := s.Dicts[fieldID]
norm := float32(1.0 / math.Sqrt(float64(fieldLens[fieldID])))
for term, tf := range tfs {
pid := dict[term] - 1
bs := s.Postings[pid]
bs.AddInt(int(docNum))
s.FreqNorms[pid] = append(s.FreqNorms[pid],
interimFreqNorm{
freq: uint64(tf.Frequency()),
norm: norm,
})
if len(tf.Locations) > 0 {
locBS := s.PostingsLocs[pid]
locBS.AddInt(int(docNum))
locs := s.Locs[pid]
for _, loc := range tf.Locations {
var locf = uint16(fieldID)
if loc.Field != "" {
locf = uint16(s.getOrDefineField(loc.Field))
}
var arrayposs []uint64
if len(loc.ArrayPositions) > 0 {
arrayposs = loc.ArrayPositions
}
locs = append(locs, interimLoc{
fieldID: locf,
pos: uint64(loc.Position),
start: uint64(loc.Start),
end: uint64(loc.End),
arrayposs: arrayposs,
})
}
s.Locs[pid] = locs
}
}
}
}
func (s *interim) writeStoredFields() (
storedIndexOffset uint64, err error) {
metaBuf := &s.buf0
metaEncoder := govarint.NewU64Base128Encoder(metaBuf)
data, compressed := s.tmp0[:0], s.tmp1[:0]
defer func() { s.tmp0, s.tmp1 = data, compressed }()
// keyed by docNum
docStoredOffsets := make([]uint64, len(s.results))
// keyed by fieldID, for the current doc in the loop
docStoredFields := map[uint16]interimStoredField{}
for docNum, result := range s.results {
for fieldID := range docStoredFields { // reset for next doc
delete(docStoredFields, fieldID)
}
for _, field := range result.Document.Fields {
fieldID := uint16(s.getOrDefineField(field.Name()))
opts := field.Options()
if opts.IsStored() {
isf := docStoredFields[fieldID]
isf.vals = append(isf.vals, field.Value())
isf.typs = append(isf.typs, encodeFieldType(field))
isf.arrayposs = append(isf.arrayposs, field.ArrayPositions())
docStoredFields[fieldID] = isf
}
if opts.IncludeDocValues() {
s.IncludeDocValues[fieldID] = true
}
}
var curr int
metaBuf.Reset()
data = data[:0]
compressed = compressed[:0]
for fieldID := range s.FieldsInv {
isf, exists := docStoredFields[uint16(fieldID)]
if exists {
curr, data, err = persistStoredFieldValues(
fieldID, isf.vals, isf.typs, isf.arrayposs,
curr, metaEncoder, data)
if err != nil {
return 0, err
}
}
}
metaEncoder.Close()
metaBytes := metaBuf.Bytes()
compressed = snappy.Encode(compressed, data)
docStoredOffsets[docNum] = uint64(s.w.Count())
_, err := writeUvarints(s.w,
uint64(len(metaBytes)),
uint64(len(compressed)))
if err != nil {
return 0, err
}
_, err = s.w.Write(metaBytes)
if err != nil {
return 0, err
}
_, err = s.w.Write(compressed)
if err != nil {
return 0, err
}
}
storedIndexOffset = uint64(s.w.Count())
for _, docStoredOffset := range docStoredOffsets {
err = binary.Write(s.w, binary.BigEndian, docStoredOffset)
if err != nil {
return 0, err
}
}
return storedIndexOffset, nil
}
func (s *interim) writeDicts() (uint64, []uint64, error) {
dictOffsets := make([]uint64, len(s.FieldsInv))
fdvOffsets := make([]uint64, len(s.FieldsInv))
buf := s.grabBuf(binary.MaxVarintLen64)
tfEncoder := newChunkedIntCoder(uint64(s.chunkFactor), uint64(len(s.results)-1))
locEncoder := newChunkedIntCoder(uint64(s.chunkFactor), uint64(len(s.results)-1))
fdvEncoder := newChunkedContentCoder(uint64(s.chunkFactor), uint64(len(s.results)-1))
var docTermMap [][]byte
s.buf0.Reset()
builder, err := vellum.New(&s.buf0, nil)
if err != nil {
return 0, nil, err
}
for fieldID, terms := range s.DictKeys {
if cap(docTermMap) < len(s.results) {
docTermMap = make([][]byte, len(s.results))
} else {
docTermMap = docTermMap[0:len(s.results)]
for docNum := range docTermMap { // reset the docTermMap
docTermMap[docNum] = docTermMap[docNum][:0]
}
}
dict := s.Dicts[fieldID]
for _, term := range terms { // terms are already sorted
pid := dict[term] - 1
postingsBS := s.Postings[pid]
postingsLocsBS := s.PostingsLocs[pid]
freqNorms := s.FreqNorms[pid]
freqNormOffset := 0
locs := s.Locs[pid]
locOffset := 0
postingsItr := postingsBS.Iterator()
for postingsItr.HasNext() {
docNum := uint64(postingsItr.Next())
freqNorm := freqNorms[freqNormOffset]
err = tfEncoder.Add(docNum, freqNorm.freq,
uint64(math.Float32bits(freqNorm.norm)))
if err != nil {
return 0, nil, err
}
for i := uint64(0); i < freqNorm.freq; i++ {
if len(locs) > 0 {
loc := locs[locOffset]
err = locEncoder.Add(docNum, uint64(loc.fieldID),
loc.pos, loc.start, loc.end,
uint64(len(loc.arrayposs)))
if err != nil {
return 0, nil, err
}
err = locEncoder.Add(docNum, loc.arrayposs...)
if err != nil {
return 0, nil, err
}
}
locOffset++
}
freqNormOffset++
docTermMap[docNum] = append(
append(docTermMap[docNum], term...),
termSeparator)
}
tfEncoder.Close()
locEncoder.Close()
postingsOffset, err := writePostings(
postingsBS, postingsLocsBS, tfEncoder, locEncoder,
nil, s.w, buf)
if err != nil {
return 0, nil, err
}
if postingsOffset > uint64(0) {
err = builder.Insert([]byte(term), postingsOffset)
if err != nil {
return 0, nil, err
}
}
tfEncoder.Reset()
locEncoder.Reset()
}
err = builder.Close()
if err != nil {
return 0, nil, err
}
// record where this dictionary starts
dictOffsets[fieldID] = uint64(s.w.Count())
vellumData := s.buf0.Bytes()
// write out the length of the vellum data
n := binary.PutUvarint(buf, uint64(len(vellumData)))
_, err = s.w.Write(buf[:n])
if err != nil {
return 0, nil, err
}
// write this vellum to disk
_, err = s.w.Write(vellumData)
if err != nil {
return 0, nil, err
}
// reset vellum for reuse
s.buf0.Reset()
err = builder.Reset(&s.buf0)
if err != nil {
return 0, nil, err
}
// write the field doc values
if s.IncludeDocValues[fieldID] {
for docNum, docTerms := range docTermMap {
if len(docTerms) > 0 {
err = fdvEncoder.Add(uint64(docNum), docTerms)
if err != nil {
return 0, nil, err
}
}
}
err = fdvEncoder.Close()
if err != nil {
return 0, nil, err
}
fdvOffsets[fieldID] = uint64(s.w.Count())
_, err = fdvEncoder.Write(s.w)
if err != nil {
return 0, nil, err
}
fdvEncoder.Reset()
} else {
fdvOffsets[fieldID] = fieldNotUninverted
}
}
fdvIndexOffset := uint64(s.w.Count())
for _, fdvOffset := range fdvOffsets {
n := binary.PutUvarint(buf, fdvOffset)
_, err := s.w.Write(buf[:n])
if err != nil {
return 0, nil, err
}
}
return fdvIndexOffset, dictOffsets, nil
}
func encodeFieldType(f document.Field) byte {
fieldType := byte('x')
switch f.(type) {
case *document.TextField:
fieldType = 't'
case *document.NumericField:
fieldType = 'n'
case *document.DateTimeField:
fieldType = 'd'
case *document.BooleanField:
fieldType = 'b'
case *document.GeoPointField:
fieldType = 'g'
case *document.CompositeField:
fieldType = 'c'
}
return fieldType
}

View File

@ -28,8 +28,8 @@ import (
func TestOpen(t *testing.T) {
_ = os.RemoveAll("/tmp/scorch.zap")
memSegment := buildMemSegment()
err := PersistSegment(memSegment, "/tmp/scorch.zap", 1024)
testSeg, _ := buildTestSegment()
err := PersistSegmentBase(testSeg, "/tmp/scorch.zap")
if err != nil {
t.Fatalf("error persisting segment: %v", err)
}
@ -328,8 +328,8 @@ func TestOpen(t *testing.T) {
func TestOpenMulti(t *testing.T) {
_ = os.RemoveAll("/tmp/scorch.zap")
memSegment := buildMemSegmentMulti()
err := PersistSegment(memSegment, "/tmp/scorch.zap", 1024)
testSeg, _ := buildTestSegmentMulti()
err := PersistSegmentBase(testSeg, "/tmp/scorch.zap")
if err != nil {
t.Fatalf("error persisting segment: %v", err)
}
@ -428,8 +428,8 @@ func TestOpenMulti(t *testing.T) {
func TestOpenMultiWithTwoChunks(t *testing.T) {
_ = os.RemoveAll("/tmp/scorch.zap")
memSegment := buildMemSegmentMulti()
err := PersistSegment(memSegment, "/tmp/scorch.zap", 1)
testSeg, _ := buildTestSegmentMultiWithChunkFactor(1)
err := PersistSegmentBase(testSeg, "/tmp/scorch.zap")
if err != nil {
t.Fatalf("error persisting segment: %v", err)
}
@ -523,8 +523,8 @@ func TestOpenMultiWithTwoChunks(t *testing.T) {
func TestSegmentVisitableDocValueFieldsList(t *testing.T) {
_ = os.RemoveAll("/tmp/scorch.zap")
memSegment := buildMemSegmentMulti()
err := PersistSegment(memSegment, "/tmp/scorch.zap", 1)
testSeg, _ := buildTestSegmentMultiWithChunkFactor(1)
err := PersistSegmentBase(testSeg, "/tmp/scorch.zap")
if err != nil {
t.Fatalf("error persisting segment: %v", err)
}
@ -551,8 +551,8 @@ func TestSegmentVisitableDocValueFieldsList(t *testing.T) {
}
_ = os.RemoveAll("/tmp/scorch.zap")
memSegment, expectedFields := buildMemSegmentWithDefaultFieldMapping()
err = PersistSegment(memSegment, "/tmp/scorch.zap", 1)
testSeg, expectedFields, _ := buildTestSegmentWithDefaultFieldMapping(1)
err = PersistSegmentBase(testSeg, "/tmp/scorch.zap")
if err != nil {
t.Fatalf("error persisting segment: %v", err)
}