docValue persist changes
docValues are persisted along with the index, in a columnar fashion per field with variable sized chunking for quick look up. -naive chunk level caching is added per field -data part inside a chunk is snappy compressed -metaHeader inside the chunk index the dv values inside the uncompressed data part -all the fields are docValue persisted in this iteration
This commit is contained in:
parent
7afeb1ae1d
commit
76f827f469
|
@ -37,6 +37,14 @@ const Name = "scorch"
|
|||
|
||||
const Version uint8 = 1
|
||||
|
||||
// UnInvertIndex is implemented by various scorch index implementations
|
||||
// to provide the un inverting of the postings or other indexed values.
|
||||
type UnInvertIndex interface {
|
||||
// apparently need better namings here..
|
||||
VisitDocumentFieldTerms(localDocNum uint64, fields []string,
|
||||
visitor index.DocumentFieldTermVisitor) error
|
||||
}
|
||||
|
||||
type Scorch struct {
|
||||
readOnly bool
|
||||
version uint8
|
||||
|
|
|
@ -116,6 +116,10 @@ func (s *Segment) processDocument(result *index.AnalysisResult) {
|
|||
if field.Options().IsStored() {
|
||||
storeField(docNum, fieldID, encodeFieldType(field), field.Value(), field.ArrayPositions())
|
||||
}
|
||||
// TODO with mapping changes for dv
|
||||
//if field.Options().IncludeDocValues() {
|
||||
s.DocValueFields[fieldID] = true
|
||||
//}
|
||||
}
|
||||
|
||||
// now that its been rolled up into docMap, walk that
|
||||
|
|
|
@ -87,12 +87,17 @@ type Segment struct {
|
|||
// stored field array positions
|
||||
// docNum -> field id -> slice of array positions (each is []uint64)
|
||||
StoredPos []map[uint16][][]uint64
|
||||
|
||||
// for marking the docValue override status
|
||||
// field id -> status
|
||||
DocValueFields map[uint16]bool
|
||||
}
|
||||
|
||||
// New builds a new empty Segment
|
||||
func New() *Segment {
|
||||
return &Segment{
|
||||
FieldsMap: map[string]uint16{},
|
||||
FieldsMap: map[string]uint16{},
|
||||
DocValueFields: map[uint16]bool{},
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -20,6 +20,7 @@ import (
|
|||
"encoding/binary"
|
||||
"math"
|
||||
"os"
|
||||
"sort"
|
||||
|
||||
"github.com/Smerity/govarint"
|
||||
"github.com/blevesearch/bleve/index/scorch/segment/mem"
|
||||
|
@ -48,6 +49,7 @@ func PersistSegment(memSegment *mem.Segment, path string, chunkFactor uint32) (e
|
|||
|
||||
var storedIndexOffset uint64
|
||||
var dictLocs []uint64
|
||||
var docValueOffset uint64
|
||||
if len(memSegment.Stored) > 0 {
|
||||
|
||||
storedIndexOffset, err = persistStored(memSegment, cr)
|
||||
|
@ -78,6 +80,11 @@ func PersistSegment(memSegment *mem.Segment, path string, chunkFactor uint32) (e
|
|||
return err
|
||||
}
|
||||
|
||||
docValueOffset, err = persistFieldDocValues(cr, chunkFactor, memSegment)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
} else {
|
||||
dictLocs = make([]uint64, len(memSegment.FieldsInv))
|
||||
}
|
||||
|
@ -89,7 +96,7 @@ func PersistSegment(memSegment *mem.Segment, path string, chunkFactor uint32) (e
|
|||
}
|
||||
|
||||
err = persistFooter(uint64(len(memSegment.Stored)), storedIndexOffset,
|
||||
fieldIndexStart, chunkFactor, cr)
|
||||
fieldIndexStart, docValueOffset, chunkFactor, cr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -419,3 +426,104 @@ func persistDictionary(memSegment *mem.Segment, w *CountHashWriter, postingsLocs
|
|||
|
||||
return rv, nil
|
||||
}
|
||||
|
||||
type docIDRange []uint64
|
||||
|
||||
func (a docIDRange) Len() int { return len(a) }
|
||||
func (a docIDRange) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
||||
func (a docIDRange) Less(i, j int) bool { return a[i] < a[j] }
|
||||
|
||||
func persistDocValues(memSegment *mem.Segment, w *CountHashWriter,
|
||||
chunkFactor uint32) (map[uint16]uint64, error) {
|
||||
fieldChunkOffsets := make(map[uint16]uint64, len(memSegment.DocValueFields))
|
||||
fdvEncoder := newChunkedContentCoder(uint64(chunkFactor), uint64(len(memSegment.Stored)-1))
|
||||
|
||||
for fieldID := range memSegment.DocValueFields {
|
||||
field := memSegment.FieldsInv[fieldID]
|
||||
docTermMap := make(map[uint64][]byte, 0)
|
||||
dict, err := memSegment.Dictionary(field)
|
||||
if err != nil {
|
||||
return fieldChunkOffsets, err
|
||||
}
|
||||
|
||||
dictItr := dict.Iterator()
|
||||
next, err := dictItr.Next()
|
||||
for err == nil && next != nil {
|
||||
postings, err1 := dict.PostingsList(next.Term, nil)
|
||||
if err1 != nil {
|
||||
return fieldChunkOffsets, err
|
||||
}
|
||||
|
||||
postingsItr := postings.Iterator()
|
||||
nextPosting, err2 := postingsItr.Next()
|
||||
for err2 == nil && nextPosting != nil {
|
||||
docNum := nextPosting.Number()
|
||||
docTermMap[docNum] = append(docTermMap[docNum], []byte(next.Term)...)
|
||||
docTermMap[docNum] = append(docTermMap[docNum], termSeparator)
|
||||
nextPosting, err2 = postingsItr.Next()
|
||||
}
|
||||
if err2 != nil {
|
||||
return fieldChunkOffsets, err2
|
||||
}
|
||||
|
||||
next, err = dictItr.Next()
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return fieldChunkOffsets, err
|
||||
}
|
||||
// sort wrt to docIDs
|
||||
var docNumbers docIDRange
|
||||
for k := range docTermMap {
|
||||
docNumbers = append(docNumbers, k)
|
||||
}
|
||||
sort.Sort(docNumbers)
|
||||
|
||||
for _, docNum := range docNumbers {
|
||||
err = fdvEncoder.Add(docNum, docTermMap[docNum])
|
||||
if err != nil {
|
||||
return fieldChunkOffsets, err
|
||||
}
|
||||
}
|
||||
|
||||
fieldChunkOffsets[fieldID] = uint64(w.Count())
|
||||
fdvEncoder.Close()
|
||||
// persist the doc value details for this field
|
||||
_, err = fdvEncoder.Write(w)
|
||||
if err != nil {
|
||||
return fieldChunkOffsets, err
|
||||
}
|
||||
// resetting encoder for the next field
|
||||
fdvEncoder.Reset()
|
||||
}
|
||||
|
||||
return fieldChunkOffsets, nil
|
||||
}
|
||||
|
||||
func persistFieldDocValues(w *CountHashWriter, chunkFactor uint32,
|
||||
memSegment *mem.Segment) (uint64, error) {
|
||||
|
||||
fieldDvOffsets, err := persistDocValues(memSegment, w, chunkFactor)
|
||||
if err != nil {
|
||||
return math.MaxUint64, err
|
||||
}
|
||||
|
||||
fieldDocValuesOffset := uint64(w.Count())
|
||||
buf := make([]byte, binary.MaxVarintLen64)
|
||||
offset := uint64(math.MaxUint64)
|
||||
ok := true
|
||||
for fieldID := range memSegment.FieldsInv {
|
||||
// if the field isn't configured for docValue, then mark
|
||||
// the offset accordingly
|
||||
if offset, ok = fieldDvOffsets[uint16(fieldID)]; !ok {
|
||||
offset = math.MaxUint64
|
||||
}
|
||||
n := binary.PutUvarint(buf, uint64(offset))
|
||||
_, err := w.Write(buf[:n])
|
||||
if err != nil {
|
||||
return math.MaxUint64, err
|
||||
}
|
||||
}
|
||||
|
||||
return fieldDocValuesOffset, nil
|
||||
}
|
||||
|
|
|
@ -0,0 +1,180 @@
|
|||
// Copyright (c) 2017 Couchbase, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package zap
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/binary"
|
||||
"io"
|
||||
|
||||
"github.com/golang/snappy"
|
||||
)
|
||||
|
||||
var termSeparator byte = 0xff
|
||||
var termSeparatorSplitSlice = []byte{termSeparator}
|
||||
|
||||
type chunkedContentCoder struct {
|
||||
final []byte
|
||||
chunkSize uint64
|
||||
currChunk uint64
|
||||
chunkLens []uint64
|
||||
chunkMetaBuf bytes.Buffer
|
||||
chunkBuf bytes.Buffer
|
||||
|
||||
chunkMeta []metaData
|
||||
}
|
||||
|
||||
// metaData represents the data information inside a
|
||||
// chunk.
|
||||
type metaData struct {
|
||||
docID uint64 // docid of the data inside the chunk
|
||||
docDvLoc uint64 // starting offset for a given docid
|
||||
docDvLen uint64 // length of data inside the chunk for the given docid
|
||||
}
|
||||
|
||||
// newChunkedContentCoder returns a new chunk content coder which
|
||||
// packs data into chunks based on the provided chunkSize
|
||||
func newChunkedContentCoder(chunkSize uint64, maxDocNum uint64) *chunkedContentCoder {
|
||||
total := maxDocNum/chunkSize + 1
|
||||
rv := &chunkedContentCoder{
|
||||
chunkSize: chunkSize,
|
||||
chunkLens: make([]uint64, total),
|
||||
chunkMeta: []metaData{},
|
||||
}
|
||||
|
||||
return rv
|
||||
}
|
||||
|
||||
// Reset lets you reuse this chunked content coder. Buffers are reset
|
||||
// and re used. You cannot change the chunk size.
|
||||
func (c *chunkedContentCoder) Reset() {
|
||||
c.currChunk = 0
|
||||
c.final = c.final[:0]
|
||||
c.chunkBuf.Reset()
|
||||
c.chunkMetaBuf.Reset()
|
||||
for i := range c.chunkLens {
|
||||
c.chunkLens[i] = 0
|
||||
}
|
||||
c.chunkMeta = []metaData{}
|
||||
}
|
||||
|
||||
// Close indicates you are done calling Add() this allows
|
||||
// the final chunk to be encoded.
|
||||
func (c *chunkedContentCoder) Close() {
|
||||
c.flushContents()
|
||||
}
|
||||
|
||||
func (c *chunkedContentCoder) flushContents() error {
|
||||
// flush the contents, with meta information at first
|
||||
buf := make([]byte, binary.MaxVarintLen64)
|
||||
n := binary.PutUvarint(buf, uint64(len(c.chunkMeta)))
|
||||
_, err := c.chunkMetaBuf.Write(buf[:n])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// write out the metaData slice
|
||||
for _, meta := range c.chunkMeta {
|
||||
n := binary.PutUvarint(buf, meta.docID)
|
||||
_, err = c.chunkMetaBuf.Write(buf[:n])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
n = binary.PutUvarint(buf, meta.docDvLoc)
|
||||
_, err = c.chunkMetaBuf.Write(buf[:n])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
n = binary.PutUvarint(buf, meta.docDvLen)
|
||||
_, err = c.chunkMetaBuf.Write(buf[:n])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// write the metadata to final data
|
||||
metaData := c.chunkMetaBuf.Bytes()
|
||||
c.final = append(c.final, c.chunkMetaBuf.Bytes()...)
|
||||
// write the compressed data to the final data
|
||||
compressedData := snappy.Encode(nil, c.chunkBuf.Bytes())
|
||||
c.final = append(c.final, compressedData...)
|
||||
//c.chunkLens = append(c.chunkLens, uint64(len(compressedData)+len(metaData)))
|
||||
c.chunkLens[c.currChunk] = uint64(len(compressedData) + len(metaData))
|
||||
return nil
|
||||
}
|
||||
|
||||
// Add encodes the provided byte slice into the correct chunk for the provided
|
||||
// doc num. You MUST call Add() with increasing docNums.
|
||||
func (c *chunkedContentCoder) Add(docNum uint64, vals []byte) error {
|
||||
chunk := docNum / c.chunkSize
|
||||
if chunk != c.currChunk {
|
||||
// flush out the previous chunk details
|
||||
err := c.flushContents()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// clearing the chunk specific meta for next chunk
|
||||
c.chunkBuf.Reset()
|
||||
c.chunkMetaBuf.Reset()
|
||||
c.chunkMeta = []metaData{}
|
||||
c.currChunk = chunk
|
||||
}
|
||||
|
||||
// mark the starting offset for this doc
|
||||
dvOffset := c.chunkBuf.Len()
|
||||
dvSize, err := c.chunkBuf.Write(vals)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
c.chunkMeta = append(c.chunkMeta, metaData{
|
||||
docID: docNum,
|
||||
docDvLoc: uint64(dvOffset),
|
||||
docDvLen: uint64(dvSize),
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
||||
// Write commits all the encoded chunked contents to the provided writer.
|
||||
func (c *chunkedContentCoder) Write(w io.Writer) (int, error) {
|
||||
var tw int
|
||||
buf := make([]byte, binary.MaxVarintLen64)
|
||||
// write out the number of chunks
|
||||
n := binary.PutUvarint(buf, uint64(len(c.chunkLens)))
|
||||
nw, err := w.Write(buf[:n])
|
||||
tw += nw
|
||||
if err != nil {
|
||||
return tw, err
|
||||
}
|
||||
// write out the chunk lens
|
||||
for _, chunkLen := range c.chunkLens {
|
||||
n := binary.PutUvarint(buf, uint64(chunkLen))
|
||||
nw, err = w.Write(buf[:n])
|
||||
tw += nw
|
||||
if err != nil {
|
||||
return tw, err
|
||||
}
|
||||
}
|
||||
// write out the data
|
||||
nw, err = w.Write(c.final)
|
||||
tw += nw
|
||||
if err != nil {
|
||||
return tw, err
|
||||
}
|
||||
return tw, nil
|
||||
}
|
|
@ -0,0 +1,75 @@
|
|||
// Copyright (c) 2017 Couchbase, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package zap
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"reflect"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestChunkContentCoder(t *testing.T) {
|
||||
|
||||
tests := []struct {
|
||||
maxDocNum uint64
|
||||
chunkSize uint64
|
||||
docNums []uint64
|
||||
vals [][]byte
|
||||
expected string
|
||||
}{
|
||||
{
|
||||
maxDocNum: 0,
|
||||
chunkSize: 1,
|
||||
docNums: []uint64{0},
|
||||
vals: [][]byte{[]byte("bleve")},
|
||||
// 1 chunk, chunk-0 length 11(b), value
|
||||
expected: string([]byte{0x1, 0xb, 0x1, 0x0, 0x0, 0x05, 0x05, 0x10, 0x62, 0x6c, 0x65, 0x76, 0x65}),
|
||||
},
|
||||
{
|
||||
maxDocNum: 1,
|
||||
chunkSize: 1,
|
||||
docNums: []uint64{0, 1},
|
||||
vals: [][]byte{
|
||||
[]byte("upside"),
|
||||
[]byte("scorch"),
|
||||
},
|
||||
|
||||
expected: string([]byte{0x02, 0x0c, 0x0c, 0x01, 0x00, 0x00, 0x06, 0x06, 0x14,
|
||||
0x75, 0x70, 0x73, 0x69, 0x64, 0x65, 0x01, 0x01, 0x00, 0x06, 0x06,
|
||||
0x14, 0x73, 0x63, 0x6f, 0x72, 0x63, 0x68}),
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
|
||||
cic := newChunkedContentCoder(test.chunkSize, test.maxDocNum)
|
||||
for i, docNum := range test.docNums {
|
||||
err := cic.Add(docNum, test.vals[i])
|
||||
if err != nil {
|
||||
t.Fatalf("error adding to intcoder: %v", err)
|
||||
}
|
||||
}
|
||||
cic.Close()
|
||||
var actual bytes.Buffer
|
||||
_, err := cic.Write(&actual)
|
||||
if err != nil {
|
||||
t.Fatalf("error writing: %v", err)
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(test.expected, string(actual.Bytes())) {
|
||||
t.Errorf("got % s, expected % s", string(actual.Bytes()), test.expected)
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,176 @@
|
|||
// Copyright (c) 2017 Couchbase, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package zap
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"math"
|
||||
"sort"
|
||||
|
||||
"github.com/blevesearch/bleve/index"
|
||||
"github.com/golang/snappy"
|
||||
)
|
||||
|
||||
type docValueIterator struct {
|
||||
field string
|
||||
curChunkNum uint64
|
||||
numChunks uint64
|
||||
chunkLens []uint64
|
||||
dvDataLoc uint64
|
||||
curChunkHeader []metaData
|
||||
curChunkData []byte // compressed data cache
|
||||
}
|
||||
|
||||
func (di *docValueIterator) fieldName() string {
|
||||
return di.field
|
||||
}
|
||||
|
||||
func (di *docValueIterator) curChunkNumber() uint64 {
|
||||
return di.curChunkNum
|
||||
}
|
||||
|
||||
func (s *Segment) loadFieldDocValueIterator(field string,
|
||||
fieldDvLoc uint64) (*docValueIterator, error) {
|
||||
// get the docValue offset for the given fields
|
||||
if fieldDvLoc == math.MaxUint64 {
|
||||
return nil, fmt.Errorf("loadFieldDocValueConfigs: "+
|
||||
"no docValues found for field: %s", field)
|
||||
}
|
||||
|
||||
// read the number of chunks, chunk lengths
|
||||
var offset uint64
|
||||
numChunks, read := binary.Uvarint(s.mm[fieldDvLoc : fieldDvLoc+binary.MaxVarintLen64])
|
||||
if read <= 0 {
|
||||
return nil, fmt.Errorf("failed to read the field "+
|
||||
"doc values for field %s", field)
|
||||
}
|
||||
offset += uint64(read)
|
||||
|
||||
fdvIter := &docValueIterator{
|
||||
curChunkNum: math.MaxUint64,
|
||||
field: field,
|
||||
chunkLens: make([]uint64, int(numChunks)),
|
||||
}
|
||||
for i := 0; i < int(numChunks); i++ {
|
||||
fdvIter.chunkLens[i], read = binary.Uvarint(s.mm[fieldDvLoc+offset : fieldDvLoc+offset+binary.MaxVarintLen64])
|
||||
offset += uint64(read)
|
||||
}
|
||||
|
||||
fdvIter.dvDataLoc = fieldDvLoc + offset
|
||||
return fdvIter, nil
|
||||
}
|
||||
|
||||
func (di *docValueIterator) loadDvChunk(chunkNumber,
|
||||
localDocNum uint64, s *Segment) error {
|
||||
// advance to the chunk where the docValues
|
||||
// reside for the given docID
|
||||
destChunkDataLoc := di.dvDataLoc
|
||||
for i := 0; i < int(chunkNumber); i++ {
|
||||
destChunkDataLoc += di.chunkLens[i]
|
||||
}
|
||||
|
||||
curChunkSize := di.chunkLens[chunkNumber]
|
||||
// read the number of docs reside in the chunk
|
||||
numDocs, read := binary.Uvarint(s.mm[destChunkDataLoc : destChunkDataLoc+binary.MaxVarintLen64])
|
||||
if read <= 0 {
|
||||
return fmt.Errorf("failed to read the chunk")
|
||||
}
|
||||
chunkMetaLoc := destChunkDataLoc + uint64(read)
|
||||
|
||||
offset := uint64(0)
|
||||
di.curChunkHeader = make([]metaData, int(numDocs))
|
||||
for i := 0; i < int(numDocs); i++ {
|
||||
di.curChunkHeader[i].docID, read = binary.Uvarint(s.mm[chunkMetaLoc+offset : chunkMetaLoc+offset+binary.MaxVarintLen64])
|
||||
offset += uint64(read)
|
||||
di.curChunkHeader[i].docDvLoc, read = binary.Uvarint(s.mm[chunkMetaLoc+offset : chunkMetaLoc+offset+binary.MaxVarintLen64])
|
||||
offset += uint64(read)
|
||||
di.curChunkHeader[i].docDvLen, read = binary.Uvarint(s.mm[chunkMetaLoc+offset : chunkMetaLoc+offset+binary.MaxVarintLen64])
|
||||
offset += uint64(read)
|
||||
}
|
||||
|
||||
compressedDataLoc := chunkMetaLoc + offset
|
||||
dataLength := destChunkDataLoc + curChunkSize - compressedDataLoc
|
||||
di.curChunkData = s.mm[compressedDataLoc : compressedDataLoc+dataLength]
|
||||
di.curChunkNum = chunkNumber
|
||||
return nil
|
||||
}
|
||||
|
||||
func (di *docValueIterator) visitDocValues(docID uint64,
|
||||
visitor index.DocumentFieldTermVisitor) error {
|
||||
// binary search the term locations for the docID
|
||||
start, length := di.getDocValueLocs(docID)
|
||||
if start == math.MaxUint64 || length == math.MaxUint64 {
|
||||
return nil
|
||||
}
|
||||
// uncompress the already loaded data
|
||||
uncompressed, err := snappy.Decode(nil, di.curChunkData)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// pick the terms for the given docID
|
||||
uncompressed = uncompressed[start : start+length]
|
||||
for {
|
||||
i := bytes.Index(uncompressed, termSeparatorSplitSlice)
|
||||
if i < 0 {
|
||||
break
|
||||
}
|
||||
|
||||
visitor(di.field, uncompressed[0:i])
|
||||
uncompressed = uncompressed[i+1:]
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (di *docValueIterator) getDocValueLocs(docID uint64) (uint64, uint64) {
|
||||
i := sort.Search(len(di.curChunkHeader), func(i int) bool {
|
||||
return di.curChunkHeader[i].docID >= docID
|
||||
})
|
||||
if i < len(di.curChunkHeader) && di.curChunkHeader[i].docID == docID {
|
||||
return di.curChunkHeader[i].docDvLoc, di.curChunkHeader[i].docDvLen
|
||||
}
|
||||
return math.MaxUint64, math.MaxUint64
|
||||
}
|
||||
|
||||
// VisitDocumentFieldTerms is an implementation of the UnInvertIndex interface
|
||||
func (s *Segment) VisitDocumentFieldTerms(localDocNum uint64, fields []string,
|
||||
visitor index.DocumentFieldTermVisitor) error {
|
||||
fieldID := uint16(0)
|
||||
ok := true
|
||||
for _, field := range fields {
|
||||
if fieldID, ok = s.fieldsMap[field]; !ok {
|
||||
continue
|
||||
}
|
||||
// find the chunkNumber where the docValues are stored
|
||||
docInChunk := localDocNum / uint64(s.chunkFactor)
|
||||
|
||||
if dvIter, exists := s.fieldDvIterMap[fieldID-1]; exists &&
|
||||
dvIter != nil {
|
||||
// check if the chunk is already loaded
|
||||
if docInChunk != dvIter.curChunkNumber() {
|
||||
err := dvIter.loadDvChunk(docInChunk, localDocNum, s)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
dvIter.visitDocValues(localDocNum, visitor)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
|
@ -21,6 +21,7 @@ import (
|
|||
"fmt"
|
||||
"math"
|
||||
"os"
|
||||
"sort"
|
||||
|
||||
"github.com/RoaringBitmap/roaring"
|
||||
"github.com/Smerity/govarint"
|
||||
|
@ -52,7 +53,7 @@ func Merge(segments []*Segment, drops []*roaring.Bitmap, path string,
|
|||
newSegDocCount := computeNewDocCount(segments, drops)
|
||||
|
||||
var newDocNums [][]uint64
|
||||
var storedIndexOffset uint64
|
||||
var storedIndexOffset, fieldDvLocsOffset uint64
|
||||
var dictLocs []uint64
|
||||
if newSegDocCount > 0 {
|
||||
storedIndexOffset, newDocNums, err = mergeStoredAndRemap(segments, drops,
|
||||
|
@ -61,7 +62,7 @@ func Merge(segments []*Segment, drops []*roaring.Bitmap, path string,
|
|||
return nil, err
|
||||
}
|
||||
|
||||
dictLocs, err = persistMergedRest(segments, drops, fieldsInv, fieldsMap,
|
||||
dictLocs, fieldDvLocsOffset, err = persistMergedRest(segments, drops, fieldsInv, fieldsMap,
|
||||
newDocNums, newSegDocCount, chunkFactor, cr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -77,7 +78,7 @@ func Merge(segments []*Segment, drops []*roaring.Bitmap, path string,
|
|||
}
|
||||
|
||||
err = persistFooter(newSegDocCount, storedIndexOffset,
|
||||
fieldsIndexOffset, chunkFactor, cr)
|
||||
fieldsIndexOffset, fieldDvLocsOffset, chunkFactor, cr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -126,12 +127,14 @@ func computeNewDocCount(segments []*Segment, drops []*roaring.Bitmap) uint64 {
|
|||
func persistMergedRest(segments []*Segment, drops []*roaring.Bitmap,
|
||||
fieldsInv []string, fieldsMap map[string]uint16, newDocNums [][]uint64,
|
||||
newSegDocCount uint64, chunkFactor uint32,
|
||||
w *CountHashWriter) ([]uint64, error) {
|
||||
w *CountHashWriter) ([]uint64, uint64, error) {
|
||||
|
||||
var bufMaxVarintLen64 []byte = make([]byte, binary.MaxVarintLen64)
|
||||
var bufLoc []uint64
|
||||
|
||||
rv := make([]uint64, len(fieldsInv))
|
||||
rv1 := make([]uint64, len(fieldsInv))
|
||||
fieldDvLocs := make([]uint64, len(fieldsInv))
|
||||
fieldDvLocsOffset := uint64(math.MaxUint64)
|
||||
|
||||
var vellumBuf bytes.Buffer
|
||||
// for each field
|
||||
|
@ -141,23 +144,23 @@ func persistMergedRest(segments []*Segment, drops []*roaring.Bitmap,
|
|||
}
|
||||
newVellum, err := vellum.New(&vellumBuf, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, fieldDvLocsOffset, err
|
||||
}
|
||||
|
||||
// collect FTS iterators from all segments for this field
|
||||
// collect FST iterators from all segments for this field
|
||||
var dicts []*Dictionary
|
||||
var itrs []vellum.Iterator
|
||||
for _, segment := range segments {
|
||||
dict, err2 := segment.dictionary(fieldName)
|
||||
if err2 != nil {
|
||||
return nil, err2
|
||||
return nil, fieldDvLocsOffset, err2
|
||||
}
|
||||
dicts = append(dicts, dict)
|
||||
|
||||
if dict != nil && dict.fst != nil {
|
||||
itr, err2 := dict.fst.Iterator(nil, nil)
|
||||
if err2 != nil && err2 != vellum.ErrIteratorDone {
|
||||
return nil, err2
|
||||
return nil, fieldDvLocsOffset, err2
|
||||
}
|
||||
if itr != nil {
|
||||
itrs = append(itrs, itr)
|
||||
|
@ -173,6 +176,8 @@ func persistMergedRest(segments []*Segment, drops []*roaring.Bitmap,
|
|||
|
||||
tfEncoder := newChunkedIntCoder(uint64(chunkFactor), newSegDocCount-1)
|
||||
locEncoder := newChunkedIntCoder(uint64(chunkFactor), newSegDocCount-1)
|
||||
fdvEncoder := newChunkedContentCoder(uint64(chunkFactor), newSegDocCount-1)
|
||||
docTermMap := make(map[uint64][]byte, 0)
|
||||
for err == nil {
|
||||
term, _ := mergeItr.Current()
|
||||
|
||||
|
@ -189,7 +194,7 @@ func persistMergedRest(segments []*Segment, drops []*roaring.Bitmap,
|
|||
}
|
||||
postings, err2 := dict.postingsList(string(term), drops[dictI])
|
||||
if err2 != nil {
|
||||
return nil, err2
|
||||
return nil, fieldDvLocsOffset, err2
|
||||
}
|
||||
|
||||
postItr := postings.Iterator()
|
||||
|
@ -197,7 +202,7 @@ func persistMergedRest(segments []*Segment, drops []*roaring.Bitmap,
|
|||
for next != nil && err2 == nil {
|
||||
hitNewDocNum := newDocNums[dictI][next.Number()]
|
||||
if hitNewDocNum == docDropped {
|
||||
return nil, fmt.Errorf("see hit with dropped doc num")
|
||||
return nil, fieldDvLocsOffset, fmt.Errorf("see hit with dropped doc num")
|
||||
}
|
||||
newRoaring.Add(uint32(hitNewDocNum))
|
||||
// encode norm bits
|
||||
|
@ -205,7 +210,7 @@ func persistMergedRest(segments []*Segment, drops []*roaring.Bitmap,
|
|||
normBits := math.Float32bits(float32(norm))
|
||||
err3 := tfEncoder.Add(hitNewDocNum, next.Frequency(), uint64(normBits))
|
||||
if err3 != nil {
|
||||
return nil, err3
|
||||
return nil, fieldDvLocsOffset, err3
|
||||
}
|
||||
locs := next.Locations()
|
||||
if len(locs) > 0 {
|
||||
|
@ -223,14 +228,17 @@ func persistMergedRest(segments []*Segment, drops []*roaring.Bitmap,
|
|||
args = append(args, loc.ArrayPositions()...)
|
||||
err = locEncoder.Add(hitNewDocNum, args...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, fieldDvLocsOffset, err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
docTermMap[hitNewDocNum] = append(docTermMap[hitNewDocNum], []byte(term)...)
|
||||
docTermMap[hitNewDocNum] = append(docTermMap[hitNewDocNum], termSeparator)
|
||||
next, err2 = postItr.Next()
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, fieldDvLocsOffset, err
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -242,17 +250,17 @@ func persistMergedRest(segments []*Segment, drops []*roaring.Bitmap,
|
|||
freqOffset := uint64(w.Count())
|
||||
_, err = tfEncoder.Write(w)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, fieldDvLocsOffset, err
|
||||
}
|
||||
locOffset := uint64(w.Count())
|
||||
_, err = locEncoder.Write(w)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, fieldDvLocsOffset, err
|
||||
}
|
||||
postingLocOffset := uint64(w.Count())
|
||||
_, err = writeRoaringWithLen(newRoaringLocs, w)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, fieldDvLocsOffset, err
|
||||
}
|
||||
postingOffset := uint64(w.Count())
|
||||
// write out the start of the term info
|
||||
|
@ -260,43 +268,43 @@ func persistMergedRest(segments []*Segment, drops []*roaring.Bitmap,
|
|||
n := binary.PutUvarint(buf, freqOffset)
|
||||
_, err = w.Write(buf[:n])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, fieldDvLocsOffset, err
|
||||
}
|
||||
|
||||
// write out the start of the loc info
|
||||
n = binary.PutUvarint(buf, locOffset)
|
||||
_, err = w.Write(buf[:n])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, fieldDvLocsOffset, err
|
||||
}
|
||||
|
||||
// write out the start of the loc posting list
|
||||
n = binary.PutUvarint(buf, postingLocOffset)
|
||||
_, err = w.Write(buf[:n])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, fieldDvLocsOffset, err
|
||||
}
|
||||
_, err = writeRoaringWithLen(newRoaring, w)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, fieldDvLocsOffset, err
|
||||
}
|
||||
|
||||
err = newVellum.Insert(term, postingOffset)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, fieldDvLocsOffset, err
|
||||
}
|
||||
}
|
||||
|
||||
err = mergeItr.Next()
|
||||
}
|
||||
if err != nil && err != vellum.ErrIteratorDone {
|
||||
return nil, err
|
||||
return nil, fieldDvLocsOffset, err
|
||||
}
|
||||
|
||||
dictOffset := uint64(w.Count())
|
||||
err = newVellum.Close()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, fieldDvLocsOffset, err
|
||||
}
|
||||
vellumData := vellumBuf.Bytes()
|
||||
|
||||
|
@ -306,19 +314,51 @@ func persistMergedRest(segments []*Segment, drops []*roaring.Bitmap,
|
|||
n := binary.PutUvarint(buf, uint64(len(vellumData)))
|
||||
_, err = w.Write(buf[:n])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, fieldDvLocsOffset, err
|
||||
}
|
||||
|
||||
// write this vellum to disk
|
||||
_, err = w.Write(vellumData)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, fieldDvLocsOffset, err
|
||||
}
|
||||
|
||||
rv[fieldID] = dictOffset
|
||||
rv1[fieldID] = dictOffset
|
||||
|
||||
// update teh doc value
|
||||
var docNumbers docIDRange
|
||||
for k := range docTermMap {
|
||||
docNumbers = append(docNumbers, k)
|
||||
}
|
||||
sort.Sort(docNumbers)
|
||||
|
||||
for _, docNum := range docNumbers {
|
||||
err = fdvEncoder.Add(docNum, docTermMap[docNum])
|
||||
if err != nil {
|
||||
return nil, fieldDvLocsOffset, err
|
||||
}
|
||||
}
|
||||
// get the field doc value offset
|
||||
fieldDvLocs[fieldID] = uint64(w.Count())
|
||||
fdvEncoder.Close()
|
||||
// persist the doc value details for this field
|
||||
_, err = fdvEncoder.Write(w)
|
||||
if err != nil {
|
||||
return nil, fieldDvLocsOffset, err
|
||||
}
|
||||
}
|
||||
|
||||
return rv, nil
|
||||
fieldDvLocsOffset = uint64(w.Count())
|
||||
buf := make([]byte, binary.MaxVarintLen64)
|
||||
for _, offset := range fieldDvLocs {
|
||||
n := binary.PutUvarint(buf, uint64(offset))
|
||||
_, err := w.Write(buf[:n])
|
||||
if err != nil {
|
||||
return nil, math.MaxUint64, err
|
||||
}
|
||||
}
|
||||
|
||||
return rv1, fieldDvLocsOffset, nil
|
||||
}
|
||||
|
||||
const docDropped = math.MaxUint64
|
||||
|
|
|
@ -19,6 +19,7 @@ import (
|
|||
"encoding/binary"
|
||||
"fmt"
|
||||
"io"
|
||||
"math"
|
||||
"os"
|
||||
"sync"
|
||||
|
||||
|
@ -44,11 +45,12 @@ func Open(path string) (segment.Segment, error) {
|
|||
}
|
||||
|
||||
rv := &Segment{
|
||||
f: f,
|
||||
mm: mm,
|
||||
path: path,
|
||||
fieldsMap: make(map[string]uint16),
|
||||
refs: 1,
|
||||
f: f,
|
||||
mm: mm,
|
||||
path: path,
|
||||
fieldsMap: make(map[string]uint16),
|
||||
fieldDvIterMap: make(map[uint16]*docValueIterator),
|
||||
refs: 1,
|
||||
}
|
||||
|
||||
err = rv.loadConfig()
|
||||
|
@ -63,6 +65,12 @@ func Open(path string) (segment.Segment, error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
err = rv.loadDvIterators()
|
||||
if err != nil {
|
||||
_ = rv.Close()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return rv, nil
|
||||
}
|
||||
|
||||
|
@ -82,6 +90,9 @@ type Segment struct {
|
|||
fieldsInv []string
|
||||
fieldsOffsets []uint64
|
||||
|
||||
docValueOffset uint64
|
||||
fieldDvIterMap map[uint16]*docValueIterator // naive chunk cache per field
|
||||
|
||||
m sync.Mutex // Protects the fields that follow.
|
||||
refs int64
|
||||
}
|
||||
|
@ -112,7 +123,11 @@ func (s *Segment) loadConfig() error {
|
|||
}
|
||||
chunkOffset := verOffset - 4
|
||||
s.chunkFactor = binary.BigEndian.Uint32(s.mm[chunkOffset : chunkOffset+4])
|
||||
fieldsOffset := chunkOffset - 8
|
||||
|
||||
docValueOffset := chunkOffset - 8
|
||||
s.docValueOffset = binary.BigEndian.Uint64(s.mm[docValueOffset : docValueOffset+8])
|
||||
fieldsOffset := docValueOffset - 8
|
||||
|
||||
s.fieldsIndexOffset = binary.BigEndian.Uint64(s.mm[fieldsOffset : fieldsOffset+8])
|
||||
storedOffset := fieldsOffset - 8
|
||||
s.storedIndexOffset = binary.BigEndian.Uint64(s.mm[storedOffset : storedOffset+8])
|
||||
|
@ -355,3 +370,20 @@ func (s *Segment) DictAddr(field string) (uint64, error) {
|
|||
|
||||
return s.fieldsOffsets[fieldID-1], nil
|
||||
}
|
||||
|
||||
func (s *Segment) loadDvIterators() error {
|
||||
if s.docValueOffset == math.MaxUint64 {
|
||||
return nil
|
||||
}
|
||||
|
||||
var read uint64
|
||||
for fieldID, field := range s.fieldsInv {
|
||||
fieldLoc, n := binary.Uvarint(s.mm[s.docValueOffset+read : s.docValueOffset+read+binary.MaxVarintLen64])
|
||||
if n <= 0 {
|
||||
return fmt.Errorf("loadDvIterators: failed to read the docvalue offsets for field %d", fieldID)
|
||||
}
|
||||
s.fieldDvIterMap[uint16(fieldID)], _ = s.loadFieldDocValueIterator(field, fieldLoc)
|
||||
read += uint64(n)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -86,10 +86,10 @@ func persistFields(fieldsInv []string, w *CountHashWriter, dictLocs []uint64) (u
|
|||
}
|
||||
|
||||
// FooterSize is the size of the footer record in bytes
|
||||
// crc + ver + chunk + field offset + stored offset + num docs
|
||||
const FooterSize = 4 + 4 + 4 + 8 + 8 + 8
|
||||
// crc + ver + chunk + field offset + stored offset + num docs + docValueOffset
|
||||
const FooterSize = 4 + 4 + 4 + 8 + 8 + 8 + 8
|
||||
|
||||
func persistFooter(numDocs, storedIndexOffset, fieldIndexOffset uint64,
|
||||
func persistFooter(numDocs, storedIndexOffset, fieldIndexOffset, docValueOffset uint64,
|
||||
chunkFactor uint32, w *CountHashWriter) error {
|
||||
// write out the number of docs
|
||||
err := binary.Write(w, binary.BigEndian, numDocs)
|
||||
|
@ -106,6 +106,11 @@ func persistFooter(numDocs, storedIndexOffset, fieldIndexOffset uint64,
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// write out the fieldDocValue location
|
||||
err = binary.Write(w, binary.BigEndian, docValueOffset)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// write out 32-bit chunk factor
|
||||
err = binary.Write(w, binary.BigEndian, chunkFactor)
|
||||
if err != nil {
|
||||
|
|
|
@ -22,6 +22,9 @@ import (
|
|||
"sort"
|
||||
"sync"
|
||||
|
||||
"github.com/blevesearch/bleve/index/scorch/segment/mem"
|
||||
"github.com/blevesearch/bleve/index/scorch/segment/zap"
|
||||
|
||||
"github.com/RoaringBitmap/roaring"
|
||||
"github.com/blevesearch/bleve/document"
|
||||
"github.com/blevesearch/bleve/index"
|
||||
|
@ -401,24 +404,35 @@ func (i *IndexSnapshot) DocumentVisitFieldTerms(id index.IndexInternalID,
|
|||
|
||||
ss := i.segment[segmentIndex]
|
||||
|
||||
err = ss.cachedDocs.prepareFields(fields, ss)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
switch seg := ss.segment.(type) {
|
||||
case *mem.Segment:
|
||||
err = ss.cachedDocs.prepareFields(fields, ss)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, field := range fields {
|
||||
if cachedFieldDocs, exists := ss.cachedDocs.cache[field]; exists {
|
||||
if tlist, exists := cachedFieldDocs.docs[localDocNum]; exists {
|
||||
for {
|
||||
i := bytes.Index(tlist, TermSeparatorSplitSlice)
|
||||
if i < 0 {
|
||||
break
|
||||
for _, field := range fields {
|
||||
if cachedFieldDocs, exists := ss.cachedDocs.cache[field]; exists {
|
||||
if tlist, exists := cachedFieldDocs.docs[localDocNum]; exists {
|
||||
for {
|
||||
i := bytes.Index(tlist, TermSeparatorSplitSlice)
|
||||
if i < 0 {
|
||||
break
|
||||
}
|
||||
visitor(field, tlist[0:i])
|
||||
tlist = tlist[i+1:]
|
||||
}
|
||||
visitor(field, tlist[0:i])
|
||||
tlist = tlist[i+1:]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
case *zap.Segment:
|
||||
if zaps, ok := ss.segment.(UnInvertIndex); ok {
|
||||
return zaps.VisitDocumentFieldTerms(localDocNum, fields, visitor)
|
||||
}
|
||||
|
||||
default:
|
||||
return fmt.Errorf("snapshot_index: DocumentVisitFieldTerms, unknown segment type: %T", seg)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
|
Loading…
Reference in New Issue