0
0
Fork 0
bleve/index/scorch/segment/bolt/build.go

519 lines
12 KiB
Go

// Copyright (c) 2017 Couchbase, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package bolt
import (
"bytes"
"encoding/binary"
"math"
"github.com/RoaringBitmap/roaring"
"github.com/Smerity/govarint"
"github.com/blevesearch/bleve/index/scorch/segment"
"github.com/blevesearch/bleve/index/scorch/segment/mem"
"github.com/boltdb/bolt"
"github.com/couchbaselabs/vellum"
"github.com/golang/snappy"
)
// Top-level bucket names within the bolt file. Single-byte names keep
// per-page key overhead minimal.
var (
	fieldsBucket         = []byte{'a'}
	dictBucket           = []byte{'b'}
	postingsBucket       = []byte{'c'}
	postingDetailsBucket = []byte{'d'}
	storedBucket         = []byte{'e'}
	configBucket         = []byte{'x'}
)

// Keys used inside the buckets above.
var (
	// fields bucket: bitmap of fields that index term locations
	indexLocsKey = []byte{'l'}

	// posting-details chunk buckets: interleaved freq/norm and location data
	freqNormKey = []byte{'a'}
	locKey      = []byte{'b'}

	// stored-doc buckets: field metadata and snappy-compressed values
	metaKey = []byte{'a'}
	dataKey = []byte{'b'}

	// config bucket: chunk factor and file format version
	chunkKey   = []byte{'c'}
	versionKey = []byte{'v'}
)

// version is the file format version written into the config bucket.
var version = 0
// PersistSegment writes the in-memory segment to a new bolt file at path,
// splitting posting details into chunks of chunkFactor docNums. All writes
// happen in a single transaction: it is committed only if every section
// persists cleanly, otherwise it is rolled back.
func PersistSegment(memSegment *mem.Segment, path string, chunkFactor uint32) (err error) {
	db, err := bolt.Open(path, 0777, nil)
	if err != nil {
		return err
	}
	// surface a close error only if nothing else already failed
	defer func() {
		if cerr := db.Close(); err == nil && cerr != nil {
			err = cerr
		}
	}()

	tx, err := db.Begin(true)
	if err != nil {
		return err
	}
	// commit on success, roll back on any error (runs before db.Close above)
	defer func() {
		if err == nil {
			err = tx.Commit()
		} else {
			_ = tx.Rollback()
		}
	}()

	// persist each section of the file in order
	steps := []func() error{
		func() error { return persistFields(memSegment, tx) },
		func() error { return persistDictionary(memSegment, tx) },
		func() error { return persistPostings(memSegment, tx) },
		func() error { return persistPostingsDetails(memSegment, tx, chunkFactor) },
		func() error { return persistStored(memSegment, tx) },
		func() error { return persistConfig(tx, chunkFactor) },
	}
	for _, step := range steps {
		// assign to the named return so the commit/rollback defer sees it
		if err = step(); err != nil {
			return err
		}
	}
	return nil
}
// persistFields puts the fields as separate k/v pairs in the fields bucket.
// Little effort is spent on compactness: this section is expected to be
// small, is read once when the segment is opened, and is then kept on the
// heap rather than re-read from the file.
func persistFields(memSegment *mem.Segment, tx *bolt.Tx) error {
	bucket, err := tx.CreateBucket(fieldsBucket)
	if err != nil {
		return err
	}
	bucket.FillPercent = 1.0

	// persist a bitmap recording which fields carry term locations
	locBitmap := roaring.NewBitmap()
	for fieldID, hasLocs := range memSegment.FieldsLoc {
		if hasLocs {
			locBitmap.AddInt(fieldID)
		}
	}
	var bitmapBuf bytes.Buffer
	if _, err = locBitmap.WriteTo(&bitmapBuf); err != nil {
		return err
	}
	if err = bucket.Put(indexLocsKey, bitmapBuf.Bytes()); err != nil {
		return err
	}

	// field IDs are keyed with a varint encoding that preserves sort order
	keyBuf := make([]byte, 0, segment.MaxVarintSize)
	for fieldID, fieldName := range memSegment.FieldsInv {
		keyBuf = segment.EncodeUvarintAscending(keyBuf[:0], uint64(fieldID))
		if err = bucket.Put(keyBuf, []byte(fieldName)); err != nil {
			return err
		}
	}
	return nil
}
// persistDictionary builds one vellum FST per field from the in-memory
// dictionary and stores each FST in the dict bucket keyed by field ID.
func persistDictionary(memSegment *mem.Segment, tx *bolt.Tx) error {
	bucket, err := tx.CreateBucket(dictBucket)
	if err != nil {
		return err
	}
	bucket.FillPercent = 1.0

	// TODO consider building the FSTs concurrently; only the bolt inserts
	// have to happen in field-ID order, and the FST construction is
	// (presumably) the heavier work.
	keyBuf := make([]byte, 0, segment.MaxVarintSize)
	for fieldID, fieldTerms := range memSegment.DictKeys {
		// build a fresh FST for this field
		var fstBuf bytes.Buffer
		builder, err := vellum.New(&fstBuf, nil)
		if err != nil {
			return err
		}
		dict := memSegment.Dicts[fieldID]
		// fieldTerms is already sorted, as vellum insertion requires;
		// the stored value is dict[term]-1 (mem dict values appear to be
		// offset by one — see the mem segment builder)
		for _, term := range fieldTerms {
			if err = builder.Insert([]byte(term), dict[term]-1); err != nil {
				return err
			}
		}
		if err = builder.Close(); err != nil {
			return err
		}
		// key by the sort-order-preserving varint of the field ID
		keyBuf = segment.EncodeUvarintAscending(keyBuf[:0], uint64(fieldID))
		if err = bucket.Put(keyBuf, fstBuf.Bytes()); err != nil {
			return err
		}
	}
	return nil
}
// persistPostings serializes each roaring postings bitmap into the
// postings bucket, keyed by posting ID.
func persistPostings(memSegment *mem.Segment, tx *bolt.Tx) error {
	bucket, err := tx.CreateBucket(postingsBucket)
	if err != nil {
		return err
	}
	bucket.FillPercent = 1.0

	keyBuf := make([]byte, 0, segment.MaxVarintSize)
	for postingID, postings := range memSegment.Postings {
		// key by the sort-order-preserving varint of the posting ID
		keyBuf = segment.EncodeUvarintAscending(keyBuf[:0], uint64(postingID))
		var value bytes.Buffer
		if _, err := postings.WriteTo(&value); err != nil {
			return err
		}
		if err := bucket.Put(keyBuf, value.Bytes()); err != nil {
			return err
		}
	}
	return nil
}
// persistPostingsDetails creates one sub-bucket per posting list under the
// posting-details bucket and delegates the per-posting chunked encoding to
// persistPostingDetails.
func persistPostingsDetails(memSegment *mem.Segment, tx *bolt.Tx,
	chunkFactor uint32) error {
	bucket, err := tx.CreateBucket(postingDetailsBucket)
	if err != nil {
		return err
	}
	bucket.FillPercent = 1.0

	keyBuf := make([]byte, 0, segment.MaxVarintSize)
	for postingID := range memSegment.Postings {
		// sub-bucket keyed by sort-order-preserving varint of the posting ID
		keyBuf = segment.EncodeUvarintAscending(keyBuf[:0], uint64(postingID))
		postingBucket, err := bucket.CreateBucket(keyBuf)
		if err != nil {
			return err
		}
		postingBucket.FillPercent = 1.0
		if err = persistPostingDetails(memSegment, postingBucket, postingID, chunkFactor); err != nil {
			return err
		}
	}
	return nil
}
// persistPostingDetails writes the per-doc freq/norm stream and the
// per-occurrence location stream for a single posting list into
// postingBucket, split into one sub-bucket per chunk of chunkFactor
// docNums. Within a chunk, freq and norm (as float32 bits) are
// interleaved under freqNormKey, and location records under locKey.
func persistPostingDetails(memSegment *mem.Segment, postingBucket *bolt.Bucket,
	postingID int, chunkFactor uint32) error {
	var err error
	var chunkBucket *bolt.Bucket
	var currChunk uint32
	chunkIDBuf := make([]byte, 0, segment.MaxVarintSize)
	postingsListItr := memSegment.Postings[postingID].Iterator()
	var encoder *govarint.Base128Encoder
	var locEncoder *govarint.Base128Encoder
	encodingBuf := &bytes.Buffer{}
	locEncodingBuf := &bytes.Buffer{}
	var offset int    // index into the per-doc Freqs/Norms slices
	var locOffset int // index into the flattened per-occurrence Loc* slices
	for postingsListItr.HasNext() {
		docNum := postingsListItr.Next()
		chunk := docNum / chunkFactor
		// start a new chunk bucket when crossing a chunk boundary
		if chunkBucket == nil || currChunk != chunk {
			// flush the previous chunk, if any
			if chunkBucket != nil {
				err = flushPostingsChunk(chunkBucket, encoder, encodingBuf,
					locEncoder, locEncodingBuf)
				if err != nil {
					return err
				}
				// reset buffers for the next chunk
				chunkIDBuf = chunkIDBuf[:0]
				encodingBuf = &bytes.Buffer{}
				locEncodingBuf = &bytes.Buffer{}
			}
			// prepare next chunk
			chunkIDBuf = segment.EncodeUvarintAscending(chunkIDBuf, uint64(chunk))
			chunkBucket, err = postingBucket.CreateBucket(chunkIDBuf)
			if err != nil {
				return err
			}
			chunkBucket.FillPercent = 1.0
			currChunk = chunk
			encoder = govarint.NewU64Base128Encoder(encodingBuf)
			locEncoder = govarint.NewU64Base128Encoder(locEncodingBuf)
		}
		// put freq
		_, err = encoder.PutU64(memSegment.Freqs[postingID][offset])
		if err != nil {
			return err
		}
		// put norm as raw float32 bits
		norm := memSegment.Norms[postingID][offset]
		normBits := math.Float32bits(norm)
		_, err = encoder.PutU32(normBits)
		if err != nil {
			return err
		}
		// put one location record per occurrence of the term in this doc
		for i := 0; i < int(memSegment.Freqs[postingID][offset]); i++ {
			if len(memSegment.Locfields[postingID]) > 0 {
				// put field
				_, err = locEncoder.PutU64(uint64(memSegment.Locfields[postingID][locOffset]))
				if err != nil {
					return err
				}
				// put pos
				_, err = locEncoder.PutU64(memSegment.Locpos[postingID][locOffset])
				if err != nil {
					return err
				}
				// put start
				_, err = locEncoder.PutU64(memSegment.Locstarts[postingID][locOffset])
				if err != nil {
					return err
				}
				// put end
				_, err = locEncoder.PutU64(memSegment.Locends[postingID][locOffset])
				if err != nil {
					return err
				}
				// put the number of array positions to follow
				num := len(memSegment.Locarraypos[postingID][locOffset])
				_, err = locEncoder.PutU64(uint64(num))
				if err != nil {
					return err
				}
				// put each array position
				for j := 0; j < num; j++ {
					_, err = locEncoder.PutU64(memSegment.Locarraypos[postingID][locOffset][j])
					if err != nil {
						return err
					}
				}
			}
			locOffset++
		}
		offset++
	}
	// flush the final chunk
	if chunkBucket != nil {
		err = flushPostingsChunk(chunkBucket, encoder, encodingBuf,
			locEncoder, locEncodingBuf)
		if err != nil {
			return err
		}
	}
	return nil
}

// flushPostingsChunk closes both encoders and writes the accumulated
// freq/norm and location bytes under their respective keys in chunkBucket.
func flushPostingsChunk(chunkBucket *bolt.Bucket,
	encoder *govarint.Base128Encoder, encodingBuf *bytes.Buffer,
	locEncoder *govarint.Base128Encoder, locEncodingBuf *bytes.Buffer) error {
	encoder.Close()
	if err := chunkBucket.Put(freqNormKey, encodingBuf.Bytes()); err != nil {
		return err
	}
	locEncoder.Close()
	return chunkBucket.Put(locKey, locEncodingBuf.Bytes())
}
// persistStored writes each document's stored field values into a per-doc
// sub-bucket: a varint metadata stream under metaKey (field, type, offset,
// length, array positions) and the snappy-compressed concatenated values
// under dataKey.
func persistStored(memSegment *mem.Segment, tx *bolt.Tx) error {
	bucket, err := tx.CreateBucket(storedBucket)
	if err != nil {
		return err
	}
	bucket.FillPercent = 1.0

	// doc numbers are keyed with a varint encoding that preserves sort order
	docNumBuf := make([]byte, 0, segment.MaxVarintSize)
	for docNum, storedValues := range memSegment.Stored {
		docNumBuf = segment.EncodeUvarintAscending(docNumBuf[:0], uint64(docNum))
		docBucket, err := bucket.CreateBucket(docNumBuf)
		if err != nil {
			return err
		}
		docBucket.FillPercent = 1.0

		var metaBuf bytes.Buffer
		metaEncoder := govarint.NewU64Base128Encoder(&metaBuf)
		var data []byte
		curr := 0 // running byte offset of the next value within data

		// encode fields in field-ID order
		for fieldID := range memSegment.FieldsInv {
			storedFieldValues, ok := storedValues[uint16(fieldID)]
			if !ok {
				continue
			}
			types := memSegment.StoredTypes[docNum][uint16(fieldID)]
			positions := memSegment.StoredPos[docNum][uint16(fieldID)]
			for i, value := range storedFieldValues {
				// field ID
				if _, err := metaEncoder.PutU64(uint64(fieldID)); err != nil {
					return err
				}
				// value type
				if _, err := metaEncoder.PutU64(uint64(types[i])); err != nil {
					return err
				}
				// start offset within the data section
				if _, err := metaEncoder.PutU64(uint64(curr)); err != nil {
					return err
				}
				// value length
				if _, err := metaEncoder.PutU64(uint64(len(value))); err != nil {
					return err
				}
				// count of array positions, then each position
				if _, err := metaEncoder.PutU64(uint64(len(positions[i]))); err != nil {
					return err
				}
				for _, pos := range positions[i] {
					if _, err := metaEncoder.PutU64(pos); err != nil {
						return err
					}
				}
				// append the raw value and advance the offset
				data = append(data, value...)
				curr += len(value)
			}
		}
		metaEncoder.Close()
		if err := docBucket.Put(metaKey, metaBuf.Bytes()); err != nil {
			return err
		}
		// compress the concatenated values before storing
		if err := docBucket.Put(dataKey, snappy.Encode(nil, data)); err != nil {
			return err
		}
	}
	return nil
}
// persistConfig records the chunk factor (big-endian uint32) and the file
// format version in the config bucket.
func persistConfig(tx *bolt.Tx, chunkFactor uint32) error {
	bucket, err := tx.CreateBucket(configBucket)
	if err != nil {
		return err
	}
	var chunkVal [4]byte
	binary.BigEndian.PutUint32(chunkVal[:], chunkFactor)
	if err := bucket.Put(chunkKey, chunkVal[:]); err != nil {
		return err
	}
	return bucket.Put(versionKey, []byte{byte(version)})
}