
Merge pull request #703 from sreekanth-cb/docValue_persisted

docValue persist changes
This commit is contained in:
Marty Schoch 2018-01-04 10:34:58 -05:00 committed by GitHub
commit 71cdac785d
18 changed files with 1006 additions and 42 deletions

View File

@ -37,6 +37,14 @@ const Name = "scorch"
const Version uint8 = 1
// UnInvertIndex is implemented by various scorch index implementations
// to provide un-inverting of the postings or other indexed values.
type UnInvertIndex interface {
// TODO: this could use a better name
VisitDocumentFieldTerms(localDocNum uint64, fields []string,
visitor index.DocumentFieldTermVisitor) error
}
type Scorch struct {
readOnly bool
version uint8

View File

@ -119,6 +119,10 @@ func (s *Segment) processDocument(result *index.AnalysisResult) {
if field.Options().IsStored() {
storeField(docNum, fieldID, encodeFieldType(field), field.Value(), field.ArrayPositions())
}
// TODO: enable this check once the docValue mapping changes land
//if field.Options().IncludeDocValues() {
s.DocValueFields[fieldID] = true
//}
}
// now that it's been rolled up into docMap, walk that

View File

@ -88,6 +88,9 @@ type Segment struct {
// docNum -> field id -> slice of array positions (each is []uint64)
StoredPos []map[uint16][][]uint64
// for storing the docValue persisted fields
DocValueFields map[uint16]bool
// footprint of the segment, updated as analyzed document mutations
// are added into the segment
sizeInBytes uint64
@ -96,7 +99,8 @@ type Segment struct {
// New builds a new empty Segment
func New() *Segment {
return &Segment{
FieldsMap: map[string]uint16{},
FieldsMap: map[string]uint16{},
DocValueFields: map[uint16]bool{},
}
}

View File

@ -89,3 +89,11 @@ type Location interface {
Pos() uint64
ArrayPositions() []uint64
}
// DocumentFieldTermVisitable is implemented by various scorch segment
// implementations to provide un-inverting of the postings
// or other indexed values.
type DocumentFieldTermVisitable interface {
VisitDocumentFieldTerms(localDocNum uint64, fields []string,
visitor index.DocumentFieldTermVisitor) error
}
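A consumer that holds a segment satisfying this interface supplies a visitor callback and receives each indexed term for the requested fields of one document. A minimal sketch of wiring that up (the helper, its field names, and the doc number are illustrative, not part of this change):
package segment
import (
	"fmt"
	"github.com/blevesearch/bleve/index"
)
// printDocTerms is a hypothetical helper: it un-inverts the "title" and
// "body" fields of a single document and prints each term visited.
func printDocTerms(seg DocumentFieldTermVisitable, localDocNum uint64) error {
	var visitor index.DocumentFieldTermVisitor = func(field string, term []byte) {
		fmt.Printf("%s: %s\n", field, term)
	}
	return seg.VisitDocumentFieldTerms(localDocNum, []string{"title", "body"}, visitor)
}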

View File

@ -8,7 +8,7 @@ Current usage:
- crc-32 bytes and version are in fixed position at end of the file
- reading remainder of footer could be version specific
- remainder of footer gives us:
- 2 important offsets (fields index and stored data index)
- 3 important offsets (docValue, fields index and stored data index)
- 2 important values (number of docs and chunk factor)
- field data is processed once and memoized onto the heap so that we never have to go back to disk for it
- access to stored data by doc number means first navigating to the stored data index, then accessing a fixed position offset into that slice, which gives us the actual address of the data. the first bytes of that section tell us the size of data so that we know where it ends.
@ -140,12 +140,28 @@ If you know the doc number you're interested in, this format lets you jump to th
NOTE: currently we don't know or record the length of this fields index. Instead we rely on the fact that we know it immediately precedes a footer of known size.
## fields DocValue
- for each field
- preparation phase:
- produce a slice containing multiple consecutive chunks, where each chunk is composed of a meta section followed by compressed columnar field data
- produce a slice remembering the length of each chunk
- file writing phase:
- remember the start position of this field's DocValue data; the per-field start positions are later written out together and located via the docValue offset recorded in the footer
- write out number of chunks that follow (varint uint64)
- write out length of each chunk (each a varint uint64)
- write out the byte slice containing all the chunk data
NOTE: currently the meta header inside each chunk gives the location offset and size of the data pertaining to a given docID, and any read operation leverages that meta information to extract the document-specific data from the file.
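Reading a field's DocValue section back is therefore one varint for the chunk count, followed by one varint length per chunk, with the concatenated chunk data immediately after. A rough sketch under that assumption (the helper name is illustrative; data is the mmapped file and fieldDvLoc the field's DocValue start):
package zap
import (
	"encoding/binary"
	"fmt"
)
// readChunkLens decodes the chunk count and chunk lengths of one field's
// DocValue section, returning the lengths and the offset of the chunk data.
func readChunkLens(data []byte, fieldDvLoc uint64) ([]uint64, uint64, error) {
	numChunks, n := binary.Uvarint(data[fieldDvLoc:])
	if n <= 0 {
		return nil, 0, fmt.Errorf("failed to read chunk count")
	}
	offset := fieldDvLoc + uint64(n)
	chunkLens := make([]uint64, numChunks)
	for i := range chunkLens {
		clen, n := binary.Uvarint(data[offset:])
		if n <= 0 {
			return nil, 0, fmt.Errorf("corrupted length for chunk %d", i)
		}
		chunkLens[i] = clen
		offset += uint64(n)
	}
	// the concatenated chunk data begins immediately after the lengths
	return chunkLens, offset, nil
}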
## footer
- file writing phase
- write number of docs (big endian uint64)
- write stored field index location (big endian uint64)
- write field index location (big endian uint64)
- write field docValue location (big endian uint64)
- write out chunk factor (big endian uint32)
- write out version (big endian uint32)
- write out file CRC of everything preceding this (big endian uint32)
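Taken together those seven fields make the version 2 footer a fixed 44 bytes. A rough sketch of walking it back from the end of the file (the helper name is illustrative; offsets follow the write order above):
package zap
import "encoding/binary"
// readFooter decodes the version 2 footer by stepping backwards from the
// end of the file: crc, version, chunk factor, then the three offsets and
// the number of docs, each big endian.
func readFooter(data []byte) (numDocs, storedIdx, fieldsIdx, docValueIdx uint64, chunkFactor, version, crc uint32) {
	crcOffset := len(data) - 4
	crc = binary.BigEndian.Uint32(data[crcOffset : crcOffset+4])
	verOffset := crcOffset - 4
	version = binary.BigEndian.Uint32(data[verOffset : verOffset+4])
	chunkOffset := verOffset - 4
	chunkFactor = binary.BigEndian.Uint32(data[chunkOffset : chunkOffset+4])
	docValueOffset := chunkOffset - 8
	docValueIdx = binary.BigEndian.Uint64(data[docValueOffset : docValueOffset+8])
	fieldsOffset := docValueOffset - 8
	fieldsIdx = binary.BigEndian.Uint64(data[fieldsOffset : fieldsOffset+8])
	storedOffset := fieldsOffset - 8
	storedIdx = binary.BigEndian.Uint64(data[storedOffset : storedOffset+8])
	docNumOffset := storedOffset - 8
	numDocs = binary.BigEndian.Uint64(data[docNumOffset : docNumOffset+8])
	return
}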

View File

@ -20,6 +20,7 @@ import (
"encoding/binary"
"math"
"os"
"sort"
"github.com/Smerity/govarint"
"github.com/blevesearch/bleve/index/scorch/segment/mem"
@ -27,7 +28,9 @@ import (
"github.com/golang/snappy"
)
const version uint32 = 1
const version uint32 = 2
const fieldNotUninverted = math.MaxUint64
// PersistSegment takes the in-memory segment and persists it to the specified
// path in the zap file format.
@ -48,6 +51,7 @@ func PersistSegment(memSegment *mem.Segment, path string, chunkFactor uint32) (e
var storedIndexOffset uint64
var dictLocs []uint64
docValueOffset := uint64(fieldNotUninverted)
if len(memSegment.Stored) > 0 {
storedIndexOffset, err = persistStored(memSegment, cr)
@ -78,6 +82,11 @@ func PersistSegment(memSegment *mem.Segment, path string, chunkFactor uint32) (e
return err
}
docValueOffset, err = persistFieldDocValues(cr, chunkFactor, memSegment)
if err != nil {
return err
}
} else {
dictLocs = make([]uint64, len(memSegment.FieldsInv))
}
@ -89,7 +98,7 @@ func PersistSegment(memSegment *mem.Segment, path string, chunkFactor uint32) (e
}
err = persistFooter(uint64(len(memSegment.Stored)), storedIndexOffset,
fieldIndexStart, chunkFactor, cr)
fieldIndexStart, docValueOffset, chunkFactor, cr)
if err != nil {
return err
}
@ -419,3 +428,107 @@ func persistDictionary(memSegment *mem.Segment, w *CountHashWriter, postingsLocs
return rv, nil
}
type docIDRange []uint64
func (a docIDRange) Len() int { return len(a) }
func (a docIDRange) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a docIDRange) Less(i, j int) bool { return a[i] < a[j] }
func persistDocValues(memSegment *mem.Segment, w *CountHashWriter,
chunkFactor uint32) (map[uint16]uint64, error) {
fieldChunkOffsets := make(map[uint16]uint64, len(memSegment.FieldsInv))
fdvEncoder := newChunkedContentCoder(uint64(chunkFactor), uint64(len(memSegment.Stored)-1))
for fieldID := range memSegment.DocValueFields {
field := memSegment.FieldsInv[fieldID]
docTermMap := make(map[uint64][]byte, 0)
dict, err := memSegment.Dictionary(field)
if err != nil {
return nil, err
}
dictItr := dict.Iterator()
next, err := dictItr.Next()
for err == nil && next != nil {
postings, err1 := dict.PostingsList(next.Term, nil)
if err1 != nil {
return nil, err1
}
postingsItr := postings.Iterator()
nextPosting, err2 := postingsItr.Next()
for err2 == nil && nextPosting != nil {
docNum := nextPosting.Number()
docTermMap[docNum] = append(docTermMap[docNum], []byte(next.Term)...)
docTermMap[docNum] = append(docTermMap[docNum], termSeparator)
nextPosting, err2 = postingsItr.Next()
}
if err2 != nil {
return nil, err2
}
next, err = dictItr.Next()
}
if err != nil {
return nil, err
}
// sort by docID
var docNumbers docIDRange
for k := range docTermMap {
docNumbers = append(docNumbers, k)
}
sort.Sort(docNumbers)
for _, docNum := range docNumbers {
err = fdvEncoder.Add(docNum, docTermMap[docNum])
if err != nil {
return nil, err
}
}
fieldChunkOffsets[fieldID] = uint64(w.Count())
err = fdvEncoder.Close()
if err != nil {
return nil, err
}
// persist the doc value details for this field
_, err = fdvEncoder.Write(w)
if err != nil {
return nil, err
}
// resetting encoder for the next field
fdvEncoder.Reset()
}
return fieldChunkOffsets, nil
}
func persistFieldDocValues(w *CountHashWriter, chunkFactor uint32,
memSegment *mem.Segment) (uint64, error) {
fieldDvOffsets, err := persistDocValues(memSegment, w, chunkFactor)
if err != nil {
return 0, err
}
fieldDocValuesOffset := uint64(w.Count())
buf := make([]byte, binary.MaxVarintLen64)
offset := uint64(0)
ok := true
for fieldID := range memSegment.FieldsInv {
// if the field isn't configured for docValue, then mark
// the offset accordingly
if offset, ok = fieldDvOffsets[uint16(fieldID)]; !ok {
offset = fieldNotUninverted
}
n := binary.PutUvarint(buf, uint64(offset))
_, err := w.Write(buf[:n])
if err != nil {
return 0, err
}
}
return fieldDocValuesOffset, nil
}

View File

@ -0,0 +1,274 @@
// Copyright (c) 2017 Couchbase, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package cmd
import (
"bytes"
"encoding/binary"
"fmt"
"log"
"math"
"sort"
"strconv"
"github.com/blevesearch/bleve/index/scorch/segment/zap"
"github.com/golang/snappy"
"github.com/spf13/cobra"
)
// docvalueCmd represents the docvalue command
var docvalueCmd = &cobra.Command{
Use: "docvalue [path] <field> optional <docNum> optional",
Short: "docvalue prints the docvalue details by field, and docNum",
Long: `The docvalue command lets you explore the docValues in order of field and by doc number.`,
RunE: func(cmd *cobra.Command, args []string) error {
if len(args) < 1 {
return fmt.Errorf("must specify index file path")
}
data := segment.Data()
crcOffset := len(data) - 4
verOffset := crcOffset - 4
chunkOffset := verOffset - 4
fieldsOffset := chunkOffset - 16
fieldsIndexOffset := binary.BigEndian.Uint64(data[fieldsOffset : fieldsOffset+8])
fieldsIndexEnd := uint64(len(data) - zap.FooterSize)
// iterate through fields index
var fieldInv []string
var id, read, fieldLoc uint64
var nread int
for fieldsIndexOffset+(8*id) < fieldsIndexEnd {
addr := binary.BigEndian.Uint64(data[fieldsIndexOffset+(8*id) : fieldsIndexOffset+(8*id)+8])
var n uint64
_, read := binary.Uvarint(data[addr+n : fieldsIndexEnd])
n += uint64(read)
var nameLen uint64
nameLen, read = binary.Uvarint(data[addr+n : fieldsIndexEnd])
n += uint64(read)
name := string(data[addr+n : addr+n+nameLen])
id++
fieldInv = append(fieldInv, name)
}
dvLoc := segment.DocValueOffset()
fieldDvLoc, total, fdvread := uint64(0), uint64(0), int(0)
var fieldName string
var fieldID uint16
// if no field is specified, print the docValue offsets for all fields
for id, field := range fieldInv {
fieldLoc, fdvread = binary.Uvarint(data[dvLoc+read : dvLoc+read+binary.MaxVarintLen64])
if fdvread <= 0 {
return fmt.Errorf("loadDvIterators: failed to read the docvalue offsets for field %d", fieldID)
}
read += uint64(fdvread)
if fieldLoc == math.MaxUint64 {
fmt.Printf("fieldID: %d '%s' docvalue at %d (%x) not persisted \n", id, field, fieldLoc, fieldLoc)
continue
}
var offset, clen, numChunks uint64
numChunks, nread = binary.Uvarint(data[fieldLoc : fieldLoc+binary.MaxVarintLen64])
if nread <= 0 {
return fmt.Errorf("failed to read the field "+
"doc values for field %s", fieldName)
}
offset += uint64(nread)
// read the length of chunks
totalSize := uint64(0)
chunkLens := make([]uint64, numChunks)
for i := 0; i < int(numChunks); i++ {
clen, nread = binary.Uvarint(data[fieldLoc+offset : fieldLoc+offset+binary.MaxVarintLen64])
if nread <= 0 {
return fmt.Errorf("corrupted chunk length for chunk number: %d", i)
}
chunkLens[i] = clen
totalSize += clen
offset += uint64(nread)
}
total += totalSize
if len(args) == 1 {
// if no field args are given, then print out the dv locations for all fields
mbsize := float64(totalSize) / (1024 * 1024)
fmt.Printf("fieldID: %d '%s' docvalue at %d (%x) numChunks %d diskSize %.3f MB\n", id, field, fieldLoc, fieldLoc, numChunks, mbsize)
continue
}
if field != args[1] {
continue
} else {
fieldDvLoc = fieldLoc
fieldName = field
fieldID = uint16(id)
}
}
mbsize := float64(total) / (1024 * 1024)
fmt.Printf("Total Doc Values Size on Disk: %.3f MB\n", mbsize)
// done with the fields dv locs printing for the given zap file
if len(args) == 1 {
return nil
}
if fieldName == "" || fieldDvLoc == 0 {
return fmt.Errorf("no field found for given field arg: %s", args[1])
}
// read the number of chunks
var offset, clen, numChunks uint64
numChunks, nread = binary.Uvarint(data[fieldDvLoc : fieldDvLoc+binary.MaxVarintLen64])
if nread <= 0 {
return fmt.Errorf("failed to read the field "+
"doc values for field %s", fieldName)
}
offset += uint64(nread)
if len(args) == 2 {
fmt.Printf("number of chunks: %d\n", numChunks)
}
// read the length of chunks
chunkLens := make([]uint64, numChunks)
for i := 0; i < int(numChunks); i++ {
clen, nread = binary.Uvarint(data[fieldDvLoc+offset : fieldDvLoc+offset+binary.MaxVarintLen64])
if nread <= 0 {
return fmt.Errorf("corrupted chunk length for chunk number: %d", i)
}
chunkLens[i] = clen
offset += uint64(nread)
if len(args) == 2 {
fmt.Printf("chunk: %d size: %d \n", i, clen)
}
/*
TODO => dump all chunk headers??
if len(args) == 3 && args[2] == ">" {
dumpChunkDocIDs(data, )
}*/
}
if len(args) == 2 {
return nil
}
localDocNum, err := strconv.Atoi(args[2])
if err != nil {
return fmt.Errorf("unable to parse doc number: %v", err)
}
if localDocNum >= int(segment.NumDocs()) {
return fmt.Errorf("invalid doc number %d (valid 0 - %d)", localDocNum, segment.NumDocs()-1)
}
// find the chunkNumber where the docValues are stored
docInChunk := uint64(localDocNum) / uint64(segment.ChunkFactor())
if docInChunk >= numChunks {
return fmt.Errorf("no chunk exists for chunk number: %d for docID: %d", docInChunk, localDocNum)
}
destChunkDataLoc := fieldDvLoc + offset
for i := 0; i < int(docInChunk); i++ {
destChunkDataLoc += chunkLens[i]
}
curChunkSize := chunkLens[docInChunk]
// read the number of docs residing in the chunk
numDocs := uint64(0)
numDocs, nread = binary.Uvarint(data[destChunkDataLoc : destChunkDataLoc+binary.MaxVarintLen64])
if nread <= 0 {
return fmt.Errorf("failed to read the target chunk: %d", docInChunk)
}
chunkMetaLoc := destChunkDataLoc + uint64(nread)
offset = uint64(0)
curChunkHeader := make([]zap.MetaData, int(numDocs))
for i := 0; i < int(numDocs); i++ {
curChunkHeader[i].DocID, nread = binary.Uvarint(data[chunkMetaLoc+offset : chunkMetaLoc+offset+binary.MaxVarintLen64])
offset += uint64(nread)
curChunkHeader[i].DocDvLoc, nread = binary.Uvarint(data[chunkMetaLoc+offset : chunkMetaLoc+offset+binary.MaxVarintLen64])
offset += uint64(nread)
curChunkHeader[i].DocDvLen, nread = binary.Uvarint(data[chunkMetaLoc+offset : chunkMetaLoc+offset+binary.MaxVarintLen64])
offset += uint64(nread)
}
compressedDataLoc := chunkMetaLoc + offset
dataLength := destChunkDataLoc + curChunkSize - compressedDataLoc
curChunkData := data[compressedDataLoc : compressedDataLoc+dataLength]
start, length := getDocValueLocs(uint64(localDocNum), curChunkHeader)
if start == math.MaxUint64 || length == math.MaxUint64 {
fmt.Printf("no field values found for docID %d\n", localDocNum)
fmt.Printf("Try docIDs present in chunk: %s\n", assortDocID(curChunkHeader))
return nil
}
// uncompress the already loaded data
uncompressed, err := snappy.Decode(nil, curChunkData)
if err != nil {
log.Printf("snappy err %+v ", err)
return err
}
var termSeparator byte = 0xff
var termSeparatorSplitSlice = []byte{termSeparator}
// pick the terms for the given docID
uncompressed = uncompressed[start : start+length]
for {
i := bytes.Index(uncompressed, termSeparatorSplitSlice)
if i < 0 {
break
}
fmt.Printf(" %s ", uncompressed[0:i])
uncompressed = uncompressed[i+1:]
}
fmt.Printf(" \n ")
return nil
},
}
func getDocValueLocs(docID uint64, metaHeader []zap.MetaData) (uint64, uint64) {
i := sort.Search(len(metaHeader), func(i int) bool {
return metaHeader[i].DocID >= docID
})
if i < len(metaHeader) && metaHeader[i].DocID == docID {
return metaHeader[i].DocDvLoc, metaHeader[i].DocDvLen
}
return math.MaxUint64, math.MaxUint64
}
func assortDocID(metaHeader []zap.MetaData) string {
docIDs := ""
for _, meta := range metaHeader {
id := fmt.Sprintf("%d", meta.DocID)
docIDs += id + ", "
}
return docIDs
}
func init() {
RootCmd.AddCommand(docvalueCmd)
}

View File

@ -34,7 +34,7 @@ var fieldsCmd = &cobra.Command{
crcOffset := len(data) - 4
verOffset := crcOffset - 4
chunkOffset := verOffset - 4
fieldsOffset := chunkOffset - 8
fieldsOffset := chunkOffset - 16
fieldsIndexOffset := binary.BigEndian.Uint64(data[fieldsOffset : fieldsOffset+8])
fieldsIndexEnd := uint64(len(data) - zap.FooterSize)

View File

@ -33,6 +33,7 @@ var footerCmd = &cobra.Command{
fmt.Printf("Chunk Factor: %d\n", segment.ChunkFactor())
fmt.Printf("Fields Idx: %d (%#x)\n", segment.FieldsIndexOffset(), segment.FieldsIndexOffset())
fmt.Printf("Stored Idx: %d (%#x)\n", segment.StoredIndexOffset(), segment.StoredIndexOffset())
fmt.Printf("DocValue Idx: %d (%#x)\n", segment.DocValueOffset(), segment.DocValueOffset())
fmt.Printf("Num Docs: %d\n", segment.NumDocs())
return nil
},

View File

@ -0,0 +1,167 @@
// Copyright (c) 2017 Couchbase, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package zap
import (
"bytes"
"encoding/binary"
"io"
"github.com/golang/snappy"
)
var termSeparator byte = 0xff
var termSeparatorSplitSlice = []byte{termSeparator}
type chunkedContentCoder struct {
final []byte
chunkSize uint64
currChunk uint64
chunkLens []uint64
chunkMetaBuf bytes.Buffer
chunkBuf bytes.Buffer
chunkMeta []MetaData
}
// MetaData represents the data information inside a
// chunk.
type MetaData struct {
DocID uint64 // docid of the data inside the chunk
DocDvLoc uint64 // starting offset for a given docid
DocDvLen uint64 // length of data inside the chunk for the given docid
}
// newChunkedContentCoder returns a new chunk content coder which
// packs data into chunks based on the provided chunkSize
func newChunkedContentCoder(chunkSize uint64,
maxDocNum uint64) *chunkedContentCoder {
total := maxDocNum/chunkSize + 1
rv := &chunkedContentCoder{
chunkSize: chunkSize,
chunkLens: make([]uint64, total),
chunkMeta: []MetaData{},
}
return rv
}
// Reset lets you reuse this chunked content coder. Buffers are reset
// and reused. You cannot change the chunk size.
func (c *chunkedContentCoder) Reset() {
c.currChunk = 0
c.final = c.final[:0]
c.chunkBuf.Reset()
c.chunkMetaBuf.Reset()
for i := range c.chunkLens {
c.chunkLens[i] = 0
}
c.chunkMeta = []MetaData{}
}
// Close indicates you are done calling Add(); this allows
// the final chunk to be encoded.
func (c *chunkedContentCoder) Close() error {
return c.flushContents()
}
func (c *chunkedContentCoder) flushContents() error {
// flush the contents, with meta information first
buf := make([]byte, binary.MaxVarintLen64)
n := binary.PutUvarint(buf, uint64(len(c.chunkMeta)))
_, err := c.chunkMetaBuf.Write(buf[:n])
if err != nil {
return err
}
// write out the metaData slice
for _, meta := range c.chunkMeta {
_, err := writeUvarints(&c.chunkMetaBuf, meta.DocID, meta.DocDvLoc, meta.DocDvLen)
if err != nil {
return err
}
}
// write the metadata to final data
metaData := c.chunkMetaBuf.Bytes()
c.final = append(c.final, c.chunkMetaBuf.Bytes()...)
// write the compressed data to the final data
compressedData := snappy.Encode(nil, c.chunkBuf.Bytes())
c.final = append(c.final, compressedData...)
c.chunkLens[c.currChunk] = uint64(len(compressedData) + len(metaData))
return nil
}
// Add encodes the provided byte slice into the correct chunk for the provided
// doc num. You MUST call Add() with increasing docNums.
func (c *chunkedContentCoder) Add(docNum uint64, vals []byte) error {
chunk := docNum / c.chunkSize
if chunk != c.currChunk {
// flush out the previous chunk details
err := c.flushContents()
if err != nil {
return err
}
// clearing the chunk specific meta for next chunk
c.chunkBuf.Reset()
c.chunkMetaBuf.Reset()
c.chunkMeta = []MetaData{}
c.currChunk = chunk
}
// mark the starting offset for this doc
dvOffset := c.chunkBuf.Len()
dvSize, err := c.chunkBuf.Write(vals)
if err != nil {
return err
}
c.chunkMeta = append(c.chunkMeta, MetaData{
DocID: docNum,
DocDvLoc: uint64(dvOffset),
DocDvLen: uint64(dvSize),
})
return nil
}
// Write commits all the encoded chunked contents to the provided writer.
func (c *chunkedContentCoder) Write(w io.Writer) (int, error) {
var tw int
buf := make([]byte, binary.MaxVarintLen64)
// write out the number of chunks
n := binary.PutUvarint(buf, uint64(len(c.chunkLens)))
nw, err := w.Write(buf[:n])
tw += nw
if err != nil {
return tw, err
}
// write out the chunk lens
for _, chunkLen := range c.chunkLens {
n := binary.PutUvarint(buf, uint64(chunkLen))
nw, err = w.Write(buf[:n])
tw += nw
if err != nil {
return tw, err
}
}
// write out the data
nw, err = w.Write(c.final)
tw += nw
if err != nil {
return tw, err
}
return tw, nil
}

View File

@ -0,0 +1,75 @@
// Copyright (c) 2017 Couchbase, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package zap
import (
"bytes"
"reflect"
"testing"
)
func TestChunkContentCoder(t *testing.T) {
tests := []struct {
maxDocNum uint64
chunkSize uint64
docNums []uint64
vals [][]byte
expected string
}{
{
maxDocNum: 0,
chunkSize: 1,
docNums: []uint64{0},
vals: [][]byte{[]byte("bleve")},
// 1 chunk of length 11: meta (numDocs=1, docID=0, dvLoc=0, dvLen=5) followed by snappy-compressed "bleve"
expected: string([]byte{0x1, 0xb, 0x1, 0x0, 0x0, 0x05, 0x05, 0x10, 0x62, 0x6c, 0x65, 0x76, 0x65}),
},
{
maxDocNum: 1,
chunkSize: 1,
docNums: []uint64{0, 1},
vals: [][]byte{
[]byte("upside"),
[]byte("scorch"),
},
expected: string([]byte{0x02, 0x0c, 0x0c, 0x01, 0x00, 0x00, 0x06, 0x06, 0x14,
0x75, 0x70, 0x73, 0x69, 0x64, 0x65, 0x01, 0x01, 0x00, 0x06, 0x06,
0x14, 0x73, 0x63, 0x6f, 0x72, 0x63, 0x68}),
},
}
for _, test := range tests {
cic := newChunkedContentCoder(test.chunkSize, test.maxDocNum)
for i, docNum := range test.docNums {
err := cic.Add(docNum, test.vals[i])
if err != nil {
t.Fatalf("error adding to intcoder: %v", err)
}
}
_ = cic.Close()
var actual bytes.Buffer
_, err := cic.Write(&actual)
if err != nil {
t.Fatalf("error writing: %v", err)
}
if !reflect.DeepEqual(test.expected, string(actual.Bytes())) {
t.Errorf("got % s, expected % s", string(actual.Bytes()), test.expected)
}
}
}

View File

@ -0,0 +1,180 @@
// Copyright (c) 2017 Couchbase, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package zap
import (
"bytes"
"encoding/binary"
"fmt"
"math"
"sort"
"github.com/blevesearch/bleve/index"
"github.com/golang/snappy"
)
type docValueIterator struct {
field string
curChunkNum uint64
numChunks uint64
chunkLens []uint64
dvDataLoc uint64
curChunkHeader []MetaData
curChunkData []byte // compressed data cache
}
func (di *docValueIterator) fieldName() string {
return di.field
}
func (di *docValueIterator) curChunkNumber() uint64 {
return di.curChunkNum
}
func (s *Segment) loadFieldDocValueIterator(field string,
fieldDvLoc uint64) (*docValueIterator, error) {
// get the docValue offset for the given fields
if fieldDvLoc == fieldNotUninverted {
return nil, fmt.Errorf("loadFieldDocValueConfigs: "+
"no docValues found for field: %s", field)
}
// read the number of chunks, chunk lengths
var offset, clen uint64
numChunks, read := binary.Uvarint(s.mm[fieldDvLoc : fieldDvLoc+binary.MaxVarintLen64])
if read <= 0 {
return nil, fmt.Errorf("failed to read the field "+
"doc values for field %s", field)
}
offset += uint64(read)
fdvIter := &docValueIterator{
curChunkNum: math.MaxUint64,
field: field,
chunkLens: make([]uint64, int(numChunks)),
}
for i := 0; i < int(numChunks); i++ {
clen, read = binary.Uvarint(s.mm[fieldDvLoc+offset : fieldDvLoc+offset+binary.MaxVarintLen64])
if read <= 0 {
return nil, fmt.Errorf("corrupted chunk length during segment load")
}
fdvIter.chunkLens[i] = clen
offset += uint64(read)
}
fdvIter.dvDataLoc = fieldDvLoc + offset
return fdvIter, nil
}
func (di *docValueIterator) loadDvChunk(chunkNumber,
localDocNum uint64, s *Segment) error {
// advance to the chunk where the docValues
// reside for the given docID
destChunkDataLoc := di.dvDataLoc
for i := 0; i < int(chunkNumber); i++ {
destChunkDataLoc += di.chunkLens[i]
}
curChunkSize := di.chunkLens[chunkNumber]
// read the number of docs residing in the chunk
numDocs, read := binary.Uvarint(s.mm[destChunkDataLoc : destChunkDataLoc+binary.MaxVarintLen64])
if read <= 0 {
return fmt.Errorf("failed to read the chunk")
}
chunkMetaLoc := destChunkDataLoc + uint64(read)
offset := uint64(0)
di.curChunkHeader = make([]MetaData, int(numDocs))
for i := 0; i < int(numDocs); i++ {
di.curChunkHeader[i].DocID, read = binary.Uvarint(s.mm[chunkMetaLoc+offset : chunkMetaLoc+offset+binary.MaxVarintLen64])
offset += uint64(read)
di.curChunkHeader[i].DocDvLoc, read = binary.Uvarint(s.mm[chunkMetaLoc+offset : chunkMetaLoc+offset+binary.MaxVarintLen64])
offset += uint64(read)
di.curChunkHeader[i].DocDvLen, read = binary.Uvarint(s.mm[chunkMetaLoc+offset : chunkMetaLoc+offset+binary.MaxVarintLen64])
offset += uint64(read)
}
compressedDataLoc := chunkMetaLoc + offset
dataLength := destChunkDataLoc + curChunkSize - compressedDataLoc
di.curChunkData = s.mm[compressedDataLoc : compressedDataLoc+dataLength]
di.curChunkNum = chunkNumber
return nil
}
func (di *docValueIterator) visitDocValues(docID uint64,
visitor index.DocumentFieldTermVisitor) error {
// binary search the term locations for the docID
start, length := di.getDocValueLocs(docID)
if start == math.MaxUint64 || length == math.MaxUint64 {
return nil
}
// uncompress the already loaded data
uncompressed, err := snappy.Decode(nil, di.curChunkData)
if err != nil {
return err
}
// pick the terms for the given docID
uncompressed = uncompressed[start : start+length]
for {
i := bytes.Index(uncompressed, termSeparatorSplitSlice)
if i < 0 {
break
}
visitor(di.field, uncompressed[0:i])
uncompressed = uncompressed[i+1:]
}
return nil
}
func (di *docValueIterator) getDocValueLocs(docID uint64) (uint64, uint64) {
i := sort.Search(len(di.curChunkHeader), func(i int) bool {
return di.curChunkHeader[i].DocID >= docID
})
if i < len(di.curChunkHeader) && di.curChunkHeader[i].DocID == docID {
return di.curChunkHeader[i].DocDvLoc, di.curChunkHeader[i].DocDvLen
}
return math.MaxUint64, math.MaxUint64
}
// VisitDocumentFieldTerms is an implementation of the DocumentFieldTermVisitable interface
func (s *Segment) VisitDocumentFieldTerms(localDocNum uint64, fields []string,
visitor index.DocumentFieldTermVisitor) error {
fieldID := uint16(0)
ok := true
for _, field := range fields {
if fieldID, ok = s.fieldsMap[field]; !ok {
continue
}
// find the chunkNumber where the docValues are stored
docInChunk := localDocNum / uint64(s.chunkFactor)
if dvIter, exists := s.fieldDvIterMap[fieldID-1]; exists &&
dvIter != nil {
// check if the chunk is already loaded
if docInChunk != dvIter.curChunkNumber() {
err := dvIter.loadDvChunk(docInChunk, localDocNum, s)
if err != nil {
continue
}
}
_ = dvIter.visitDocValues(localDocNum, visitor)
}
}
return nil
}

View File

@ -21,6 +21,7 @@ import (
"fmt"
"math"
"os"
"sort"
"github.com/RoaringBitmap/roaring"
"github.com/Smerity/govarint"
@ -53,6 +54,7 @@ func Merge(segments []*Segment, drops []*roaring.Bitmap, path string,
var newDocNums [][]uint64
var storedIndexOffset uint64
fieldDvLocsOffset := uint64(fieldNotUninverted)
var dictLocs []uint64
if newSegDocCount > 0 {
storedIndexOffset, newDocNums, err = mergeStoredAndRemap(segments, drops,
@ -61,7 +63,7 @@ func Merge(segments []*Segment, drops []*roaring.Bitmap, path string,
return nil, err
}
dictLocs, err = persistMergedRest(segments, drops, fieldsInv, fieldsMap,
dictLocs, fieldDvLocsOffset, err = persistMergedRest(segments, drops, fieldsInv, fieldsMap,
newDocNums, newSegDocCount, chunkFactor, cr)
if err != nil {
return nil, err
@ -77,7 +79,7 @@ func Merge(segments []*Segment, drops []*roaring.Bitmap, path string,
}
err = persistFooter(newSegDocCount, storedIndexOffset,
fieldsIndexOffset, chunkFactor, cr)
fieldsIndexOffset, fieldDvLocsOffset, chunkFactor, cr)
if err != nil {
return nil, err
}
@ -126,12 +128,14 @@ func computeNewDocCount(segments []*Segment, drops []*roaring.Bitmap) uint64 {
func persistMergedRest(segments []*Segment, drops []*roaring.Bitmap,
fieldsInv []string, fieldsMap map[string]uint16, newDocNums [][]uint64,
newSegDocCount uint64, chunkFactor uint32,
w *CountHashWriter) ([]uint64, error) {
w *CountHashWriter) ([]uint64, uint64, error) {
var bufMaxVarintLen64 []byte = make([]byte, binary.MaxVarintLen64)
var bufLoc []uint64
rv := make([]uint64, len(fieldsInv))
rv1 := make([]uint64, len(fieldsInv))
fieldDvLocs := make([]uint64, len(fieldsInv))
fieldDvLocsOffset := uint64(fieldNotUninverted)
var vellumBuf bytes.Buffer
// for each field
@ -141,23 +145,23 @@ func persistMergedRest(segments []*Segment, drops []*roaring.Bitmap,
}
newVellum, err := vellum.New(&vellumBuf, nil)
if err != nil {
return nil, err
return nil, 0, err
}
// collect FTS iterators from all segments for this field
// collect FST iterators from all segments for this field
var dicts []*Dictionary
var itrs []vellum.Iterator
for _, segment := range segments {
dict, err2 := segment.dictionary(fieldName)
if err2 != nil {
return nil, err2
return nil, 0, err2
}
dicts = append(dicts, dict)
if dict != nil && dict.fst != nil {
itr, err2 := dict.fst.Iterator(nil, nil)
if err2 != nil && err2 != vellum.ErrIteratorDone {
return nil, err2
return nil, 0, err2
}
if itr != nil {
itrs = append(itrs, itr)
@ -173,6 +177,8 @@ func persistMergedRest(segments []*Segment, drops []*roaring.Bitmap,
tfEncoder := newChunkedIntCoder(uint64(chunkFactor), newSegDocCount-1)
locEncoder := newChunkedIntCoder(uint64(chunkFactor), newSegDocCount-1)
fdvEncoder := newChunkedContentCoder(uint64(chunkFactor), newSegDocCount-1)
docTermMap := make(map[uint64][]byte, 0)
for err == nil {
term, _ := mergeItr.Current()
@ -189,7 +195,7 @@ func persistMergedRest(segments []*Segment, drops []*roaring.Bitmap,
}
postings, err2 := dict.postingsList(string(term), drops[dictI])
if err2 != nil {
return nil, err2
return nil, 0, err2
}
postItr := postings.Iterator()
@ -197,7 +203,7 @@ func persistMergedRest(segments []*Segment, drops []*roaring.Bitmap,
for next != nil && err2 == nil {
hitNewDocNum := newDocNums[dictI][next.Number()]
if hitNewDocNum == docDropped {
return nil, fmt.Errorf("see hit with dropped doc num")
return nil, 0, fmt.Errorf("see hit with dropped doc num")
}
newRoaring.Add(uint32(hitNewDocNum))
// encode norm bits
@ -205,7 +211,7 @@ func persistMergedRest(segments []*Segment, drops []*roaring.Bitmap,
normBits := math.Float32bits(float32(norm))
err3 := tfEncoder.Add(hitNewDocNum, next.Frequency(), uint64(normBits))
if err3 != nil {
return nil, err3
return nil, 0, err3
}
locs := next.Locations()
if len(locs) > 0 {
@ -223,14 +229,17 @@ func persistMergedRest(segments []*Segment, drops []*roaring.Bitmap,
args = append(args, loc.ArrayPositions()...)
err = locEncoder.Add(hitNewDocNum, args...)
if err != nil {
return nil, err
return nil, 0, err
}
}
}
docTermMap[hitNewDocNum] = append(docTermMap[hitNewDocNum], []byte(term)...)
docTermMap[hitNewDocNum] = append(docTermMap[hitNewDocNum], termSeparator)
next, err2 = postItr.Next()
}
if err != nil {
return nil, err
return nil, 0, err
}
}
@ -242,17 +251,17 @@ func persistMergedRest(segments []*Segment, drops []*roaring.Bitmap,
freqOffset := uint64(w.Count())
_, err = tfEncoder.Write(w)
if err != nil {
return nil, err
return nil, 0, err
}
locOffset := uint64(w.Count())
_, err = locEncoder.Write(w)
if err != nil {
return nil, err
return nil, 0, err
}
postingLocOffset := uint64(w.Count())
_, err = writeRoaringWithLen(newRoaringLocs, w)
if err != nil {
return nil, err
return nil, 0, err
}
postingOffset := uint64(w.Count())
// write out the start of the term info
@ -260,43 +269,43 @@ func persistMergedRest(segments []*Segment, drops []*roaring.Bitmap,
n := binary.PutUvarint(buf, freqOffset)
_, err = w.Write(buf[:n])
if err != nil {
return nil, err
return nil, 0, err
}
// write out the start of the loc info
n = binary.PutUvarint(buf, locOffset)
_, err = w.Write(buf[:n])
if err != nil {
return nil, err
return nil, 0, err
}
// write out the start of the loc posting list
n = binary.PutUvarint(buf, postingLocOffset)
_, err = w.Write(buf[:n])
if err != nil {
return nil, err
return nil, 0, err
}
_, err = writeRoaringWithLen(newRoaring, w)
if err != nil {
return nil, err
return nil, 0, err
}
err = newVellum.Insert(term, postingOffset)
if err != nil {
return nil, err
return nil, 0, err
}
}
err = mergeItr.Next()
}
if err != nil && err != vellum.ErrIteratorDone {
return nil, err
return nil, 0, err
}
dictOffset := uint64(w.Count())
err = newVellum.Close()
if err != nil {
return nil, err
return nil, 0, err
}
vellumData := vellumBuf.Bytes()
@ -306,19 +315,54 @@ func persistMergedRest(segments []*Segment, drops []*roaring.Bitmap,
n := binary.PutUvarint(buf, uint64(len(vellumData)))
_, err = w.Write(buf[:n])
if err != nil {
return nil, err
return nil, 0, err
}
// write this vellum to disk
_, err = w.Write(vellumData)
if err != nil {
return nil, err
return nil, 0, err
}
rv[fieldID] = dictOffset
rv1[fieldID] = dictOffset
// update the doc value
var docNumbers docIDRange
for k := range docTermMap {
docNumbers = append(docNumbers, k)
}
sort.Sort(docNumbers)
for _, docNum := range docNumbers {
err = fdvEncoder.Add(docNum, docTermMap[docNum])
if err != nil {
return nil, 0, err
}
}
// get the field doc value offset
fieldDvLocs[fieldID] = uint64(w.Count())
err = fdvEncoder.Close()
if err != nil {
return nil, 0, err
}
// persist the doc value details for this field
_, err = fdvEncoder.Write(w)
if err != nil {
return nil, 0, err
}
}
return rv, nil
fieldDvLocsOffset = uint64(w.Count())
buf := make([]byte, binary.MaxVarintLen64)
for _, offset := range fieldDvLocs {
n := binary.PutUvarint(buf, uint64(offset))
_, err := w.Write(buf[:n])
if err != nil {
return nil, 0, err
}
}
return rv1, fieldDvLocsOffset, nil
}
const docDropped = math.MaxUint64

View File

@ -44,11 +44,12 @@ func Open(path string) (segment.Segment, error) {
}
rv := &Segment{
f: f,
mm: mm,
path: path,
fieldsMap: make(map[string]uint16),
refs: 1,
f: f,
mm: mm,
path: path,
fieldsMap: make(map[string]uint16),
fieldDvIterMap: make(map[uint16]*docValueIterator),
refs: 1,
}
err = rv.loadConfig()
@ -63,6 +64,12 @@ func Open(path string) (segment.Segment, error) {
return nil, err
}
err = rv.loadDvIterators()
if err != nil {
_ = rv.Close()
return nil, err
}
return rv, nil
}
@ -82,6 +89,9 @@ type Segment struct {
fieldsInv []string
fieldsOffsets []uint64
docValueOffset uint64
fieldDvIterMap map[uint16]*docValueIterator // naive chunk cache per field
m sync.Mutex // Protects the fields that follow.
refs int64
}
@ -137,7 +147,11 @@ func (s *Segment) loadConfig() error {
}
chunkOffset := verOffset - 4
s.chunkFactor = binary.BigEndian.Uint32(s.mm[chunkOffset : chunkOffset+4])
fieldsOffset := chunkOffset - 8
docValueOffset := chunkOffset - 8
s.docValueOffset = binary.BigEndian.Uint64(s.mm[docValueOffset : docValueOffset+8])
fieldsOffset := docValueOffset - 8
s.fieldsIndexOffset = binary.BigEndian.Uint64(s.mm[fieldsOffset : fieldsOffset+8])
storedOffset := fieldsOffset - 8
s.storedIndexOffset = binary.BigEndian.Uint64(s.mm[storedOffset : storedOffset+8])
@ -359,11 +373,16 @@ func (s *Segment) FieldsIndexOffset() uint64 {
return s.fieldsIndexOffset
}
// StoredIndexOffset returns the stored value index offset in the file foooter
// StoredIndexOffset returns the stored value index offset in the file footer
func (s *Segment) StoredIndexOffset() uint64 {
return s.storedIndexOffset
}
// DocValueOffset returns the docValue offset in the file footer
func (s *Segment) DocValueOffset() uint64 {
return s.docValueOffset
}
// NumDocs returns the number of documents in the file footer
func (s *Segment) NumDocs() uint64 {
return s.numDocs
@ -380,3 +399,20 @@ func (s *Segment) DictAddr(field string) (uint64, error) {
return s.fieldsOffsets[fieldID-1], nil
}
func (s *Segment) loadDvIterators() error {
if s.docValueOffset == fieldNotUninverted {
return nil
}
var read uint64
for fieldID, field := range s.fieldsInv {
fieldLoc, n := binary.Uvarint(s.mm[s.docValueOffset+read : s.docValueOffset+read+binary.MaxVarintLen64])
if n <= 0 {
return fmt.Errorf("loadDvIterators: failed to read the docvalue offsets for field %d", fieldID)
}
s.fieldDvIterMap[uint16(fieldID)], _ = s.loadFieldDocValueIterator(field, fieldLoc)
read += uint64(n)
}
return nil
}

View File

@ -86,10 +86,10 @@ func persistFields(fieldsInv []string, w *CountHashWriter, dictLocs []uint64) (u
}
// FooterSize is the size of the footer record in bytes
// crc + ver + chunk + field offset + stored offset + num docs
const FooterSize = 4 + 4 + 4 + 8 + 8 + 8
// crc + ver + chunk + field offset + stored offset + num docs + docValueOffset
const FooterSize = 4 + 4 + 4 + 8 + 8 + 8 + 8
func persistFooter(numDocs, storedIndexOffset, fieldIndexOffset uint64,
func persistFooter(numDocs, storedIndexOffset, fieldIndexOffset, docValueOffset uint64,
chunkFactor uint32, w *CountHashWriter) error {
// write out the number of docs
err := binary.Write(w, binary.BigEndian, numDocs)
@ -106,6 +106,11 @@ func persistFooter(numDocs, storedIndexOffset, fieldIndexOffset uint64,
if err != nil {
return err
}
// write out the fieldDocValue location
err = binary.Write(w, binary.BigEndian, docValueOffset)
if err != nil {
return err
}
// write out 32-bit chunk factor
err = binary.Write(w, binary.BigEndian, chunkFactor)
if err != nil {

View File

@ -403,6 +403,11 @@ func (i *IndexSnapshot) DocumentVisitFieldTerms(id index.IndexInternalID,
ss := i.segment[segmentIndex]
if zaps, ok := ss.segment.(segment.DocumentFieldTermVisitable); ok {
return zaps.VisitDocumentFieldTerms(localDocNum, fields, visitor)
}
// else fall back to the in-memory fieldCache
err = ss.cachedDocs.prepareFields(fields, ss)
if err != nil {
return err

View File

@ -179,6 +179,7 @@ OUTER:
continue OUTER
}
}
break
}
return current
}

View File

@ -991,3 +991,26 @@ func TestMappingForNilTextMarshaler(t *testing.T) {
}
}
func TestClosestDocDynamicMapping(t *testing.T) {
mapping := NewIndexMapping()
mapping.IndexDynamic = false
mapping.DefaultMapping = NewDocumentStaticMapping()
mapping.DefaultMapping.AddFieldMappingsAt("foo", NewTextFieldMapping())
doc := document.NewDocument("x")
err := mapping.MapDocument(doc, map[string]interface{}{
"foo": "value",
"bar": map[string]string{
"foo": "value2",
"baz": "value3",
},
})
if err != nil {
t.Fatal(err)
}
if len(doc.Fields) != 1 {
t.Fatalf("expected 1 field, got: %d", len(doc.Fields))
}
}