Updated readme, zap version, added new docvalue cmd,
fixed the footer and fields cmd, interface name updated
This commit is contained in:
parent
8abac42796
commit
c8df014c0c
|
@ -118,7 +118,7 @@ func (s *Segment) processDocument(result *index.AnalysisResult) {
|
|||
}
|
||||
// TODO with mapping changes for dv
|
||||
//if field.Options().IncludeDocValues() {
|
||||
s.DocValueFields[fieldID] = true
|
||||
s.DocValueFields = append(s.DocValueFields, fieldID)
|
||||
//}
|
||||
}
|
||||
|
||||
|
|
|
@ -88,16 +88,15 @@ type Segment struct {
|
|||
// docNum -> field id -> slice of array positions (each is []uint64)
|
||||
StoredPos []map[uint16][][]uint64
|
||||
|
||||
// for marking the docValue override status
|
||||
// field id -> status
|
||||
DocValueFields map[uint16]bool
|
||||
// for storing the docValue persisted fields
|
||||
// field id
|
||||
DocValueFields []uint16
|
||||
}
|
||||
|
||||
// New builds a new empty Segment
|
||||
func New() *Segment {
|
||||
return &Segment{
|
||||
FieldsMap: map[string]uint16{},
|
||||
DocValueFields: map[uint16]bool{},
|
||||
FieldsMap: map[string]uint16{},
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -87,3 +87,11 @@ type Location interface {
|
|||
Pos() uint64
|
||||
ArrayPositions() []uint64
|
||||
}
|
||||
|
||||
// DocumentFieldTermVisitable is implemented by various scorch segment
|
||||
// implementations to provide the un inverting of the postings
|
||||
// or other indexed values.
|
||||
type DocumentFieldTermVisitable interface {
|
||||
VisitDocumentFieldTerms(localDocNum uint64, fields []string,
|
||||
visitor index.DocumentFieldTermVisitor) error
|
||||
}
|
||||
|
|
|
@ -8,7 +8,7 @@ Current usage:
|
|||
- crc-32 bytes and version are in fixed position at end of the file
|
||||
- reading remainder of footer could be version specific
|
||||
- remainder of footer gives us:
|
||||
- 2 important offsets (fields index and stored data index)
|
||||
- 3 important offsets (docValue , fields index and stored data index)
|
||||
- 2 important values (number of docs and chunk factor)
|
||||
- field data is processed once and memoized onto the heap so that we never have to go back to disk for it
|
||||
- access to stored data by doc number means first navigating to the stored data index, then accessing a fixed position offset into that slice, which gives us the actual address of the data. the first bytes of that section tell us the size of data so that we know where it ends.
|
||||
|
@ -140,12 +140,28 @@ If you know the doc number you're interested in, this format lets you jump to th
|
|||
|
||||
NOTE: currently we don't know or record the length of this fields index. Instead we rely on the fact that we know it immediately precedes a footer of known size.
|
||||
|
||||
## fields DocValue
|
||||
|
||||
- for each field
|
||||
- preparation phase:
|
||||
- produce a slice containing multiple consecutive chunks, where each chunk is composed of a meta section followed by compressed columnar field data
|
||||
- produce a slice remembering the length of each chunk
|
||||
- file writing phase:
|
||||
- remember the start position of this first field DocValue offset in the footer
|
||||
- write out number of chunks that follow (varint uint64)
|
||||
- write out length of each chunk (each a varint uint64)
|
||||
- write out the byte slice containing all the chunk data
|
||||
|
||||
NOTE: currently the meta header inside each chunk gives clue to the location offsets and size of the data pertaining to a given docID and any
|
||||
read operation leverage that meta information to extract the document specific data from the file.
|
||||
|
||||
## footer
|
||||
|
||||
- file writing phase
|
||||
- write number of docs (big endian uint64)
|
||||
- write stored field index location (big endian uint64)
|
||||
- write field index location (big endian uint64)
|
||||
- write field docValue location (big endian uint64)
|
||||
- write out chunk factor (big endian uint32)
|
||||
- write out version (big endian uint32)
|
||||
- write out file CRC of everything preceding this (big endian uint32)
|
||||
|
|
|
@ -28,7 +28,9 @@ import (
|
|||
"github.com/golang/snappy"
|
||||
)
|
||||
|
||||
const version uint32 = 1
|
||||
const version uint32 = 2
|
||||
|
||||
const fieldNotUninverted = math.MaxUint64
|
||||
|
||||
// PersistSegment takes the in-memory segment and persists it to the specified
|
||||
// path in the zap file format.
|
||||
|
@ -435,15 +437,15 @@ func (a docIDRange) Less(i, j int) bool { return a[i] < a[j] }
|
|||
|
||||
func persistDocValues(memSegment *mem.Segment, w *CountHashWriter,
|
||||
chunkFactor uint32) (map[uint16]uint64, error) {
|
||||
fieldChunkOffsets := make(map[uint16]uint64, len(memSegment.DocValueFields))
|
||||
fieldChunkOffsets := make(map[uint16]uint64, len(memSegment.FieldsInv))
|
||||
fdvEncoder := newChunkedContentCoder(uint64(chunkFactor), uint64(len(memSegment.Stored)-1))
|
||||
|
||||
for fieldID := range memSegment.DocValueFields {
|
||||
for _, fieldID := range memSegment.DocValueFields {
|
||||
field := memSegment.FieldsInv[fieldID]
|
||||
docTermMap := make(map[uint64][]byte, 0)
|
||||
dict, err := memSegment.Dictionary(field)
|
||||
if err != nil {
|
||||
return fieldChunkOffsets, err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
dictItr := dict.Iterator()
|
||||
|
@ -451,7 +453,7 @@ func persistDocValues(memSegment *mem.Segment, w *CountHashWriter,
|
|||
for err == nil && next != nil {
|
||||
postings, err1 := dict.PostingsList(next.Term, nil)
|
||||
if err1 != nil {
|
||||
return fieldChunkOffsets, err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
postingsItr := postings.Iterator()
|
||||
|
@ -463,14 +465,14 @@ func persistDocValues(memSegment *mem.Segment, w *CountHashWriter,
|
|||
nextPosting, err2 = postingsItr.Next()
|
||||
}
|
||||
if err2 != nil {
|
||||
return fieldChunkOffsets, err2
|
||||
return nil, err2
|
||||
}
|
||||
|
||||
next, err = dictItr.Next()
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return fieldChunkOffsets, err
|
||||
return nil, err
|
||||
}
|
||||
// sort wrt to docIDs
|
||||
var docNumbers docIDRange
|
||||
|
@ -482,16 +484,19 @@ func persistDocValues(memSegment *mem.Segment, w *CountHashWriter,
|
|||
for _, docNum := range docNumbers {
|
||||
err = fdvEncoder.Add(docNum, docTermMap[docNum])
|
||||
if err != nil {
|
||||
return fieldChunkOffsets, err
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
fieldChunkOffsets[fieldID] = uint64(w.Count())
|
||||
fdvEncoder.Close()
|
||||
err = fdvEncoder.Close()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// persist the doc value details for this field
|
||||
_, err = fdvEncoder.Write(w)
|
||||
if err != nil {
|
||||
return fieldChunkOffsets, err
|
||||
return nil, err
|
||||
}
|
||||
// resetting encoder for the next field
|
||||
fdvEncoder.Reset()
|
||||
|
@ -505,23 +510,23 @@ func persistFieldDocValues(w *CountHashWriter, chunkFactor uint32,
|
|||
|
||||
fieldDvOffsets, err := persistDocValues(memSegment, w, chunkFactor)
|
||||
if err != nil {
|
||||
return math.MaxUint64, err
|
||||
return 0, err
|
||||
}
|
||||
|
||||
fieldDocValuesOffset := uint64(w.Count())
|
||||
buf := make([]byte, binary.MaxVarintLen64)
|
||||
offset := uint64(math.MaxUint64)
|
||||
offset := uint64(0)
|
||||
ok := true
|
||||
for fieldID := range memSegment.FieldsInv {
|
||||
// if the field isn't configured for docValue, then mark
|
||||
// the offset accordingly
|
||||
if offset, ok = fieldDvOffsets[uint16(fieldID)]; !ok {
|
||||
offset = math.MaxUint64
|
||||
offset = fieldNotUninverted
|
||||
}
|
||||
n := binary.PutUvarint(buf, uint64(offset))
|
||||
_, err := w.Write(buf[:n])
|
||||
if err != nil {
|
||||
return math.MaxUint64, err
|
||||
return 0, err
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,224 @@
|
|||
// Copyright (c) 2017 Couchbase, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"log"
|
||||
"math"
|
||||
"sort"
|
||||
"strconv"
|
||||
|
||||
"github.com/blevesearch/bleve/index/scorch/segment/zap"
|
||||
"github.com/golang/snappy"
|
||||
"github.com/spf13/cobra"
|
||||
)
|
||||
|
||||
// docvalueCmd represents the docvalue command
|
||||
var docvalueCmd = &cobra.Command{
|
||||
Use: "docvalue [path] <field> optional <docNum> optional",
|
||||
Short: "docvalue prints the docvalue details by field, and docNum",
|
||||
Long: `The docvalue command lets you explore the docValues in order of field and by doc number.`,
|
||||
RunE: func(cmd *cobra.Command, args []string) error {
|
||||
if len(args) < 1 {
|
||||
return fmt.Errorf("must specify index file path")
|
||||
}
|
||||
|
||||
data := segment.Data()
|
||||
crcOffset := len(data) - 4
|
||||
verOffset := crcOffset - 4
|
||||
chunkOffset := verOffset - 4
|
||||
fieldsOffset := chunkOffset - 16
|
||||
fieldsIndexOffset := binary.BigEndian.Uint64(data[fieldsOffset : fieldsOffset+8])
|
||||
fieldsIndexEnd := uint64(len(data) - zap.FooterSize)
|
||||
|
||||
// iterate through fields index
|
||||
var fieldInv []string
|
||||
var id, read, fieldLoc uint64
|
||||
var nread int
|
||||
for fieldsIndexOffset+(8*id) < fieldsIndexEnd {
|
||||
addr := binary.BigEndian.Uint64(data[fieldsIndexOffset+(8*id) : fieldsIndexOffset+(8*id)+8])
|
||||
var n uint64
|
||||
_, read := binary.Uvarint(data[addr+n : fieldsIndexEnd])
|
||||
n += uint64(read)
|
||||
|
||||
var nameLen uint64
|
||||
nameLen, read = binary.Uvarint(data[addr+n : fieldsIndexEnd])
|
||||
n += uint64(read)
|
||||
|
||||
name := string(data[addr+n : addr+n+nameLen])
|
||||
|
||||
id++
|
||||
fieldInv = append(fieldInv, name)
|
||||
}
|
||||
|
||||
dvLoc := segment.DocValueOffset()
|
||||
fieldDvLoc := uint64(0)
|
||||
var fieldName string
|
||||
var fieldID uint16
|
||||
|
||||
// if no fields are specified then print the docValue offsets for all fields set
|
||||
for id, field := range fieldInv {
|
||||
fieldLoc, nread = binary.Uvarint(data[dvLoc+read : dvLoc+read+binary.MaxVarintLen64])
|
||||
if nread <= 0 {
|
||||
return fmt.Errorf("loadDvIterators: failed to read the docvalue offsets for field %d", fieldID)
|
||||
}
|
||||
read += uint64(nread)
|
||||
if len(args) == 1 {
|
||||
// if no field args are given, then print out the dv locations for all fields
|
||||
fmt.Printf("fieldID: %d '%s' docvalue at %d (%x)\n", id, field, fieldLoc, fieldLoc)
|
||||
continue
|
||||
}
|
||||
|
||||
if field != args[1] {
|
||||
continue
|
||||
} else {
|
||||
fieldDvLoc = fieldLoc
|
||||
fieldName = field
|
||||
fieldID = uint16(id)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// done with the fields dv locs printing for the given zap file
|
||||
if len(args) == 1 {
|
||||
return nil
|
||||
}
|
||||
|
||||
if fieldName == "" || fieldDvLoc == 0 {
|
||||
return fmt.Errorf("no field found for given field arg: %s", args[1])
|
||||
}
|
||||
|
||||
// read the number of chunks
|
||||
var offset, clen, numChunks uint64
|
||||
numChunks, nread = binary.Uvarint(data[fieldDvLoc : fieldDvLoc+binary.MaxVarintLen64])
|
||||
if nread <= 0 {
|
||||
return fmt.Errorf("failed to read the field "+
|
||||
"doc values for field %s", fieldName)
|
||||
}
|
||||
offset += uint64(nread)
|
||||
|
||||
if len(args) == 2 {
|
||||
fmt.Printf("number of chunks: %d\n", numChunks)
|
||||
}
|
||||
|
||||
// read the length of chunks
|
||||
chunkLens := make([]uint64, numChunks)
|
||||
for i := 0; i < int(numChunks); i++ {
|
||||
clen, nread = binary.Uvarint(data[fieldDvLoc+offset : fieldDvLoc+offset+binary.MaxVarintLen64])
|
||||
if nread <= 0 {
|
||||
return fmt.Errorf("corrupted chunk length for chunk number: %d", i)
|
||||
}
|
||||
|
||||
chunkLens[i] = clen
|
||||
offset += uint64(nread)
|
||||
if len(args) == 2 {
|
||||
fmt.Printf("chunk: %d size: %d \n", i, clen)
|
||||
}
|
||||
}
|
||||
|
||||
if len(args) == 2 {
|
||||
return nil
|
||||
}
|
||||
|
||||
localDocNum, err := strconv.Atoi(args[2])
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to parse doc number: %v", err)
|
||||
}
|
||||
|
||||
if localDocNum >= int(segment.NumDocs()) {
|
||||
return fmt.Errorf("invalid doc number %d (valid 0 - %d)", localDocNum, segment.NumDocs()-1)
|
||||
}
|
||||
|
||||
// find the chunkNumber where the docValues are stored
|
||||
docInChunk := uint64(localDocNum) / uint64(segment.ChunkFactor())
|
||||
|
||||
if numChunks < docInChunk {
|
||||
return fmt.Errorf("no chunk exists for chunk number: %d for docID: %d", docInChunk, localDocNum)
|
||||
}
|
||||
|
||||
destChunkDataLoc := fieldDvLoc + offset
|
||||
for i := 0; i < int(docInChunk); i++ {
|
||||
destChunkDataLoc += chunkLens[i]
|
||||
}
|
||||
curChunkSize := chunkLens[docInChunk]
|
||||
|
||||
// read the number of docs reside in the chunk
|
||||
numDocs := uint64(0)
|
||||
numDocs, nread = binary.Uvarint(data[destChunkDataLoc : destChunkDataLoc+binary.MaxVarintLen64])
|
||||
if nread <= 0 {
|
||||
return fmt.Errorf("failed to read the target chunk: %d", docInChunk)
|
||||
}
|
||||
chunkMetaLoc := destChunkDataLoc + uint64(nread)
|
||||
|
||||
offset = uint64(0)
|
||||
curChunkHeader := make([]zap.MetaData, int(numDocs))
|
||||
for i := 0; i < int(numDocs); i++ {
|
||||
curChunkHeader[i].DocID, nread = binary.Uvarint(data[chunkMetaLoc+offset : chunkMetaLoc+offset+binary.MaxVarintLen64])
|
||||
offset += uint64(nread)
|
||||
curChunkHeader[i].DocDvLoc, nread = binary.Uvarint(data[chunkMetaLoc+offset : chunkMetaLoc+offset+binary.MaxVarintLen64])
|
||||
offset += uint64(nread)
|
||||
curChunkHeader[i].DocDvLen, nread = binary.Uvarint(data[chunkMetaLoc+offset : chunkMetaLoc+offset+binary.MaxVarintLen64])
|
||||
offset += uint64(nread)
|
||||
}
|
||||
|
||||
compressedDataLoc := chunkMetaLoc + offset
|
||||
dataLength := destChunkDataLoc + curChunkSize - compressedDataLoc
|
||||
curChunkData := data[compressedDataLoc : compressedDataLoc+dataLength]
|
||||
|
||||
start, length := getDocValueLocs(uint64(localDocNum), curChunkHeader)
|
||||
if start == math.MaxUint64 || length == math.MaxUint64 {
|
||||
return nil
|
||||
}
|
||||
// uncompress the already loaded data
|
||||
uncompressed, err := snappy.Decode(nil, curChunkData)
|
||||
if err != nil {
|
||||
log.Printf("snappy err %+v ", err)
|
||||
return err
|
||||
}
|
||||
|
||||
var termSeparator byte = 0xff
|
||||
var termSeparatorSplitSlice = []byte{termSeparator}
|
||||
// pick the terms for the given docID
|
||||
uncompressed = uncompressed[start : start+length]
|
||||
for {
|
||||
i := bytes.Index(uncompressed, termSeparatorSplitSlice)
|
||||
if i < 0 {
|
||||
break
|
||||
}
|
||||
|
||||
fmt.Printf(" %s ", uncompressed[0:i])
|
||||
uncompressed = uncompressed[i+1:]
|
||||
}
|
||||
fmt.Printf(" \n ")
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
func getDocValueLocs(docID uint64, metaHeader []zap.MetaData) (uint64, uint64) {
|
||||
i := sort.Search(len(metaHeader), func(i int) bool {
|
||||
return metaHeader[i].DocID >= docID
|
||||
})
|
||||
if i < len(metaHeader) && metaHeader[i].DocID == docID {
|
||||
return metaHeader[i].DocDvLoc, metaHeader[i].DocDvLen
|
||||
}
|
||||
return math.MaxUint64, math.MaxUint64
|
||||
}
|
||||
|
||||
func init() {
|
||||
RootCmd.AddCommand(docvalueCmd)
|
||||
}
|
|
@ -34,7 +34,7 @@ var fieldsCmd = &cobra.Command{
|
|||
crcOffset := len(data) - 4
|
||||
verOffset := crcOffset - 4
|
||||
chunkOffset := verOffset - 4
|
||||
fieldsOffset := chunkOffset - 8
|
||||
fieldsOffset := chunkOffset - 16
|
||||
fieldsIndexOffset := binary.BigEndian.Uint64(data[fieldsOffset : fieldsOffset+8])
|
||||
fieldsIndexEnd := uint64(len(data) - zap.FooterSize)
|
||||
|
||||
|
|
|
@ -33,6 +33,7 @@ var footerCmd = &cobra.Command{
|
|||
fmt.Printf("Chunk Factor: %d\n", segment.ChunkFactor())
|
||||
fmt.Printf("Fields Idx: %d (%#x)\n", segment.FieldsIndexOffset(), segment.FieldsIndexOffset())
|
||||
fmt.Printf("Stored Idx: %d (%#x)\n", segment.StoredIndexOffset(), segment.StoredIndexOffset())
|
||||
fmt.Printf("DocValue Idx: %d (%#x)\n", segment.DocValueOffset(), segment.DocValueOffset())
|
||||
fmt.Printf("Num Docs: %d\n", segment.NumDocs())
|
||||
return nil
|
||||
},
|
||||
|
|
|
@ -15,6 +15,7 @@
|
|||
package zap
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"encoding/binary"
|
||||
"io"
|
||||
|
@ -33,15 +34,15 @@ type chunkedContentCoder struct {
|
|||
chunkMetaBuf bytes.Buffer
|
||||
chunkBuf bytes.Buffer
|
||||
|
||||
chunkMeta []metaData
|
||||
chunkMeta []MetaData
|
||||
}
|
||||
|
||||
// metaData represents the data information inside a
|
||||
// MetaData represents the data information inside a
|
||||
// chunk.
|
||||
type metaData struct {
|
||||
docID uint64 // docid of the data inside the chunk
|
||||
docDvLoc uint64 // starting offset for a given docid
|
||||
docDvLen uint64 // length of data inside the chunk for the given docid
|
||||
type MetaData struct {
|
||||
DocID uint64 // docid of the data inside the chunk
|
||||
DocDvLoc uint64 // starting offset for a given docid
|
||||
DocDvLen uint64 // length of data inside the chunk for the given docid
|
||||
}
|
||||
|
||||
// newChunkedContentCoder returns a new chunk content coder which
|
||||
|
@ -52,7 +53,7 @@ func newChunkedContentCoder(chunkSize uint64,
|
|||
rv := &chunkedContentCoder{
|
||||
chunkSize: chunkSize,
|
||||
chunkLens: make([]uint64, total),
|
||||
chunkMeta: []metaData{},
|
||||
chunkMeta: []MetaData{},
|
||||
}
|
||||
|
||||
return rv
|
||||
|
@ -68,13 +69,13 @@ func (c *chunkedContentCoder) Reset() {
|
|||
for i := range c.chunkLens {
|
||||
c.chunkLens[i] = 0
|
||||
}
|
||||
c.chunkMeta = []metaData{}
|
||||
c.chunkMeta = []MetaData{}
|
||||
}
|
||||
|
||||
// Close indicates you are done calling Add() this allows
|
||||
// the final chunk to be encoded.
|
||||
func (c *chunkedContentCoder) Close() {
|
||||
_ = c.flushContents()
|
||||
func (c *chunkedContentCoder) Close() error {
|
||||
return c.flushContents()
|
||||
}
|
||||
|
||||
func (c *chunkedContentCoder) flushContents() error {
|
||||
|
@ -86,26 +87,17 @@ func (c *chunkedContentCoder) flushContents() error {
|
|||
return err
|
||||
}
|
||||
|
||||
w := bufio.NewWriter(&c.chunkMetaBuf)
|
||||
// write out the metaData slice
|
||||
for _, meta := range c.chunkMeta {
|
||||
n := binary.PutUvarint(buf, meta.docID)
|
||||
_, err = c.chunkMetaBuf.Write(buf[:n])
|
||||
_, err := writeUvarints(w, meta.DocID, meta.DocDvLoc, meta.DocDvLen)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
n = binary.PutUvarint(buf, meta.docDvLoc)
|
||||
_, err = c.chunkMetaBuf.Write(buf[:n])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
n = binary.PutUvarint(buf, meta.docDvLen)
|
||||
_, err = c.chunkMetaBuf.Write(buf[:n])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
}
|
||||
err = w.Flush()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// write the metadata to final data
|
||||
|
@ -132,7 +124,7 @@ func (c *chunkedContentCoder) Add(docNum uint64, vals []byte) error {
|
|||
// clearing the chunk specific meta for next chunk
|
||||
c.chunkBuf.Reset()
|
||||
c.chunkMetaBuf.Reset()
|
||||
c.chunkMeta = []metaData{}
|
||||
c.chunkMeta = []MetaData{}
|
||||
c.currChunk = chunk
|
||||
}
|
||||
|
||||
|
@ -143,10 +135,10 @@ func (c *chunkedContentCoder) Add(docNum uint64, vals []byte) error {
|
|||
return err
|
||||
}
|
||||
|
||||
c.chunkMeta = append(c.chunkMeta, metaData{
|
||||
docID: docNum,
|
||||
docDvLoc: uint64(dvOffset),
|
||||
docDvLen: uint64(dvSize),
|
||||
c.chunkMeta = append(c.chunkMeta, MetaData{
|
||||
DocID: docNum,
|
||||
DocDvLoc: uint64(dvOffset),
|
||||
DocDvLen: uint64(dvSize),
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -61,7 +61,7 @@ func TestChunkContentCoder(t *testing.T) {
|
|||
t.Fatalf("error adding to intcoder: %v", err)
|
||||
}
|
||||
}
|
||||
cic.Close()
|
||||
_ = cic.Close()
|
||||
var actual bytes.Buffer
|
||||
_, err := cic.Write(&actual)
|
||||
if err != nil {
|
||||
|
|
|
@ -31,7 +31,7 @@ type docValueIterator struct {
|
|||
numChunks uint64
|
||||
chunkLens []uint64
|
||||
dvDataLoc uint64
|
||||
curChunkHeader []metaData
|
||||
curChunkHeader []MetaData
|
||||
curChunkData []byte // compressed data cache
|
||||
}
|
||||
|
||||
|
@ -96,13 +96,13 @@ func (di *docValueIterator) loadDvChunk(chunkNumber,
|
|||
chunkMetaLoc := destChunkDataLoc + uint64(read)
|
||||
|
||||
offset := uint64(0)
|
||||
di.curChunkHeader = make([]metaData, int(numDocs))
|
||||
di.curChunkHeader = make([]MetaData, int(numDocs))
|
||||
for i := 0; i < int(numDocs); i++ {
|
||||
di.curChunkHeader[i].docID, read = binary.Uvarint(s.mm[chunkMetaLoc+offset : chunkMetaLoc+offset+binary.MaxVarintLen64])
|
||||
di.curChunkHeader[i].DocID, read = binary.Uvarint(s.mm[chunkMetaLoc+offset : chunkMetaLoc+offset+binary.MaxVarintLen64])
|
||||
offset += uint64(read)
|
||||
di.curChunkHeader[i].docDvLoc, read = binary.Uvarint(s.mm[chunkMetaLoc+offset : chunkMetaLoc+offset+binary.MaxVarintLen64])
|
||||
di.curChunkHeader[i].DocDvLoc, read = binary.Uvarint(s.mm[chunkMetaLoc+offset : chunkMetaLoc+offset+binary.MaxVarintLen64])
|
||||
offset += uint64(read)
|
||||
di.curChunkHeader[i].docDvLen, read = binary.Uvarint(s.mm[chunkMetaLoc+offset : chunkMetaLoc+offset+binary.MaxVarintLen64])
|
||||
di.curChunkHeader[i].DocDvLen, read = binary.Uvarint(s.mm[chunkMetaLoc+offset : chunkMetaLoc+offset+binary.MaxVarintLen64])
|
||||
offset += uint64(read)
|
||||
}
|
||||
|
||||
|
@ -143,10 +143,10 @@ func (di *docValueIterator) visitDocValues(docID uint64,
|
|||
|
||||
func (di *docValueIterator) getDocValueLocs(docID uint64) (uint64, uint64) {
|
||||
i := sort.Search(len(di.curChunkHeader), func(i int) bool {
|
||||
return di.curChunkHeader[i].docID >= docID
|
||||
return di.curChunkHeader[i].DocID >= docID
|
||||
})
|
||||
if i < len(di.curChunkHeader) && di.curChunkHeader[i].docID == docID {
|
||||
return di.curChunkHeader[i].docDvLoc, di.curChunkHeader[i].docDvLen
|
||||
if i < len(di.curChunkHeader) && di.curChunkHeader[i].DocID == docID {
|
||||
return di.curChunkHeader[i].DocDvLoc, di.curChunkHeader[i].DocDvLen
|
||||
}
|
||||
return math.MaxUint64, math.MaxUint64
|
||||
}
|
||||
|
|
|
@ -134,7 +134,7 @@ func persistMergedRest(segments []*Segment, drops []*roaring.Bitmap,
|
|||
|
||||
rv1 := make([]uint64, len(fieldsInv))
|
||||
fieldDvLocs := make([]uint64, len(fieldsInv))
|
||||
fieldDvLocsOffset := uint64(math.MaxUint64)
|
||||
fieldDvLocsOffset := uint64(fieldNotUninverted)
|
||||
|
||||
var vellumBuf bytes.Buffer
|
||||
// for each field
|
||||
|
@ -144,7 +144,7 @@ func persistMergedRest(segments []*Segment, drops []*roaring.Bitmap,
|
|||
}
|
||||
newVellum, err := vellum.New(&vellumBuf, nil)
|
||||
if err != nil {
|
||||
return nil, fieldDvLocsOffset, err
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
// collect FST iterators from all segments for this field
|
||||
|
@ -153,14 +153,14 @@ func persistMergedRest(segments []*Segment, drops []*roaring.Bitmap,
|
|||
for _, segment := range segments {
|
||||
dict, err2 := segment.dictionary(fieldName)
|
||||
if err2 != nil {
|
||||
return nil, fieldDvLocsOffset, err2
|
||||
return nil, 0, err2
|
||||
}
|
||||
dicts = append(dicts, dict)
|
||||
|
||||
if dict != nil && dict.fst != nil {
|
||||
itr, err2 := dict.fst.Iterator(nil, nil)
|
||||
if err2 != nil && err2 != vellum.ErrIteratorDone {
|
||||
return nil, fieldDvLocsOffset, err2
|
||||
return nil, 0, err2
|
||||
}
|
||||
if itr != nil {
|
||||
itrs = append(itrs, itr)
|
||||
|
@ -194,7 +194,7 @@ func persistMergedRest(segments []*Segment, drops []*roaring.Bitmap,
|
|||
}
|
||||
postings, err2 := dict.postingsList(string(term), drops[dictI])
|
||||
if err2 != nil {
|
||||
return nil, fieldDvLocsOffset, err2
|
||||
return nil, 0, err2
|
||||
}
|
||||
|
||||
postItr := postings.Iterator()
|
||||
|
@ -202,7 +202,7 @@ func persistMergedRest(segments []*Segment, drops []*roaring.Bitmap,
|
|||
for next != nil && err2 == nil {
|
||||
hitNewDocNum := newDocNums[dictI][next.Number()]
|
||||
if hitNewDocNum == docDropped {
|
||||
return nil, fieldDvLocsOffset, fmt.Errorf("see hit with dropped doc num")
|
||||
return nil, 0, fmt.Errorf("see hit with dropped doc num")
|
||||
}
|
||||
newRoaring.Add(uint32(hitNewDocNum))
|
||||
// encode norm bits
|
||||
|
@ -210,7 +210,7 @@ func persistMergedRest(segments []*Segment, drops []*roaring.Bitmap,
|
|||
normBits := math.Float32bits(float32(norm))
|
||||
err3 := tfEncoder.Add(hitNewDocNum, next.Frequency(), uint64(normBits))
|
||||
if err3 != nil {
|
||||
return nil, fieldDvLocsOffset, err3
|
||||
return nil, 0, err3
|
||||
}
|
||||
locs := next.Locations()
|
||||
if len(locs) > 0 {
|
||||
|
@ -228,7 +228,7 @@ func persistMergedRest(segments []*Segment, drops []*roaring.Bitmap,
|
|||
args = append(args, loc.ArrayPositions()...)
|
||||
err = locEncoder.Add(hitNewDocNum, args...)
|
||||
if err != nil {
|
||||
return nil, fieldDvLocsOffset, err
|
||||
return nil, 0, err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -238,7 +238,7 @@ func persistMergedRest(segments []*Segment, drops []*roaring.Bitmap,
|
|||
next, err2 = postItr.Next()
|
||||
}
|
||||
if err != nil {
|
||||
return nil, fieldDvLocsOffset, err
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -250,17 +250,17 @@ func persistMergedRest(segments []*Segment, drops []*roaring.Bitmap,
|
|||
freqOffset := uint64(w.Count())
|
||||
_, err = tfEncoder.Write(w)
|
||||
if err != nil {
|
||||
return nil, fieldDvLocsOffset, err
|
||||
return nil, 0, err
|
||||
}
|
||||
locOffset := uint64(w.Count())
|
||||
_, err = locEncoder.Write(w)
|
||||
if err != nil {
|
||||
return nil, fieldDvLocsOffset, err
|
||||
return nil, 0, err
|
||||
}
|
||||
postingLocOffset := uint64(w.Count())
|
||||
_, err = writeRoaringWithLen(newRoaringLocs, w)
|
||||
if err != nil {
|
||||
return nil, fieldDvLocsOffset, err
|
||||
return nil, 0, err
|
||||
}
|
||||
postingOffset := uint64(w.Count())
|
||||
// write out the start of the term info
|
||||
|
@ -268,43 +268,43 @@ func persistMergedRest(segments []*Segment, drops []*roaring.Bitmap,
|
|||
n := binary.PutUvarint(buf, freqOffset)
|
||||
_, err = w.Write(buf[:n])
|
||||
if err != nil {
|
||||
return nil, fieldDvLocsOffset, err
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
// write out the start of the loc info
|
||||
n = binary.PutUvarint(buf, locOffset)
|
||||
_, err = w.Write(buf[:n])
|
||||
if err != nil {
|
||||
return nil, fieldDvLocsOffset, err
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
// write out the start of the loc posting list
|
||||
n = binary.PutUvarint(buf, postingLocOffset)
|
||||
_, err = w.Write(buf[:n])
|
||||
if err != nil {
|
||||
return nil, fieldDvLocsOffset, err
|
||||
return nil, 0, err
|
||||
}
|
||||
_, err = writeRoaringWithLen(newRoaring, w)
|
||||
if err != nil {
|
||||
return nil, fieldDvLocsOffset, err
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
err = newVellum.Insert(term, postingOffset)
|
||||
if err != nil {
|
||||
return nil, fieldDvLocsOffset, err
|
||||
return nil, 0, err
|
||||
}
|
||||
}
|
||||
|
||||
err = mergeItr.Next()
|
||||
}
|
||||
if err != nil && err != vellum.ErrIteratorDone {
|
||||
return nil, fieldDvLocsOffset, err
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
dictOffset := uint64(w.Count())
|
||||
err = newVellum.Close()
|
||||
if err != nil {
|
||||
return nil, fieldDvLocsOffset, err
|
||||
return nil, 0, err
|
||||
}
|
||||
vellumData := vellumBuf.Bytes()
|
||||
|
||||
|
@ -314,18 +314,18 @@ func persistMergedRest(segments []*Segment, drops []*roaring.Bitmap,
|
|||
n := binary.PutUvarint(buf, uint64(len(vellumData)))
|
||||
_, err = w.Write(buf[:n])
|
||||
if err != nil {
|
||||
return nil, fieldDvLocsOffset, err
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
// write this vellum to disk
|
||||
_, err = w.Write(vellumData)
|
||||
if err != nil {
|
||||
return nil, fieldDvLocsOffset, err
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
rv1[fieldID] = dictOffset
|
||||
|
||||
// update teh doc value
|
||||
// update the doc value
|
||||
var docNumbers docIDRange
|
||||
for k := range docTermMap {
|
||||
docNumbers = append(docNumbers, k)
|
||||
|
@ -335,16 +335,19 @@ func persistMergedRest(segments []*Segment, drops []*roaring.Bitmap,
|
|||
for _, docNum := range docNumbers {
|
||||
err = fdvEncoder.Add(docNum, docTermMap[docNum])
|
||||
if err != nil {
|
||||
return nil, fieldDvLocsOffset, err
|
||||
return nil, 0, err
|
||||
}
|
||||
}
|
||||
// get the field doc value offset
|
||||
fieldDvLocs[fieldID] = uint64(w.Count())
|
||||
fdvEncoder.Close()
|
||||
err = fdvEncoder.Close()
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
// persist the doc value details for this field
|
||||
_, err = fdvEncoder.Write(w)
|
||||
if err != nil {
|
||||
return nil, fieldDvLocsOffset, err
|
||||
return nil, 0, err
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -354,7 +357,7 @@ func persistMergedRest(segments []*Segment, drops []*roaring.Bitmap,
|
|||
n := binary.PutUvarint(buf, uint64(offset))
|
||||
_, err := w.Write(buf[:n])
|
||||
if err != nil {
|
||||
return nil, math.MaxUint64, err
|
||||
return nil, 0, err
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -19,7 +19,6 @@ import (
|
|||
"encoding/binary"
|
||||
"fmt"
|
||||
"io"
|
||||
"math"
|
||||
"os"
|
||||
"sync"
|
||||
|
||||
|
@ -349,11 +348,16 @@ func (s *Segment) FieldsIndexOffset() uint64 {
|
|||
return s.fieldsIndexOffset
|
||||
}
|
||||
|
||||
// StoredIndexOffset returns the stored value index offset in the file foooter
|
||||
// StoredIndexOffset returns the stored value index offset in the file footer
|
||||
func (s *Segment) StoredIndexOffset() uint64 {
|
||||
return s.storedIndexOffset
|
||||
}
|
||||
|
||||
// DocValueOffset returns the docValue offset in the file footer
|
||||
func (s *Segment) DocValueOffset() uint64 {
|
||||
return s.docValueOffset
|
||||
}
|
||||
|
||||
// NumDocs returns the number of documents in the file footer
|
||||
func (s *Segment) NumDocs() uint64 {
|
||||
return s.numDocs
|
||||
|
@ -372,7 +376,7 @@ func (s *Segment) DictAddr(field string) (uint64, error) {
|
|||
}
|
||||
|
||||
func (s *Segment) loadDvIterators() error {
|
||||
if s.docValueOffset == math.MaxUint64 || s.docValueOffset == 0 {
|
||||
if s.docValueOffset == fieldNotUninverted || s.docValueOffset == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -22,9 +22,6 @@ import (
|
|||
"sort"
|
||||
"sync"
|
||||
|
||||
"github.com/blevesearch/bleve/index/scorch/segment/mem"
|
||||
"github.com/blevesearch/bleve/index/scorch/segment/zap"
|
||||
|
||||
"github.com/RoaringBitmap/roaring"
|
||||
"github.com/blevesearch/bleve/document"
|
||||
"github.com/blevesearch/bleve/index"
|
||||
|
@ -404,35 +401,29 @@ func (i *IndexSnapshot) DocumentVisitFieldTerms(id index.IndexInternalID,
|
|||
|
||||
ss := i.segment[segmentIndex]
|
||||
|
||||
switch seg := ss.segment.(type) {
|
||||
case *mem.Segment:
|
||||
err = ss.cachedDocs.prepareFields(fields, ss)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if zaps, ok := ss.segment.(segment.DocumentFieldTermVisitable); ok {
|
||||
return zaps.VisitDocumentFieldTerms(localDocNum, fields, visitor)
|
||||
}
|
||||
|
||||
for _, field := range fields {
|
||||
if cachedFieldDocs, exists := ss.cachedDocs.cache[field]; exists {
|
||||
if tlist, exists := cachedFieldDocs.docs[localDocNum]; exists {
|
||||
for {
|
||||
i := bytes.Index(tlist, TermSeparatorSplitSlice)
|
||||
if i < 0 {
|
||||
break
|
||||
}
|
||||
visitor(field, tlist[0:i])
|
||||
tlist = tlist[i+1:]
|
||||
// else fallback to the in memory fieldCache
|
||||
err = ss.cachedDocs.prepareFields(fields, ss)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, field := range fields {
|
||||
if cachedFieldDocs, exists := ss.cachedDocs.cache[field]; exists {
|
||||
if tlist, exists := cachedFieldDocs.docs[localDocNum]; exists {
|
||||
for {
|
||||
i := bytes.Index(tlist, TermSeparatorSplitSlice)
|
||||
if i < 0 {
|
||||
break
|
||||
}
|
||||
visitor(field, tlist[0:i])
|
||||
tlist = tlist[i+1:]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
case *zap.Segment:
|
||||
if zaps, ok := ss.segment.(UnInvertIndex); ok {
|
||||
return zaps.VisitDocumentFieldTerms(localDocNum, fields, visitor)
|
||||
}
|
||||
|
||||
default:
|
||||
return fmt.Errorf("snapshot_index: DocumentVisitFieldTerms, unknown segment type: %T", seg)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
|
Loading…
Reference in New Issue