
scorch zap in-memory segment representation (SegmentBase)

The zap SegmentBase struct is a refactoring of the zap Segment into
the subset of fields needed for read-only operations, without any
persistence-related info.  This allows scorch to use zap's optimized
data encoding for its in-memory segments.

The zap Segment struct now embeds a zap SegmentBase struct and layers
persistence on top.  Both the zap Segment and the zap SegmentBase
implement scorch's Segment interface.
Steve Yen 2018-01-17 18:46:57 -08:00
parent dc62324e02
commit 5a035dc9aa
14 changed files with 339 additions and 217 deletions
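For orientation before the per-file diffs, here is a minimal, self-contained sketch of the embedding pattern this commit introduces. The field sets are trimmed and the interface is a stand-in for scorch's segment.Segment; the real SegmentBase and Segment definitions appear later in the diff.

```go
package main

import (
	"fmt"
	"os"
)

// Stand-in for scorch's segment.Segment interface (abridged).
type readOnlySegment interface {
	Count() uint64
	Close() error
}

// SegmentBase: the read-only core, serving queries from zap-encoded
// bytes held in memory (see the full struct later in the diff).
type SegmentBase struct {
	mem     []byte
	numDocs uint64
}

func (sb *SegmentBase) Count() uint64 { return sb.numDocs }
func (sb *SegmentBase) Close() error  { return nil } // nothing to release

// Segment: embeds SegmentBase and layers on file/mmap persistence.
type Segment struct {
	SegmentBase
	f    *os.File
	path string
}

func (s *Segment) Close() error {
	if s.f != nil {
		return s.f.Close()
	}
	return nil
}

func main() {
	// Both types satisfy the same read-only interface.
	segs := []readOnlySegment{&SegmentBase{numDocs: 3}, &Segment{}}
	fmt.Println(segs[0].Count(), segs[1].Count()) // 3 0
}
```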

View File

@ -28,11 +28,12 @@ import (
"github.com/RoaringBitmap/roaring"
"github.com/blevesearch/bleve/index/scorch/segment"
"github.com/blevesearch/bleve/index/scorch/segment/mem"
"github.com/blevesearch/bleve/index/scorch/segment/zap"
"github.com/boltdb/bolt"
)
var DefaultChunkFactor uint32 = 1024
type notificationChan chan struct{}
func (s *Scorch) persisterLoop() {
@ -178,11 +179,11 @@ func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot) error {
return err2
}
switch seg := segmentSnapshot.segment.(type) {
case *mem.Segment:
case *zap.SegmentBase:
// need to persist this to disk
filename := zapFileName(segmentSnapshot.id)
path := s.path + string(os.PathSeparator) + filename
err2 := zap.PersistSegment(seg, path, 1024)
err2 := zap.PersistSegmentBase(seg, path)
if err2 != nil {
return fmt.Errorf("error persisting segment: %v", err2)
}

View File

@ -28,6 +28,7 @@ import (
"github.com/blevesearch/bleve/index"
"github.com/blevesearch/bleve/index/scorch/segment"
"github.com/blevesearch/bleve/index/scorch/segment/mem"
"github.com/blevesearch/bleve/index/scorch/segment/zap"
"github.com/blevesearch/bleve/index/store"
"github.com/blevesearch/bleve/registry"
"github.com/boltdb/bolt"
@ -217,7 +218,7 @@ func (s *Scorch) Delete(id string) error {
}
// Batch applies a batch of changes to the index atomically
func (s *Scorch) Batch(batch *index.Batch) error {
func (s *Scorch) Batch(batch *index.Batch) (err error) {
start := time.Now()
defer func() {
@ -271,10 +272,13 @@ func (s *Scorch) Batch(batch *index.Batch) error {
var newSegment segment.Segment
if len(analysisResults) > 0 {
newSegment = mem.NewFromAnalyzedDocs(analysisResults)
newSegment, err = zap.NewSegmentBase(mem.NewFromAnalyzedDocs(analysisResults), DefaultChunkFactor)
if err != nil {
return err
}
}
err := s.prepareSegment(newSegment, ids, batch.InternalOps)
err = s.prepareSegment(newSegment, ids, batch.InternalOps)
if err != nil {
if newSegment != nil {
_ = newSegment.Close()

View File

@ -1395,7 +1395,7 @@ func TestConcurrentUpdate(t *testing.T) {
// do some concurrent updates
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
for i := 0; i < 100; i++ {
wg.Add(1)
go func(i int) {
doc := document.NewDocument("1")

View File

@ -267,15 +267,15 @@ func (s *Segment) processDocument(result *index.AnalysisResult) {
}
func (s *Segment) getOrDefineField(name string) int {
fieldID, ok := s.FieldsMap[name]
fieldIDPlus1, ok := s.FieldsMap[name]
if !ok {
fieldID = uint16(len(s.FieldsInv) + 1)
s.FieldsMap[name] = fieldID
fieldIDPlus1 = uint16(len(s.FieldsInv) + 1)
s.FieldsMap[name] = fieldIDPlus1
s.FieldsInv = append(s.FieldsInv, name)
s.Dicts = append(s.Dicts, make(map[string]uint64))
s.DictKeys = append(s.DictKeys, make([]string, 0))
}
return int(fieldID - 1)
return int(fieldIDPlus1 - 1)
}
func (s *Segment) addDocument() int {

View File

@ -40,35 +40,38 @@ const idFieldID uint16 = 0
// Segment is an in memory implementation of scorch.Segment
type Segment struct {
// FieldsMap name -> id+1
// FieldsMap adds 1 to field id to avoid zero value issues
// name -> field id + 1
FieldsMap map[string]uint16
// fields id -> name
// FieldsInv is the inverse of FieldsMap
// field id -> name
FieldsInv []string
// term dictionary
// Term dictionaries for each field
// field id -> term -> postings list id + 1
Dicts []map[string]uint64
// term dictionary keys
// field id -> []dictionary keys
// Terms for each field, where terms are sorted ascending
// field id -> []term
DictKeys [][]string
// Postings list
// postings list id -> Postings bitmap
// postings list id -> bitmap by docNum
Postings []*roaring.Bitmap
// Postings List has locations
// Postings list has locations
PostingsLocs []*roaring.Bitmap
// term frequencies
// Term frequencies
// postings list id -> Freqs (one for each hit in bitmap)
Freqs [][]uint64
// field Norms
// Field norms
// postings list id -> Norms (one for each hit in bitmap)
Norms [][]float32
// field/start/end/pos/locarraypos
// Field/start/end/pos/locarraypos
// postings list id -> start/end/pos/locarraypos (one for each freq)
Locfields [][]uint16
Locstarts [][]uint64
@ -80,18 +83,18 @@ type Segment struct {
// docNum -> field id -> slice of values (each value []byte)
Stored []map[uint16][][]byte
// stored field types
// Stored field types
// docNum -> field id -> slice of types (each type byte)
StoredTypes []map[uint16][]byte
// stored field array positions
// Stored field array positions
// docNum -> field id -> slice of array positions (each is []uint64)
StoredPos []map[uint16][][]uint64
// for storing the docValue persisted fields
// For storing the docValue persisted fields
DocValueFields map[uint16]bool
// footprint of the segment, updated when analyzed document mutations
// Footprint of the segment, updated when analyzed document mutations
// are added into the segment
sizeInBytes uint64
}
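The comments above describe a set of parallel, id-indexed structures. Below is a toy sketch of how a single hit lands in them, following the layout those comments describe; a plain []uint64 of doc numbers stands in for the *roaring.Bitmap the real Segment uses, and toySegment/addHit are illustrative names, not bleve APIs.

```go
package main

import "fmt"

// Toy mirror of the structures documented above: Dicts maps term ->
// postings list id + 1 (so a zero map value means "absent"), and
// Postings/Freqs are indexed by that id.
type toySegment struct {
	Dicts    []map[string]uint64 // field id -> term -> postings list id + 1
	Postings [][]uint64          // postings list id -> doc numbers (roaring bitmap in the real code)
	Freqs    [][]uint64          // postings list id -> frequency, one per hit
}

func (s *toySegment) addHit(fieldID int, term string, docNum, freq uint64) {
	pidPlus1, ok := s.Dicts[fieldID][term]
	if !ok {
		s.Postings = append(s.Postings, nil)
		s.Freqs = append(s.Freqs, nil)
		pidPlus1 = uint64(len(s.Postings)) // store id+1, mirroring the +1 convention
		s.Dicts[fieldID][term] = pidPlus1
	}
	pid := pidPlus1 - 1
	s.Postings[pid] = append(s.Postings[pid], docNum)
	s.Freqs[pid] = append(s.Freqs[pid], freq)
}

func main() {
	s := &toySegment{Dicts: []map[string]uint64{{}}} // one field, id 0
	s.addHit(0, "zap", 0, 2)
	s.addHit(0, "zap", 3, 1)
	pid := s.Dicts[0]["zap"] - 1
	fmt.Println(s.Postings[pid], s.Freqs[pid]) // [0 3] [2 1]
}
```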

View File

@ -32,10 +32,8 @@ const version uint32 = 2
const fieldNotUninverted = math.MaxUint64
// PersistSegment takes the in-memory segment and persists it to the specified
// path in the zap file format.
func PersistSegment(memSegment *mem.Segment, path string, chunkFactor uint32) (err error) {
// PersistSegmentBase persists SegmentBase in the zap file format.
func PersistSegmentBase(sb *SegmentBase, path string) error {
flag := os.O_RDWR | os.O_CREATE
f, err := os.OpenFile(path, flag, 0600)
@ -43,84 +41,151 @@ func PersistSegment(memSegment *mem.Segment, path string, chunkFactor uint32) (e
return err
}
cleanup := func() {
_ = f.Close()
_ = os.Remove(path)
}
br := bufio.NewWriter(f)
_, err = br.Write(sb.mem)
if err != nil {
cleanup()
return err
}
err = persistFooter(sb.numDocs, sb.storedIndexOffset, sb.fieldsIndexOffset, sb.docValueOffset,
sb.chunkFactor, sb.memCRC, br)
if err != nil {
cleanup()
return err
}
err = br.Flush()
if err != nil {
cleanup()
return err
}
err = f.Sync()
if err != nil {
cleanup()
return err
}
err = f.Close()
if err != nil {
cleanup()
return err
}
return nil
}
// PersistSegment takes the in-memory segment and persists it to
// the specified path in the zap file format.
func PersistSegment(memSegment *mem.Segment, path string, chunkFactor uint32) error {
flag := os.O_RDWR | os.O_CREATE
f, err := os.OpenFile(path, flag, 0600)
if err != nil {
return err
}
cleanup := func() {
_ = f.Close()
_ = os.Remove(path)
}
// buffer the output
br := bufio.NewWriter(f)
// wrap it for counting (tracking offsets)
cr := NewCountHashWriter(br)
var storedIndexOffset uint64
var dictLocs []uint64
docValueOffset := uint64(fieldNotUninverted)
if len(memSegment.Stored) > 0 {
storedIndexOffset, err = persistStored(memSegment, cr)
if err != nil {
return err
}
var freqOffsets, locOffsets []uint64
freqOffsets, locOffsets, err = persistPostingDetails(memSegment, cr, chunkFactor)
if err != nil {
return err
}
var postingsListLocs []uint64
postingsListLocs, err = persistPostingsLocs(memSegment, cr)
if err != nil {
return err
}
var postingsLocs []uint64
postingsLocs, err = persistPostingsLists(memSegment, cr, postingsListLocs, freqOffsets, locOffsets)
if err != nil {
return err
}
dictLocs, err = persistDictionary(memSegment, cr, postingsLocs)
if err != nil {
return err
}
docValueOffset, err = persistFieldDocValues(cr, chunkFactor, memSegment)
if err != nil {
return err
}
} else {
dictLocs = make([]uint64, len(memSegment.FieldsInv))
}
var fieldIndexStart uint64
fieldIndexStart, err = persistFields(memSegment.FieldsInv, cr, dictLocs)
numDocs, storedIndexOffset, fieldsIndexOffset, docValueOffset, _, err :=
persistBase(memSegment, cr, chunkFactor)
if err != nil {
cleanup()
return err
}
err = persistFooter(uint64(len(memSegment.Stored)), storedIndexOffset,
fieldIndexStart, docValueOffset, chunkFactor, cr)
err = persistFooter(numDocs, storedIndexOffset, fieldsIndexOffset, docValueOffset,
chunkFactor, cr.Sum32(), cr)
if err != nil {
cleanup()
return err
}
err = br.Flush()
if err != nil {
cleanup()
return err
}
err = f.Sync()
if err != nil {
cleanup()
return err
}
err = f.Close()
if err != nil {
cleanup()
return err
}
return nil
}
func persistBase(memSegment *mem.Segment, cr *CountHashWriter, chunkFactor uint32) (
numDocs, storedIndexOffset, fieldsIndexOffset, docValueOffset uint64,
dictLocs []uint64, err error) {
docValueOffset = uint64(fieldNotUninverted)
if len(memSegment.Stored) > 0 {
storedIndexOffset, err = persistStored(memSegment, cr)
if err != nil {
return 0, 0, 0, 0, nil, err
}
freqOffsets, locOffsets, err := persistPostingDetails(memSegment, cr, chunkFactor)
if err != nil {
return 0, 0, 0, 0, nil, err
}
postingsListLocs, err := persistPostingsLocs(memSegment, cr)
if err != nil {
return 0, 0, 0, 0, nil, err
}
postingsLocs, err := persistPostingsLists(memSegment, cr, postingsListLocs, freqOffsets, locOffsets)
if err != nil {
return 0, 0, 0, 0, nil, err
}
dictLocs, err = persistDictionary(memSegment, cr, postingsLocs)
if err != nil {
return 0, 0, 0, 0, nil, err
}
docValueOffset, err = persistFieldDocValues(memSegment, cr, chunkFactor)
if err != nil {
return 0, 0, 0, 0, nil, err
}
} else {
dictLocs = make([]uint64, len(memSegment.FieldsInv))
}
fieldsIndexOffset, err = persistFields(memSegment.FieldsInv, cr, dictLocs)
if err != nil {
return 0, 0, 0, 0, nil, err
}
return uint64(len(memSegment.Stored)), storedIndexOffset, fieldsIndexOffset, docValueOffset,
dictLocs, nil
}
func persistStored(memSegment *mem.Segment, w *CountHashWriter) (uint64, error) {
var curr int
@ -394,6 +459,8 @@ func persistPostingsLists(memSegment *mem.Segment, w *CountHashWriter,
func persistDictionary(memSegment *mem.Segment, w *CountHashWriter, postingsLocs []uint64) ([]uint64, error) {
rv := make([]uint64, 0, len(memSegment.DictKeys))
varintBuf := make([]byte, binary.MaxVarintLen64)
var buffer bytes.Buffer
for fieldID, fieldTerms := range memSegment.DictKeys {
if fieldID != 0 {
@ -427,10 +494,8 @@ func persistDictionary(memSegment *mem.Segment, w *CountHashWriter, postingsLocs
vellumData := buffer.Bytes()
// write out the length of the vellum data
buf := make([]byte, binary.MaxVarintLen64)
// write out the number of chunks
n := binary.PutUvarint(buf, uint64(len(vellumData)))
_, err = w.Write(buf[:n])
n := binary.PutUvarint(varintBuf, uint64(len(vellumData)))
_, err = w.Write(varintBuf[:n])
if err != nil {
return nil, err
}
@ -521,9 +586,8 @@ func persistDocValues(memSegment *mem.Segment, w *CountHashWriter,
return fieldChunkOffsets, nil
}
func persistFieldDocValues(w *CountHashWriter, chunkFactor uint32,
memSegment *mem.Segment) (uint64, error) {
func persistFieldDocValues(memSegment *mem.Segment, w *CountHashWriter,
chunkFactor uint32) (uint64, error) {
fieldDvOffsets, err := persistDocValues(memSegment, w, chunkFactor)
if err != nil {
return 0, err
@ -548,3 +612,36 @@ func persistFieldDocValues(w *CountHashWriter, chunkFactor uint32,
return fieldDocValuesOffset, nil
}
func NewSegmentBase(memSegment *mem.Segment, chunkFactor uint32) (*SegmentBase, error) {
var br bytes.Buffer
cr := NewCountHashWriter(&br)
numDocs, storedIndexOffset, fieldsIndexOffset, docValueOffset, dictLocs, err :=
persistBase(memSegment, cr, chunkFactor)
if err != nil {
return nil, err
}
sb := &SegmentBase{
mem: br.Bytes(),
memCRC: cr.Sum32(),
chunkFactor: chunkFactor,
fieldsMap: memSegment.FieldsMap,
fieldsInv: memSegment.FieldsInv,
numDocs: numDocs,
storedIndexOffset: storedIndexOffset,
fieldsIndexOffset: fieldsIndexOffset,
docValueOffset: docValueOffset,
dictLocs: dictLocs,
fieldDvIterMap: make(map[uint16]*docValueIterator),
}
err = sb.loadDvIterators()
if err != nil {
return nil, err
}
return sb, nil
}
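The net effect of the persistBase factoring above: the zap encoder targets an io.Writer (wrapped in a CountHashWriter), so the same code path can fill a bytes.Buffer for NewSegmentBase or a buffered file for PersistSegment. A toy illustration of that design point follows, assuming nothing from bleve; encodeBody is a made-up stand-in for persistBase.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"os"
)

// encodeBody is a stand-in for persistBase: it only cares that it has an
// io.Writer, not whether the bytes end up in memory or in a file.
func encodeBody(w io.Writer, docs []string) error {
	for _, d := range docs {
		if _, err := fmt.Fprintln(w, d); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	docs := []string{"doc-1", "doc-2"}

	// In-memory sink, as NewSegmentBase does with a bytes.Buffer.
	var buf bytes.Buffer
	if err := encodeBody(&buf, docs); err != nil {
		panic(err)
	}

	// File sink, as PersistSegment does via bufio over an *os.File.
	f, err := os.CreateTemp("", "toy-zap-*")
	if err != nil {
		panic(err)
	}
	defer os.Remove(f.Name())
	if err := encodeBody(f, docs); err != nil {
		panic(err)
	}
	_ = f.Close()

	onDisk, _ := os.ReadFile(f.Name())
	fmt.Println(bytes.Equal(buf.Bytes(), onDisk)) // true: identical bytes either way
}
```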

View File

@ -15,32 +15,28 @@
package zap
import (
"hash"
"hash/crc32"
"io"
)
// CountHashWriter is a wrapper around a Writer which counts the number of
// bytes which have been written
// bytes which have been written and computes a crc32 hash
type CountHashWriter struct {
w io.Writer
h hash.Hash32
n int
w io.Writer
crc uint32
n int
}
// NewCountHashWriter returns a CountHashWriter which wraps the provided Writer
func NewCountHashWriter(w io.Writer) *CountHashWriter {
return &CountHashWriter{
w: w,
h: crc32.NewIEEE(),
}
return &CountHashWriter{w: w}
}
// Write writes the provided bytes to the wrapped writer and counts the bytes
func (c *CountHashWriter) Write(b []byte) (int, error) {
n, err := c.w.Write(b)
c.crc = crc32.Update(c.crc, crc32.IEEETable, b[:n])
c.n += n
_, _ = c.h.Write(b)
return n, err
}
@ -51,5 +47,5 @@ func (c *CountHashWriter) Count() int {
// Sum32 returns the CRC-32 hash of the content written to this writer
func (c *CountHashWriter) Sum32() uint32 {
return c.h.Sum32()
return c.crc
}
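The switch from hash.Hash32 to a plain crc32.Update call also makes it easy to carry a running CRC across writers, which is what the new persistFooter signature (crcBeforeFooter, in the last hunk below) relies on. Here is a small standard-library-only check of the underlying property; the byte strings are arbitrary placeholders.

```go
package main

import (
	"fmt"
	"hash/crc32"
)

func main() {
	body := []byte("zap segment body bytes")
	footer := []byte("footer bytes")

	// CRC of the whole stream in one shot.
	whole := crc32.ChecksumIEEE(append(append([]byte{}, body...), footer...))

	// Incremental CRC: body first (CountHashWriter), then the footer bytes
	// via a second writer seeded with the running value (crcBeforeFooter).
	running := crc32.Update(0, crc32.IEEETable, body)
	running = crc32.Update(running, crc32.IEEETable, footer)

	fmt.Println(whole == running) // true
}
```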

View File

@ -27,7 +27,7 @@ import (
// Dictionary is the zap representation of the term dictionary
type Dictionary struct {
segment *Segment
sb *SegmentBase
field string
fieldID uint16
fst *vellum.FST
@ -40,9 +40,9 @@ func (d *Dictionary) PostingsList(term string, except *roaring.Bitmap) (segment.
func (d *Dictionary) postingsList(term string, except *roaring.Bitmap) (*PostingsList, error) {
rv := &PostingsList{
dictionary: d,
term: term,
except: except,
sb: d.sb,
term: term,
except: except,
}
if d.fst != nil {
@ -56,19 +56,19 @@ func (d *Dictionary) postingsList(term string, except *roaring.Bitmap) (*Posting
var n uint64
var read int
rv.freqOffset, read = binary.Uvarint(d.segment.mm[postingsOffset+n : postingsOffset+binary.MaxVarintLen64])
rv.freqOffset, read = binary.Uvarint(d.sb.mem[postingsOffset+n : postingsOffset+binary.MaxVarintLen64])
n += uint64(read)
rv.locOffset, read = binary.Uvarint(d.segment.mm[postingsOffset+n : postingsOffset+n+binary.MaxVarintLen64])
rv.locOffset, read = binary.Uvarint(d.sb.mem[postingsOffset+n : postingsOffset+n+binary.MaxVarintLen64])
n += uint64(read)
var locBitmapOffset uint64
locBitmapOffset, read = binary.Uvarint(d.segment.mm[postingsOffset+n : postingsOffset+n+binary.MaxVarintLen64])
locBitmapOffset, read = binary.Uvarint(d.sb.mem[postingsOffset+n : postingsOffset+n+binary.MaxVarintLen64])
n += uint64(read)
// go ahead and load loc bitmap
var locBitmapLen uint64
locBitmapLen, read = binary.Uvarint(d.segment.mm[locBitmapOffset : locBitmapOffset+binary.MaxVarintLen64])
locRoaringBytes := d.segment.mm[locBitmapOffset+uint64(read) : locBitmapOffset+uint64(read)+locBitmapLen]
locBitmapLen, read = binary.Uvarint(d.sb.mem[locBitmapOffset : locBitmapOffset+binary.MaxVarintLen64])
locRoaringBytes := d.sb.mem[locBitmapOffset+uint64(read) : locBitmapOffset+uint64(read)+locBitmapLen]
rv.locBitmap = roaring.NewBitmap()
_, err := rv.locBitmap.FromBuffer(locRoaringBytes)
if err != nil {
@ -76,10 +76,10 @@ func (d *Dictionary) postingsList(term string, except *roaring.Bitmap) (*Posting
}
var postingsLen uint64
postingsLen, read = binary.Uvarint(d.segment.mm[postingsOffset+n : postingsOffset+n+binary.MaxVarintLen64])
postingsLen, read = binary.Uvarint(d.sb.mem[postingsOffset+n : postingsOffset+n+binary.MaxVarintLen64])
n += uint64(read)
roaringBytes := d.segment.mm[postingsOffset+n : postingsOffset+n+postingsLen]
roaringBytes := d.sb.mem[postingsOffset+n : postingsOffset+n+postingsLen]
bitmap := roaring.NewBitmap()
_, err = bitmap.FromBuffer(roaringBytes)

View File

@ -61,7 +61,7 @@ func (di *docValueIterator) curChunkNumber() uint64 {
return di.curChunkNum
}
func (s *Segment) loadFieldDocValueIterator(field string,
func (s *SegmentBase) loadFieldDocValueIterator(field string,
fieldDvLoc uint64) (*docValueIterator, error) {
// get the docValue offset for the given fields
if fieldDvLoc == fieldNotUninverted {
@ -71,7 +71,7 @@ func (s *Segment) loadFieldDocValueIterator(field string,
// read the number of chunks, chunk lengths
var offset, clen uint64
numChunks, read := binary.Uvarint(s.mm[fieldDvLoc : fieldDvLoc+binary.MaxVarintLen64])
numChunks, read := binary.Uvarint(s.mem[fieldDvLoc : fieldDvLoc+binary.MaxVarintLen64])
if read <= 0 {
return nil, fmt.Errorf("failed to read the field "+
"doc values for field %s", field)
@ -84,7 +84,7 @@ func (s *Segment) loadFieldDocValueIterator(field string,
chunkLens: make([]uint64, int(numChunks)),
}
for i := 0; i < int(numChunks); i++ {
clen, read = binary.Uvarint(s.mm[fieldDvLoc+offset : fieldDvLoc+offset+binary.MaxVarintLen64])
clen, read = binary.Uvarint(s.mem[fieldDvLoc+offset : fieldDvLoc+offset+binary.MaxVarintLen64])
if read <= 0 {
return nil, fmt.Errorf("corrupted chunk length during segment load")
}
@ -97,7 +97,7 @@ func (s *Segment) loadFieldDocValueIterator(field string,
}
func (di *docValueIterator) loadDvChunk(chunkNumber,
localDocNum uint64, s *Segment) error {
localDocNum uint64, s *SegmentBase) error {
// advance to the chunk where the docValues
// reside for the given docID
destChunkDataLoc := di.dvDataLoc
@ -107,7 +107,7 @@ func (di *docValueIterator) loadDvChunk(chunkNumber,
curChunkSize := di.chunkLens[chunkNumber]
// read the number of docs that reside in the chunk
numDocs, read := binary.Uvarint(s.mm[destChunkDataLoc : destChunkDataLoc+binary.MaxVarintLen64])
numDocs, read := binary.Uvarint(s.mem[destChunkDataLoc : destChunkDataLoc+binary.MaxVarintLen64])
if read <= 0 {
return fmt.Errorf("failed to read the chunk")
}
@ -116,17 +116,17 @@ func (di *docValueIterator) loadDvChunk(chunkNumber,
offset := uint64(0)
di.curChunkHeader = make([]MetaData, int(numDocs))
for i := 0; i < int(numDocs); i++ {
di.curChunkHeader[i].DocID, read = binary.Uvarint(s.mm[chunkMetaLoc+offset : chunkMetaLoc+offset+binary.MaxVarintLen64])
di.curChunkHeader[i].DocID, read = binary.Uvarint(s.mem[chunkMetaLoc+offset : chunkMetaLoc+offset+binary.MaxVarintLen64])
offset += uint64(read)
di.curChunkHeader[i].DocDvLoc, read = binary.Uvarint(s.mm[chunkMetaLoc+offset : chunkMetaLoc+offset+binary.MaxVarintLen64])
di.curChunkHeader[i].DocDvLoc, read = binary.Uvarint(s.mem[chunkMetaLoc+offset : chunkMetaLoc+offset+binary.MaxVarintLen64])
offset += uint64(read)
di.curChunkHeader[i].DocDvLen, read = binary.Uvarint(s.mm[chunkMetaLoc+offset : chunkMetaLoc+offset+binary.MaxVarintLen64])
di.curChunkHeader[i].DocDvLen, read = binary.Uvarint(s.mem[chunkMetaLoc+offset : chunkMetaLoc+offset+binary.MaxVarintLen64])
offset += uint64(read)
}
compressedDataLoc := chunkMetaLoc + offset
dataLength := destChunkDataLoc + curChunkSize - compressedDataLoc
di.curChunkData = s.mm[compressedDataLoc : compressedDataLoc+dataLength]
di.curChunkData = s.mem[compressedDataLoc : compressedDataLoc+dataLength]
di.curChunkNum = chunkNumber
return nil
}
@ -171,18 +171,18 @@ func (di *docValueIterator) getDocValueLocs(docID uint64) (uint64, uint64) {
// VisitDocumentFieldTerms is an implementation of the
// DocumentFieldTermVisitable interface
func (s *Segment) VisitDocumentFieldTerms(localDocNum uint64, fields []string,
func (s *SegmentBase) VisitDocumentFieldTerms(localDocNum uint64, fields []string,
visitor index.DocumentFieldTermVisitor) error {
fieldID := uint16(0)
fieldIDPlus1 := uint16(0)
ok := true
for _, field := range fields {
if fieldID, ok = s.fieldsMap[field]; !ok {
if fieldIDPlus1, ok = s.fieldsMap[field]; !ok {
continue
}
// find the chunkNumber where the docValues are stored
docInChunk := localDocNum / uint64(s.chunkFactor)
if dvIter, exists := s.fieldDvIterMap[fieldID-1]; exists &&
if dvIter, exists := s.fieldDvIterMap[fieldIDPlus1-1]; exists &&
dvIter != nil {
// check if the chunk is already loaded
if docInChunk != dvIter.curChunkNumber() {

View File

@ -72,14 +72,13 @@ func Merge(segments []*Segment, drops []*roaring.Bitmap, path string,
dictLocs = make([]uint64, len(fieldsInv))
}
var fieldsIndexOffset uint64
fieldsIndexOffset, err = persistFields(fieldsInv, cr, dictLocs)
fieldsIndexOffset, err := persistFields(fieldsInv, cr, dictLocs)
if err != nil {
return nil, err
}
err = persistFooter(newSegDocCount, storedIndexOffset,
fieldsIndexOffset, fieldDvLocsOffset, chunkFactor, cr)
fieldsIndexOffset, fieldDvLocsOffset, chunkFactor, cr.Sum32(), cr)
if err != nil {
return nil, err
}

View File

@ -27,7 +27,7 @@ import (
// PostingsList is an in-memory representation of a postings list
type PostingsList struct {
dictionary *Dictionary
sb *SegmentBase
term string
postingsOffset uint64
freqOffset uint64
@ -48,11 +48,11 @@ func (p *PostingsList) Iterator() segment.PostingsIterator {
var n uint64
var read int
var numFreqChunks uint64
numFreqChunks, read = binary.Uvarint(p.dictionary.segment.mm[p.freqOffset+n : p.freqOffset+n+binary.MaxVarintLen64])
numFreqChunks, read = binary.Uvarint(p.sb.mem[p.freqOffset+n : p.freqOffset+n+binary.MaxVarintLen64])
n += uint64(read)
rv.freqChunkLens = make([]uint64, int(numFreqChunks))
for i := 0; i < int(numFreqChunks); i++ {
rv.freqChunkLens[i], read = binary.Uvarint(p.dictionary.segment.mm[p.freqOffset+n : p.freqOffset+n+binary.MaxVarintLen64])
rv.freqChunkLens[i], read = binary.Uvarint(p.sb.mem[p.freqOffset+n : p.freqOffset+n+binary.MaxVarintLen64])
n += uint64(read)
}
rv.freqChunkStart = p.freqOffset + n
@ -60,11 +60,11 @@ func (p *PostingsList) Iterator() segment.PostingsIterator {
// prepare the loc chunk details
n = 0
var numLocChunks uint64
numLocChunks, read = binary.Uvarint(p.dictionary.segment.mm[p.locOffset+n : p.locOffset+n+binary.MaxVarintLen64])
numLocChunks, read = binary.Uvarint(p.sb.mem[p.locOffset+n : p.locOffset+n+binary.MaxVarintLen64])
n += uint64(read)
rv.locChunkLens = make([]uint64, int(numLocChunks))
for i := 0; i < int(numLocChunks); i++ {
rv.locChunkLens[i], read = binary.Uvarint(p.dictionary.segment.mm[p.locOffset+n : p.locOffset+n+binary.MaxVarintLen64])
rv.locChunkLens[i], read = binary.Uvarint(p.sb.mem[p.locOffset+n : p.locOffset+n+binary.MaxVarintLen64])
n += uint64(read)
}
rv.locChunkStart = p.locOffset + n
@ -133,7 +133,7 @@ func (i *PostingsIterator) loadChunk(chunk int) error {
start += i.freqChunkLens[j]
}
end := start + i.freqChunkLens[chunk]
i.currChunkFreqNorm = i.postings.dictionary.segment.mm[start:end]
i.currChunkFreqNorm = i.postings.sb.mem[start:end]
i.freqNormDecoder = govarint.NewU64Base128Decoder(bytes.NewReader(i.currChunkFreqNorm))
start = i.locChunkStart
@ -141,7 +141,7 @@ func (i *PostingsIterator) loadChunk(chunk int) error {
start += i.locChunkLens[j]
}
end = start + i.locChunkLens[chunk]
i.currChunkLoc = i.postings.dictionary.segment.mm[start:end]
i.currChunkLoc = i.postings.sb.mem[start:end]
i.locDecoder = govarint.NewU64Base128Decoder(bytes.NewReader(i.currChunkLoc))
i.currChunk = uint32(chunk)
return nil
@ -192,7 +192,7 @@ func (i *PostingsIterator) readLocation(l *Location) error {
// group these together for less branching
if l != nil {
l.field = i.postings.dictionary.segment.fieldsInv[fieldID]
l.field = i.postings.sb.fieldsInv[fieldID]
l.pos = pos
l.start = start
l.end = end
@ -221,9 +221,9 @@ func (i *PostingsIterator) Next() (segment.Posting, error) {
return nil, nil
}
n := i.actual.Next()
nChunk := n / i.postings.dictionary.segment.chunkFactor
nChunk := n / i.postings.sb.chunkFactor
allN := i.all.Next()
allNChunk := allN / i.postings.dictionary.segment.chunkFactor
allNChunk := allN / i.postings.sb.chunkFactor
// n is the next actual hit (excluding some postings)
// allN is the next hit in the full postings

View File

@ -16,16 +16,16 @@ package zap
import "encoding/binary"
func (s *Segment) getStoredMetaAndCompressed(docNum uint64) ([]byte, []byte) {
func (s *SegmentBase) getDocStoredMetaAndCompressed(docNum uint64) ([]byte, []byte) {
docStoredStartAddr := s.storedIndexOffset + (8 * docNum)
docStoredStart := binary.BigEndian.Uint64(s.mm[docStoredStartAddr : docStoredStartAddr+8])
docStoredStart := binary.BigEndian.Uint64(s.mem[docStoredStartAddr : docStoredStartAddr+8])
var n uint64
metaLen, read := binary.Uvarint(s.mm[docStoredStart : docStoredStart+binary.MaxVarintLen64])
metaLen, read := binary.Uvarint(s.mem[docStoredStart : docStoredStart+binary.MaxVarintLen64])
n += uint64(read)
var dataLen uint64
dataLen, read = binary.Uvarint(s.mm[docStoredStart+n : docStoredStart+n+binary.MaxVarintLen64])
dataLen, read = binary.Uvarint(s.mem[docStoredStart+n : docStoredStart+n+binary.MaxVarintLen64])
n += uint64(read)
meta := s.mm[docStoredStart+n : docStoredStart+n+metaLen]
data := s.mm[docStoredStart+n+metaLen : docStoredStart+n+metaLen+dataLen]
meta := s.mem[docStoredStart+n : docStoredStart+n+metaLen]
data := s.mem[docStoredStart+n+metaLen : docStoredStart+n+metaLen+dataLen]
return meta, data
}
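The layout read here, per document: two uvarints (meta length, data length) followed by the meta bytes and the snappy-compressed data bytes, with an 8-byte big-endian index entry per doc at storedIndexOffset locating the record. Below is a toy round-trip of just the per-doc record, standard library only; the data portion is treated as opaque bytes here, whereas the real code snappy-compresses it, and the function names are illustrative.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// encodeStoredDoc builds one per-document stored record:
// [uvarint metaLen][uvarint dataLen][meta][data].
func encodeStoredDoc(meta, data []byte) []byte {
	var tmp [binary.MaxVarintLen64]byte
	buf := make([]byte, 0, 2*binary.MaxVarintLen64+len(meta)+len(data))
	n := binary.PutUvarint(tmp[:], uint64(len(meta)))
	buf = append(buf, tmp[:n]...)
	n = binary.PutUvarint(tmp[:], uint64(len(data)))
	buf = append(buf, tmp[:n]...)
	buf = append(buf, meta...)
	buf = append(buf, data...)
	return buf
}

// decodeStoredDoc mirrors getDocStoredMetaAndCompressed, minus the
// storedIndexOffset lookup that finds the record in the first place.
func decodeStoredDoc(b []byte) (meta, data []byte) {
	var n uint64
	metaLen, read := binary.Uvarint(b[n:])
	n += uint64(read)
	dataLen, read := binary.Uvarint(b[n:])
	n += uint64(read)
	meta = b[n : n+metaLen]
	data = b[n+metaLen : n+metaLen+dataLen]
	return meta, data
}

func main() {
	rec := encodeStoredDoc([]byte("meta"), []byte("compressed-bytes-here"))
	meta, data := decodeStoredDoc(rec)
	fmt.Printf("%s %s\n", meta, data) // meta compressed-bytes-here
}
```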

View File

@ -44,12 +44,15 @@ func Open(path string) (segment.Segment, error) {
}
rv := &Segment{
f: f,
mm: mm,
path: path,
fieldsMap: make(map[string]uint16),
fieldDvIterMap: make(map[uint16]*docValueIterator),
refs: 1,
SegmentBase: SegmentBase{
mem: mm[0 : len(mm)-FooterSize],
fieldsMap: make(map[string]uint16),
fieldDvIterMap: make(map[uint16]*docValueIterator),
},
f: f,
mm: mm,
path: path,
refs: 1,
}
err = rv.loadConfig()
@ -73,24 +76,36 @@ func Open(path string) (segment.Segment, error) {
return rv, nil
}
// Segment implements the segment.Segment interface over top of the zap file format
type Segment struct {
f *os.File
mm mmap.MMap
path string
crc uint32
version uint32
// SegmentBase is a memory only, read-only implementation of the
// segment.Segment interface, using zap's data representation.
type SegmentBase struct {
mem []byte
memCRC uint32
chunkFactor uint32
fieldsMap map[string]uint16 // fieldName -> fieldID+1
fieldsInv []string // fieldID -> fieldName
numDocs uint64
storedIndexOffset uint64
fieldsIndexOffset uint64
docValueOffset uint64
dictLocs []uint64
fieldDvIterMap map[uint16]*docValueIterator // naive chunk cache per field
}
fieldsMap map[string]uint16
fieldsInv []string
fieldsOffsets []uint64
func (sb *SegmentBase) AddRef() {}
func (sb *SegmentBase) DecRef() (err error) { return nil }
func (sb *SegmentBase) Close() (err error) { return nil }
docValueOffset uint64
fieldDvIterMap map[uint16]*docValueIterator // naive chunk cache per field
// Segment implements a persisted segment.Segment interface, by
// embedding an mmap()'ed SegmentBase.
type Segment struct {
SegmentBase
f *os.File
mm mmap.MMap
path string
version uint32
crc uint32
m sync.Mutex // Protects the fields that follow.
refs int64
@ -98,17 +113,29 @@ type Segment struct {
func (s *Segment) SizeInBytes() uint64 {
// 8 /* size of file pointer */
// 4 /* size of crc -> uint32 */
// 4 /* size of version -> uint32 */
// 4 /* size of crc -> uint32 */
sizeOfUints := 16
sizeInBytes := (len(s.path) + int(segment.SizeOfString)) + sizeOfUints
// mutex, refs -> int64
sizeInBytes += 16
// do not include the mmap'ed part
return uint64(sizeInBytes) + s.SegmentBase.SizeInBytes() - uint64(len(s.mem))
}
func (s *SegmentBase) SizeInBytes() uint64 {
// 4 /* size of memCRC -> uint32 */
// 4 /* size of chunkFactor -> uint32 */
// 8 /* size of numDocs -> uint64 */
// 8 /* size of storedIndexOffset -> uint64 */
// 8 /* size of fieldsIndexOffset -> uint64 */
// 8 /* size of docValueOffset -> uint64 */
sizeOfUints := 52
sizeInBytes := 40
// Do not include the mmap'ed part
sizeInBytes := (len(s.path) + int(segment.SizeOfString)) + sizeOfUints
sizeInBytes += len(s.mem) + int(segment.SizeOfSlice)
// fieldsMap
for k, _ := range s.fieldsMap {
@ -116,12 +143,12 @@ func (s *Segment) SizeInBytes() uint64 {
}
sizeInBytes += int(segment.SizeOfMap) /* overhead from map */
// fieldsInv, fieldsOffsets
// fieldsInv, dictLocs
for _, entry := range s.fieldsInv {
sizeInBytes += (len(entry) + int(segment.SizeOfString))
}
sizeInBytes += len(s.fieldsOffsets) * 8 /* size of uint64 */
sizeInBytes += int(segment.SizeOfSlice) * 2 /* overhead from slices */
sizeInBytes += len(s.dictLocs) * 8 /* size of uint64 */
sizeInBytes += int(segment.SizeOfSlice) * 3 /* overhead from slices */
// fieldDvIterMap
sizeInBytes += len(s.fieldDvIterMap) *
@ -133,9 +160,6 @@ func (s *Segment) SizeInBytes() uint64 {
}
sizeInBytes += int(segment.SizeOfMap)
// mutex, refs -> int64
sizeInBytes += 16
return uint64(sizeInBytes)
}
@ -158,49 +182,50 @@ func (s *Segment) DecRef() (err error) {
func (s *Segment) loadConfig() error {
crcOffset := len(s.mm) - 4
s.crc = binary.BigEndian.Uint32(s.mm[crcOffset : crcOffset+4])
verOffset := crcOffset - 4
s.version = binary.BigEndian.Uint32(s.mm[verOffset : verOffset+4])
if s.version != version {
return fmt.Errorf("unsupported version %d", s.version)
}
chunkOffset := verOffset - 4
s.chunkFactor = binary.BigEndian.Uint32(s.mm[chunkOffset : chunkOffset+4])
docValueOffset := chunkOffset - 8
s.docValueOffset = binary.BigEndian.Uint64(s.mm[docValueOffset : docValueOffset+8])
fieldsOffset := docValueOffset - 8
s.fieldsIndexOffset = binary.BigEndian.Uint64(s.mm[fieldsOffset : fieldsOffset+8])
fieldsIndexOffset := docValueOffset - 8
s.fieldsIndexOffset = binary.BigEndian.Uint64(s.mm[fieldsIndexOffset : fieldsIndexOffset+8])
storedOffset := fieldsOffset - 8
s.storedIndexOffset = binary.BigEndian.Uint64(s.mm[storedOffset : storedOffset+8])
storedIndexOffset := fieldsIndexOffset - 8
s.storedIndexOffset = binary.BigEndian.Uint64(s.mm[storedIndexOffset : storedIndexOffset+8])
docNumOffset := storedOffset - 8
s.numDocs = binary.BigEndian.Uint64(s.mm[docNumOffset : docNumOffset+8])
numDocsOffset := storedIndexOffset - 8
s.numDocs = binary.BigEndian.Uint64(s.mm[numDocsOffset : numDocsOffset+8])
return nil
}
func (s *Segment) loadFields() error {
// NOTE for now we assume the fields index immediately precedes the footer
// if this changes, need to adjust accordingly (or store explicit length)
fieldsIndexEnd := uint64(len(s.mm) - FooterSize)
func (s *SegmentBase) loadFields() error {
// NOTE for now we assume the fields index immediately precedes
// the footer, and if this changes, need to adjust accordingly (or
// store explicit length), where s.mem was sliced from s.mm in Open().
fieldsIndexEnd := uint64(len(s.mem))
// iterate through fields index
var fieldID uint64
for s.fieldsIndexOffset+(8*fieldID) < fieldsIndexEnd {
addr := binary.BigEndian.Uint64(s.mm[s.fieldsIndexOffset+(8*fieldID) : s.fieldsIndexOffset+(8*fieldID)+8])
var n uint64
addr := binary.BigEndian.Uint64(s.mem[s.fieldsIndexOffset+(8*fieldID) : s.fieldsIndexOffset+(8*fieldID)+8])
dictLoc, read := binary.Uvarint(s.mm[addr+n : fieldsIndexEnd])
n += uint64(read)
s.fieldsOffsets = append(s.fieldsOffsets, dictLoc)
dictLoc, read := binary.Uvarint(s.mem[addr:fieldsIndexEnd])
n := uint64(read)
s.dictLocs = append(s.dictLocs, dictLoc)
var nameLen uint64
nameLen, read = binary.Uvarint(s.mm[addr+n : fieldsIndexEnd])
nameLen, read = binary.Uvarint(s.mem[addr+n : fieldsIndexEnd])
n += uint64(read)
name := string(s.mm[addr+n : addr+n+nameLen])
name := string(s.mem[addr+n : addr+n+nameLen])
s.fieldsInv = append(s.fieldsInv, name)
s.fieldsMap[name] = uint16(fieldID + 1)
@ -210,7 +235,7 @@ func (s *Segment) loadFields() error {
}
// Dictionary returns the term dictionary for the specified field
func (s *Segment) Dictionary(field string) (segment.TermDictionary, error) {
func (s *SegmentBase) Dictionary(field string) (segment.TermDictionary, error) {
dict, err := s.dictionary(field)
if err == nil && dict == nil {
return &segment.EmptyDictionary{}, nil
@ -218,21 +243,20 @@ func (s *Segment) Dictionary(field string) (segment.TermDictionary, error) {
return dict, err
}
func (s *Segment) dictionary(field string) (rv *Dictionary, err error) {
rv = &Dictionary{
segment: s,
field: field,
}
func (sb *SegmentBase) dictionary(field string) (rv *Dictionary, err error) {
fieldIDPlus1 := sb.fieldsMap[field]
if fieldIDPlus1 > 0 {
rv = &Dictionary{
sb: sb,
field: field,
fieldID: fieldIDPlus1 - 1,
}
rv.fieldID = s.fieldsMap[field]
if rv.fieldID > 0 {
rv.fieldID = rv.fieldID - 1
dictStart := s.fieldsOffsets[rv.fieldID]
dictStart := sb.dictLocs[rv.fieldID]
if dictStart > 0 {
// read the length of the vellum data
vellumLen, read := binary.Uvarint(s.mm[dictStart : dictStart+binary.MaxVarintLen64])
fstBytes := s.mm[dictStart+uint64(read) : dictStart+uint64(read)+vellumLen]
vellumLen, read := binary.Uvarint(sb.mem[dictStart : dictStart+binary.MaxVarintLen64])
fstBytes := sb.mem[dictStart+uint64(read) : dictStart+uint64(read)+vellumLen]
if fstBytes != nil {
rv.fst, err = vellum.Load(fstBytes)
if err != nil {
@ -240,9 +264,6 @@ func (s *Segment) dictionary(field string) (rv *Dictionary, err error) {
}
}
}
} else {
return nil, nil
}
return rv, nil
@ -250,10 +271,10 @@ func (s *Segment) dictionary(field string) (rv *Dictionary, err error) {
// VisitDocument invokes the DocumentFieldValueVisitor for each stored field
// for the specified doc number
func (s *Segment) VisitDocument(num uint64, visitor segment.DocumentFieldValueVisitor) error {
func (s *SegmentBase) VisitDocument(num uint64, visitor segment.DocumentFieldValueVisitor) error {
// first make sure this is a valid number in this segment
if num < s.numDocs {
meta, compressed := s.getStoredMetaAndCompressed(num)
meta, compressed := s.getDocStoredMetaAndCompressed(num)
uncompressed, err := snappy.Decode(nil, compressed)
if err != nil {
return err
@ -307,13 +328,13 @@ func (s *Segment) VisitDocument(num uint64, visitor segment.DocumentFieldValueVi
}
// Count returns the number of documents in this segment.
func (s *Segment) Count() uint64 {
func (s *SegmentBase) Count() uint64 {
return s.numDocs
}
// DocNumbers returns a bitset corresponding to the doc numbers of all the
// provided _id strings
func (s *Segment) DocNumbers(ids []string) (*roaring.Bitmap, error) {
func (s *SegmentBase) DocNumbers(ids []string) (*roaring.Bitmap, error) {
rv := roaring.New()
if len(s.fieldsMap) > 0 {
@ -337,7 +358,7 @@ func (s *Segment) DocNumbers(ids []string) (*roaring.Bitmap, error) {
}
// Fields returns the field names used in this segment
func (s *Segment) Fields() []string {
func (s *SegmentBase) Fields() []string {
return s.fieldsInv
}
@ -411,23 +432,22 @@ func (s *Segment) NumDocs() uint64 {
// DictAddr is a helper function to compute the file offset where the
// dictionary is stored for the specified field.
func (s *Segment) DictAddr(field string) (uint64, error) {
var fieldID uint16
var ok bool
if fieldID, ok = s.fieldsMap[field]; !ok {
fieldIDPlus1, ok := s.fieldsMap[field]
if !ok {
return 0, fmt.Errorf("no such field '%s'", field)
}
return s.fieldsOffsets[fieldID-1], nil
return s.dictLocs[fieldIDPlus1-1], nil
}
func (s *Segment) loadDvIterators() error {
func (s *SegmentBase) loadDvIterators() error {
if s.docValueOffset == fieldNotUninverted {
return nil
}
var read uint64
for fieldID, field := range s.fieldsInv {
fieldLoc, n := binary.Uvarint(s.mm[s.docValueOffset+read : s.docValueOffset+read+binary.MaxVarintLen64])
fieldLoc, n := binary.Uvarint(s.mem[s.docValueOffset+read : s.docValueOffset+read+binary.MaxVarintLen64])
if n <= 0 {
return fmt.Errorf("loadDvIterators: failed to read the docvalue offsets for field %d", fieldID)
}

View File

@ -53,12 +53,11 @@ func writeRoaringWithLen(r *roaring.Bitmap, w io.Writer) (int, error) {
func persistFields(fieldsInv []string, w *CountHashWriter, dictLocs []uint64) (uint64, error) {
var rv uint64
var fieldsOffsets []uint64
var fieldStarts []uint64
for fieldID, fieldName := range fieldsInv {
// record start of this field
fieldStarts = append(fieldStarts, uint64(w.Count()))
fieldsOffsets = append(fieldsOffsets, uint64(w.Count()))
// write out the dict location and field name length
_, err := writeUvarints(w, dictLocs[fieldID], uint64(len(fieldName)))
@ -76,7 +75,7 @@ func persistFields(fieldsInv []string, w *CountHashWriter, dictLocs []uint64) (u
// now write out the fields index
rv = uint64(w.Count())
for fieldID := range fieldsInv {
err := binary.Write(w, binary.BigEndian, fieldStarts[fieldID])
err := binary.Write(w, binary.BigEndian, fieldsOffsets[fieldID])
if err != nil {
return 0, err
}
@ -89,8 +88,11 @@ func persistFields(fieldsInv []string, w *CountHashWriter, dictLocs []uint64) (u
// crc + ver + chunk + field offset + stored offset + num docs + docValueOffset
const FooterSize = 4 + 4 + 4 + 8 + 8 + 8 + 8
func persistFooter(numDocs, storedIndexOffset, fieldIndexOffset, docValueOffset uint64,
chunkFactor uint32, w *CountHashWriter) error {
func persistFooter(numDocs, storedIndexOffset, fieldsIndexOffset, docValueOffset uint64,
chunkFactor uint32, crcBeforeFooter uint32, writerIn io.Writer) error {
w := NewCountHashWriter(writerIn)
w.crc = crcBeforeFooter
// write out the number of docs
err := binary.Write(w, binary.BigEndian, numDocs)
if err != nil {
@ -102,7 +104,7 @@ func persistFooter(numDocs, storedIndexOffset, fieldIndexOffset, docValueOffset
return err
}
// write out the field index location
err = binary.Write(w, binary.BigEndian, fieldIndexOffset)
err = binary.Write(w, binary.BigEndian, fieldsIndexOffset)
if err != nil {
return err
}
@ -122,7 +124,7 @@ func persistFooter(numDocs, storedIndexOffset, fieldIndexOffset, docValueOffset
return err
}
// write out CRC-32 of everything up to but not including this CRC
err = binary.Write(w, binary.BigEndian, w.Sum32())
err = binary.Write(w, binary.BigEndian, w.crc)
if err != nil {
return err
}
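Putting the footer pieces together (write order from persistFooter above, read order from loadConfig earlier in the diff): the last FooterSize = 44 bytes of a zap file are numDocs, storedIndexOffset, fieldsIndexOffset, and docValueOffset as big-endian uint64s, followed by chunkFactor, version, and the CRC-32 as big-endian uint32s. A minimal standalone round-trip of that layout; footer/encodeFooter/decodeFooter are illustrative names, not bleve APIs.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

const footerSize = 4 + 4 + 4 + 8 + 8 + 8 + 8 // crc + ver + chunk + the four 8-byte fields

type footer struct {
	numDocs, storedIndexOffset, fieldsIndexOffset, docValueOffset uint64
	chunkFactor, version, crc                                     uint32
}

// encodeFooter follows the write order used by persistFooter.
func encodeFooter(ft footer) []byte {
	buf := make([]byte, footerSize)
	be := binary.BigEndian
	be.PutUint64(buf[0:], ft.numDocs)
	be.PutUint64(buf[8:], ft.storedIndexOffset)
	be.PutUint64(buf[16:], ft.fieldsIndexOffset)
	be.PutUint64(buf[24:], ft.docValueOffset)
	be.PutUint32(buf[32:], ft.chunkFactor)
	be.PutUint32(buf[36:], ft.version)
	be.PutUint32(buf[40:], ft.crc)
	return buf
}

// decodeFooter reads from the end of the file, mirroring loadConfig.
func decodeFooter(file []byte) footer {
	end := len(file)
	be := binary.BigEndian
	return footer{
		crc:               be.Uint32(file[end-4:]),
		version:           be.Uint32(file[end-8 : end-4]),
		chunkFactor:       be.Uint32(file[end-12 : end-8]),
		docValueOffset:    be.Uint64(file[end-20 : end-12]),
		fieldsIndexOffset: be.Uint64(file[end-28 : end-20]),
		storedIndexOffset: be.Uint64(file[end-36 : end-28]),
		numDocs:           be.Uint64(file[end-44 : end-36]),
	}
}

func main() {
	in := footer{numDocs: 7, storedIndexOffset: 100, fieldsIndexOffset: 200,
		docValueOffset: 300, chunkFactor: 1024, version: 2, crc: 0xDEADBEEF}
	fmt.Println(decodeFooter(encodeFooter(in)) == in) // true
}
```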