0
0
Fork 0

Merge pull request #743 from steveyen/master

zap-based in-memory segment impl & various merge optimizations
This commit is contained in:
Steve Yen 2018-01-29 09:22:12 -08:00 committed by GitHub
commit 5d1a2b0ad7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 536 additions and 380 deletions

View File

@ -54,7 +54,6 @@ OUTER:
lastEpochMergePlanned = ourSnapshot.epoch
s.fireEvent(EventKindMergerProgress, time.Since(startTime))
}
_ = ourSnapshot.DecRef()
@ -81,6 +80,7 @@ OUTER:
// lets get started
err := s.planMergeAtSnapshot(ourSnapshot)
if err != nil {
s.fireAsyncError(fmt.Errorf("merging err: %v", err))
_ = ourSnapshot.DecRef()
continue OUTER
}
@ -141,7 +141,7 @@ func (s *Scorch) planMergeAtSnapshot(ourSnapshot *IndexSnapshot) error {
filename := zapFileName(newSegmentID)
s.markIneligibleForRemoval(filename)
path := s.path + string(os.PathSeparator) + filename
newDocNums, err := zap.Merge(segmentsToMerge, docsToDrop, path, 1024)
newDocNums, err := zap.Merge(segmentsToMerge, docsToDrop, path, DefaultChunkFactor)
if err != nil {
s.unmarkIneligibleForRemoval(filename)
return fmt.Errorf("merging failed: %v", err)

View File

@ -28,11 +28,12 @@ import (
"github.com/RoaringBitmap/roaring"
"github.com/blevesearch/bleve/index/scorch/segment"
"github.com/blevesearch/bleve/index/scorch/segment/mem"
"github.com/blevesearch/bleve/index/scorch/segment/zap"
"github.com/boltdb/bolt"
)
var DefaultChunkFactor uint32 = 1024
type notificationChan chan struct{}
func (s *Scorch) persisterLoop() {
@ -178,11 +179,11 @@ func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot) error {
return err2
}
switch seg := segmentSnapshot.segment.(type) {
case *mem.Segment:
case *zap.SegmentBase:
// need to persist this to disk
filename := zapFileName(segmentSnapshot.id)
path := s.path + string(os.PathSeparator) + filename
err2 := zap.PersistSegment(seg, path, 1024)
err2 := zap.PersistSegmentBase(seg, path)
if err2 != nil {
return fmt.Errorf("error persisting segment: %v", err2)
}

View File

@ -28,6 +28,7 @@ import (
"github.com/blevesearch/bleve/index"
"github.com/blevesearch/bleve/index/scorch/segment"
"github.com/blevesearch/bleve/index/scorch/segment/mem"
"github.com/blevesearch/bleve/index/scorch/segment/zap"
"github.com/blevesearch/bleve/index/store"
"github.com/blevesearch/bleve/registry"
"github.com/boltdb/bolt"
@ -217,7 +218,7 @@ func (s *Scorch) Delete(id string) error {
}
// Batch applices a batch of changes to the index atomically
func (s *Scorch) Batch(batch *index.Batch) error {
func (s *Scorch) Batch(batch *index.Batch) (err error) {
start := time.Now()
defer func() {
@ -271,10 +272,13 @@ func (s *Scorch) Batch(batch *index.Batch) error {
var newSegment segment.Segment
if len(analysisResults) > 0 {
newSegment = mem.NewFromAnalyzedDocs(analysisResults)
newSegment, err = zap.NewSegmentBase(mem.NewFromAnalyzedDocs(analysisResults), DefaultChunkFactor)
if err != nil {
return err
}
}
err := s.prepareSegment(newSegment, ids, batch.InternalOps)
err = s.prepareSegment(newSegment, ids, batch.InternalOps)
if err != nil {
if newSegment != nil {
_ = newSegment.Close()

View File

@ -1395,7 +1395,7 @@ func TestConcurrentUpdate(t *testing.T) {
// do some concurrent updates
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
for i := 0; i < 100; i++ {
wg.Add(1)
go func(i int) {
doc := document.NewDocument("1")

View File

@ -267,15 +267,15 @@ func (s *Segment) processDocument(result *index.AnalysisResult) {
}
func (s *Segment) getOrDefineField(name string) int {
fieldID, ok := s.FieldsMap[name]
fieldIDPlus1, ok := s.FieldsMap[name]
if !ok {
fieldID = uint16(len(s.FieldsInv) + 1)
s.FieldsMap[name] = fieldID
fieldIDPlus1 = uint16(len(s.FieldsInv) + 1)
s.FieldsMap[name] = fieldIDPlus1
s.FieldsInv = append(s.FieldsInv, name)
s.Dicts = append(s.Dicts, make(map[string]uint64))
s.DictKeys = append(s.DictKeys, make([]string, 0))
}
return int(fieldID - 1)
return int(fieldIDPlus1 - 1)
}
func (s *Segment) addDocument() int {

View File

@ -40,35 +40,38 @@ const idFieldID uint16 = 0
// Segment is an in memory implementation of scorch.Segment
type Segment struct {
// FieldsMap name -> id+1
// FieldsMap adds 1 to field id to avoid zero value issues
// name -> field id + 1
FieldsMap map[string]uint16
// fields id -> name
// FieldsInv is the inverse of FieldsMap
// field id -> name
FieldsInv []string
// term dictionary
// Term dictionaries for each field
// field id -> term -> postings list id + 1
Dicts []map[string]uint64
// term dictionary keys
// field id -> []dictionary keys
// Terms for each field, where terms are sorted ascending
// field id -> []term
DictKeys [][]string
// Postings list
// postings list id -> Postings bitmap
// postings list id -> bitmap by docNum
Postings []*roaring.Bitmap
// Postings List has locations
// Postings list has locations
PostingsLocs []*roaring.Bitmap
// term frequencies
// Term frequencies
// postings list id -> Freqs (one for each hit in bitmap)
Freqs [][]uint64
// field Norms
// Field norms
// postings list id -> Norms (one for each hit in bitmap)
Norms [][]float32
// field/start/end/pos/locarraypos
// Field/start/end/pos/locarraypos
// postings list id -> start/end/pos/locarraypos (one for each freq)
Locfields [][]uint16
Locstarts [][]uint64
@ -80,18 +83,18 @@ type Segment struct {
// docNum -> field id -> slice of values (each value []byte)
Stored []map[uint16][][]byte
// stored field types
// Stored field types
// docNum -> field id -> slice of types (each type byte)
StoredTypes []map[uint16][]byte
// stored field array positions
// Stored field array positions
// docNum -> field id -> slice of array positions (each is []uint64)
StoredPos []map[uint16][][]uint64
// for storing the docValue persisted fields
// For storing the docValue persisted fields
DocValueFields map[uint16]bool
// footprint of the segment, updated when analyzed document mutations
// Footprint of the segment, updated when analyzed document mutations
// are added into the segment
sizeInBytes uint64
}

View File

@ -32,10 +32,8 @@ const version uint32 = 2
const fieldNotUninverted = math.MaxUint64
// PersistSegment takes the in-memory segment and persists it to the specified
// path in the zap file format.
func PersistSegment(memSegment *mem.Segment, path string, chunkFactor uint32) (err error) {
// PersistSegmentBase persists SegmentBase in the zap file format.
func PersistSegmentBase(sb *SegmentBase, path string) error {
flag := os.O_RDWR | os.O_CREATE
f, err := os.OpenFile(path, flag, 0600)
@ -43,84 +41,151 @@ func PersistSegment(memSegment *mem.Segment, path string, chunkFactor uint32) (e
return err
}
// bufer the output
br := bufio.NewWriter(f)
// wrap it for counting (tracking offsets)
cr := NewCountHashWriter(br)
var storedIndexOffset uint64
var dictLocs []uint64
docValueOffset := uint64(fieldNotUninverted)
if len(memSegment.Stored) > 0 {
storedIndexOffset, err = persistStored(memSegment, cr)
if err != nil {
return err
}
var freqOffsets, locOffsets []uint64
freqOffsets, locOffsets, err = persistPostingDetails(memSegment, cr, chunkFactor)
if err != nil {
return err
}
var postingsListLocs []uint64
postingsListLocs, err = persistPostingsLocs(memSegment, cr)
if err != nil {
return err
}
var postingsLocs []uint64
postingsLocs, err = persistPostingsLists(memSegment, cr, postingsListLocs, freqOffsets, locOffsets)
if err != nil {
return err
}
dictLocs, err = persistDictionary(memSegment, cr, postingsLocs)
if err != nil {
return err
}
docValueOffset, err = persistFieldDocValues(cr, chunkFactor, memSegment)
if err != nil {
return err
}
} else {
dictLocs = make([]uint64, len(memSegment.FieldsInv))
cleanup := func() {
_ = f.Close()
_ = os.Remove(path)
}
var fieldIndexStart uint64
fieldIndexStart, err = persistFields(memSegment.FieldsInv, cr, dictLocs)
br := bufio.NewWriter(f)
_, err = br.Write(sb.mem)
if err != nil {
cleanup()
return err
}
err = persistFooter(uint64(len(memSegment.Stored)), storedIndexOffset,
fieldIndexStart, docValueOffset, chunkFactor, cr)
err = persistFooter(sb.numDocs, sb.storedIndexOffset, sb.fieldsIndexOffset, sb.docValueOffset,
sb.chunkFactor, sb.memCRC, br)
if err != nil {
cleanup()
return err
}
err = br.Flush()
if err != nil {
cleanup()
return err
}
err = f.Sync()
if err != nil {
cleanup()
return err
}
err = f.Close()
if err != nil {
cleanup()
return err
}
return nil
}
// PersistSegment takes the in-memory segment and persists it to
// the specified path in the zap file format.
func PersistSegment(memSegment *mem.Segment, path string, chunkFactor uint32) error {
flag := os.O_RDWR | os.O_CREATE
f, err := os.OpenFile(path, flag, 0600)
if err != nil {
return err
}
cleanup := func() {
_ = f.Close()
_ = os.Remove(path)
}
// buffer the output
br := bufio.NewWriter(f)
// wrap it for counting (tracking offsets)
cr := NewCountHashWriter(br)
numDocs, storedIndexOffset, fieldsIndexOffset, docValueOffset, _, err :=
persistBase(memSegment, cr, chunkFactor)
if err != nil {
cleanup()
return err
}
err = persistFooter(numDocs, storedIndexOffset, fieldsIndexOffset, docValueOffset,
chunkFactor, cr.Sum32(), cr)
if err != nil {
cleanup()
return err
}
err = br.Flush()
if err != nil {
cleanup()
return err
}
err = f.Sync()
if err != nil {
cleanup()
return err
}
err = f.Close()
if err != nil {
cleanup()
return err
}
return nil
}
func persistBase(memSegment *mem.Segment, cr *CountHashWriter, chunkFactor uint32) (
numDocs, storedIndexOffset, fieldsIndexOffset, docValueOffset uint64,
dictLocs []uint64, err error) {
docValueOffset = uint64(fieldNotUninverted)
if len(memSegment.Stored) > 0 {
storedIndexOffset, err = persistStored(memSegment, cr)
if err != nil {
return 0, 0, 0, 0, nil, err
}
freqOffsets, locOffsets, err := persistPostingDetails(memSegment, cr, chunkFactor)
if err != nil {
return 0, 0, 0, 0, nil, err
}
postingsListLocs, err := persistPostingsLocs(memSegment, cr)
if err != nil {
return 0, 0, 0, 0, nil, err
}
postingsLocs, err := persistPostingsLists(memSegment, cr, postingsListLocs, freqOffsets, locOffsets)
if err != nil {
return 0, 0, 0, 0, nil, err
}
dictLocs, err = persistDictionary(memSegment, cr, postingsLocs)
if err != nil {
return 0, 0, 0, 0, nil, err
}
docValueOffset, err = persistFieldDocValues(memSegment, cr, chunkFactor)
if err != nil {
return 0, 0, 0, 0, nil, err
}
} else {
dictLocs = make([]uint64, len(memSegment.FieldsInv))
}
fieldsIndexOffset, err = persistFields(memSegment.FieldsInv, cr, dictLocs)
if err != nil {
return 0, 0, 0, 0, nil, err
}
return uint64(len(memSegment.Stored)), storedIndexOffset, fieldsIndexOffset, docValueOffset,
dictLocs, nil
}
func persistStored(memSegment *mem.Segment, w *CountHashWriter) (uint64, error) {
var curr int
@ -356,11 +421,13 @@ func persistPostingDetails(memSegment *mem.Segment, w *CountHashWriter, chunkFac
func persistPostingsLocs(memSegment *mem.Segment, w *CountHashWriter) (rv []uint64, err error) {
rv = make([]uint64, 0, len(memSegment.PostingsLocs))
var reuseBuf bytes.Buffer
reuseBufVarint := make([]byte, binary.MaxVarintLen64)
for postingID := range memSegment.PostingsLocs {
// record where we start this posting loc
rv = append(rv, uint64(w.Count()))
// write out the length and bitmap
_, err = writeRoaringWithLen(memSegment.PostingsLocs[postingID], w)
_, err = writeRoaringWithLen(memSegment.PostingsLocs[postingID], w, &reuseBuf, reuseBufVarint)
if err != nil {
return nil, err
}
@ -371,6 +438,8 @@ func persistPostingsLocs(memSegment *mem.Segment, w *CountHashWriter) (rv []uint
func persistPostingsLists(memSegment *mem.Segment, w *CountHashWriter,
postingsListLocs, freqOffsets, locOffsets []uint64) (rv []uint64, err error) {
rv = make([]uint64, 0, len(memSegment.Postings))
var reuseBuf bytes.Buffer
reuseBufVarint := make([]byte, binary.MaxVarintLen64)
for postingID := range memSegment.Postings {
// record where we start this posting list
rv = append(rv, uint64(w.Count()))
@ -383,7 +452,7 @@ func persistPostingsLists(memSegment *mem.Segment, w *CountHashWriter,
}
// write out the length and bitmap
_, err = writeRoaringWithLen(memSegment.Postings[postingID], w)
_, err = writeRoaringWithLen(memSegment.Postings[postingID], w, &reuseBuf, reuseBufVarint)
if err != nil {
return nil, err
}
@ -394,6 +463,8 @@ func persistPostingsLists(memSegment *mem.Segment, w *CountHashWriter,
func persistDictionary(memSegment *mem.Segment, w *CountHashWriter, postingsLocs []uint64) ([]uint64, error) {
rv := make([]uint64, 0, len(memSegment.DictKeys))
varintBuf := make([]byte, binary.MaxVarintLen64)
var buffer bytes.Buffer
for fieldID, fieldTerms := range memSegment.DictKeys {
if fieldID != 0 {
@ -427,10 +498,8 @@ func persistDictionary(memSegment *mem.Segment, w *CountHashWriter, postingsLocs
vellumData := buffer.Bytes()
// write out the length of the vellum data
buf := make([]byte, binary.MaxVarintLen64)
// write out the number of chunks
n := binary.PutUvarint(buf, uint64(len(vellumData)))
_, err = w.Write(buf[:n])
n := binary.PutUvarint(varintBuf, uint64(len(vellumData)))
_, err = w.Write(varintBuf[:n])
if err != nil {
return nil, err
}
@ -521,9 +590,8 @@ func persistDocValues(memSegment *mem.Segment, w *CountHashWriter,
return fieldChunkOffsets, nil
}
func persistFieldDocValues(w *CountHashWriter, chunkFactor uint32,
memSegment *mem.Segment) (uint64, error) {
func persistFieldDocValues(memSegment *mem.Segment, w *CountHashWriter,
chunkFactor uint32) (uint64, error) {
fieldDvOffsets, err := persistDocValues(memSegment, w, chunkFactor)
if err != nil {
return 0, err
@ -548,3 +616,36 @@ func persistFieldDocValues(w *CountHashWriter, chunkFactor uint32,
return fieldDocValuesOffset, nil
}
func NewSegmentBase(memSegment *mem.Segment, chunkFactor uint32) (*SegmentBase, error) {
var br bytes.Buffer
cr := NewCountHashWriter(&br)
numDocs, storedIndexOffset, fieldsIndexOffset, docValueOffset, dictLocs, err :=
persistBase(memSegment, cr, chunkFactor)
if err != nil {
return nil, err
}
sb := &SegmentBase{
mem: br.Bytes(),
memCRC: cr.Sum32(),
chunkFactor: chunkFactor,
fieldsMap: memSegment.FieldsMap,
fieldsInv: memSegment.FieldsInv,
numDocs: numDocs,
storedIndexOffset: storedIndexOffset,
fieldsIndexOffset: fieldsIndexOffset,
docValueOffset: docValueOffset,
dictLocs: dictLocs,
fieldDvIterMap: make(map[uint16]*docValueIterator),
}
err = sb.loadDvIterators()
if err != nil {
return nil, err
}
return sb, nil
}

View File

@ -15,32 +15,28 @@
package zap
import (
"hash"
"hash/crc32"
"io"
)
// CountHashWriter is a wrapper around a Writer which counts the number of
// bytes which have been written
// bytes which have been written and computes a crc32 hash
type CountHashWriter struct {
w io.Writer
h hash.Hash32
n int
w io.Writer
crc uint32
n int
}
// NewCountHashWriter returns a CountHashWriter which wraps the provided Writer
func NewCountHashWriter(w io.Writer) *CountHashWriter {
return &CountHashWriter{
w: w,
h: crc32.NewIEEE(),
}
return &CountHashWriter{w: w}
}
// Write writes the provided bytes to the wrapped writer and counts the bytes
func (c *CountHashWriter) Write(b []byte) (int, error) {
n, err := c.w.Write(b)
c.crc = crc32.Update(c.crc, crc32.IEEETable, b[:n])
c.n += n
_, _ = c.h.Write(b)
return n, err
}
@ -51,5 +47,5 @@ func (c *CountHashWriter) Count() int {
// Sum32 returns the CRC-32 hash of the content written to this writer
func (c *CountHashWriter) Sum32() uint32 {
return c.h.Sum32()
return c.crc
}

View File

@ -27,7 +27,7 @@ import (
// Dictionary is the zap representation of the term dictionary
type Dictionary struct {
segment *Segment
sb *SegmentBase
field string
fieldID uint16
fst *vellum.FST
@ -35,18 +35,18 @@ type Dictionary struct {
// PostingsList returns the postings list for the specified term
func (d *Dictionary) PostingsList(term string, except *roaring.Bitmap) (segment.PostingsList, error) {
return d.postingsList(term, except)
return d.postingsList([]byte(term), except)
}
func (d *Dictionary) postingsList(term string, except *roaring.Bitmap) (*PostingsList, error) {
func (d *Dictionary) postingsList(term []byte, except *roaring.Bitmap) (*PostingsList, error) {
rv := &PostingsList{
dictionary: d,
term: term,
except: except,
sb: d.sb,
term: term,
except: except,
}
if d.fst != nil {
postingsOffset, exists, err := d.fst.Get([]byte(term))
postingsOffset, exists, err := d.fst.Get(term)
if err != nil {
return nil, fmt.Errorf("vellum err: %v", err)
}
@ -56,19 +56,19 @@ func (d *Dictionary) postingsList(term string, except *roaring.Bitmap) (*Posting
var n uint64
var read int
rv.freqOffset, read = binary.Uvarint(d.segment.mm[postingsOffset+n : postingsOffset+binary.MaxVarintLen64])
rv.freqOffset, read = binary.Uvarint(d.sb.mem[postingsOffset+n : postingsOffset+binary.MaxVarintLen64])
n += uint64(read)
rv.locOffset, read = binary.Uvarint(d.segment.mm[postingsOffset+n : postingsOffset+n+binary.MaxVarintLen64])
rv.locOffset, read = binary.Uvarint(d.sb.mem[postingsOffset+n : postingsOffset+n+binary.MaxVarintLen64])
n += uint64(read)
var locBitmapOffset uint64
locBitmapOffset, read = binary.Uvarint(d.segment.mm[postingsOffset+n : postingsOffset+n+binary.MaxVarintLen64])
locBitmapOffset, read = binary.Uvarint(d.sb.mem[postingsOffset+n : postingsOffset+n+binary.MaxVarintLen64])
n += uint64(read)
// go ahead and load loc bitmap
var locBitmapLen uint64
locBitmapLen, read = binary.Uvarint(d.segment.mm[locBitmapOffset : locBitmapOffset+binary.MaxVarintLen64])
locRoaringBytes := d.segment.mm[locBitmapOffset+uint64(read) : locBitmapOffset+uint64(read)+locBitmapLen]
locBitmapLen, read = binary.Uvarint(d.sb.mem[locBitmapOffset : locBitmapOffset+binary.MaxVarintLen64])
locRoaringBytes := d.sb.mem[locBitmapOffset+uint64(read) : locBitmapOffset+uint64(read)+locBitmapLen]
rv.locBitmap = roaring.NewBitmap()
_, err := rv.locBitmap.FromBuffer(locRoaringBytes)
if err != nil {
@ -76,10 +76,10 @@ func (d *Dictionary) postingsList(term string, except *roaring.Bitmap) (*Posting
}
var postingsLen uint64
postingsLen, read = binary.Uvarint(d.segment.mm[postingsOffset+n : postingsOffset+n+binary.MaxVarintLen64])
postingsLen, read = binary.Uvarint(d.sb.mem[postingsOffset+n : postingsOffset+n+binary.MaxVarintLen64])
n += uint64(read)
roaringBytes := d.segment.mm[postingsOffset+n : postingsOffset+n+postingsLen]
roaringBytes := d.sb.mem[postingsOffset+n : postingsOffset+n+postingsLen]
bitmap := roaring.NewBitmap()
_, err = bitmap.FromBuffer(roaringBytes)
@ -96,7 +96,6 @@ func (d *Dictionary) postingsList(term string, except *roaring.Bitmap) (*Posting
// Iterator returns an iterator for this dictionary
func (d *Dictionary) Iterator() segment.DictionaryIterator {
rv := &DictionaryIterator{
d: d,
}

View File

@ -61,17 +61,17 @@ func (di *docValueIterator) curChunkNumber() uint64 {
return di.curChunkNum
}
func (s *Segment) loadFieldDocValueIterator(field string,
func (s *SegmentBase) loadFieldDocValueIterator(field string,
fieldDvLoc uint64) (*docValueIterator, error) {
// get the docValue offset for the given fields
if fieldDvLoc == fieldNotUninverted {
return nil, fmt.Errorf("loadFieldDocValueConfigs: "+
return nil, fmt.Errorf("loadFieldDocValueIterator: "+
"no docValues found for field: %s", field)
}
// read the number of chunks, chunk lengths
var offset, clen uint64
numChunks, read := binary.Uvarint(s.mm[fieldDvLoc : fieldDvLoc+binary.MaxVarintLen64])
numChunks, read := binary.Uvarint(s.mem[fieldDvLoc : fieldDvLoc+binary.MaxVarintLen64])
if read <= 0 {
return nil, fmt.Errorf("failed to read the field "+
"doc values for field %s", field)
@ -84,7 +84,7 @@ func (s *Segment) loadFieldDocValueIterator(field string,
chunkLens: make([]uint64, int(numChunks)),
}
for i := 0; i < int(numChunks); i++ {
clen, read = binary.Uvarint(s.mm[fieldDvLoc+offset : fieldDvLoc+offset+binary.MaxVarintLen64])
clen, read = binary.Uvarint(s.mem[fieldDvLoc+offset : fieldDvLoc+offset+binary.MaxVarintLen64])
if read <= 0 {
return nil, fmt.Errorf("corrupted chunk length during segment load")
}
@ -97,7 +97,7 @@ func (s *Segment) loadFieldDocValueIterator(field string,
}
func (di *docValueIterator) loadDvChunk(chunkNumber,
localDocNum uint64, s *Segment) error {
localDocNum uint64, s *SegmentBase) error {
// advance to the chunk where the docValues
// reside for the given docID
destChunkDataLoc := di.dvDataLoc
@ -107,7 +107,7 @@ func (di *docValueIterator) loadDvChunk(chunkNumber,
curChunkSize := di.chunkLens[chunkNumber]
// read the number of docs reside in the chunk
numDocs, read := binary.Uvarint(s.mm[destChunkDataLoc : destChunkDataLoc+binary.MaxVarintLen64])
numDocs, read := binary.Uvarint(s.mem[destChunkDataLoc : destChunkDataLoc+binary.MaxVarintLen64])
if read <= 0 {
return fmt.Errorf("failed to read the chunk")
}
@ -116,17 +116,17 @@ func (di *docValueIterator) loadDvChunk(chunkNumber,
offset := uint64(0)
di.curChunkHeader = make([]MetaData, int(numDocs))
for i := 0; i < int(numDocs); i++ {
di.curChunkHeader[i].DocID, read = binary.Uvarint(s.mm[chunkMetaLoc+offset : chunkMetaLoc+offset+binary.MaxVarintLen64])
di.curChunkHeader[i].DocID, read = binary.Uvarint(s.mem[chunkMetaLoc+offset : chunkMetaLoc+offset+binary.MaxVarintLen64])
offset += uint64(read)
di.curChunkHeader[i].DocDvLoc, read = binary.Uvarint(s.mm[chunkMetaLoc+offset : chunkMetaLoc+offset+binary.MaxVarintLen64])
di.curChunkHeader[i].DocDvLoc, read = binary.Uvarint(s.mem[chunkMetaLoc+offset : chunkMetaLoc+offset+binary.MaxVarintLen64])
offset += uint64(read)
di.curChunkHeader[i].DocDvLen, read = binary.Uvarint(s.mm[chunkMetaLoc+offset : chunkMetaLoc+offset+binary.MaxVarintLen64])
di.curChunkHeader[i].DocDvLen, read = binary.Uvarint(s.mem[chunkMetaLoc+offset : chunkMetaLoc+offset+binary.MaxVarintLen64])
offset += uint64(read)
}
compressedDataLoc := chunkMetaLoc + offset
dataLength := destChunkDataLoc + curChunkSize - compressedDataLoc
di.curChunkData = s.mm[compressedDataLoc : compressedDataLoc+dataLength]
di.curChunkData = s.mem[compressedDataLoc : compressedDataLoc+dataLength]
di.curChunkNum = chunkNumber
return nil
}
@ -171,18 +171,18 @@ func (di *docValueIterator) getDocValueLocs(docID uint64) (uint64, uint64) {
// VisitDocumentFieldTerms is an implementation of the
// DocumentFieldTermVisitable interface
func (s *Segment) VisitDocumentFieldTerms(localDocNum uint64, fields []string,
func (s *SegmentBase) VisitDocumentFieldTerms(localDocNum uint64, fields []string,
visitor index.DocumentFieldTermVisitor) error {
fieldID := uint16(0)
fieldIDPlus1 := uint16(0)
ok := true
for _, field := range fields {
if fieldID, ok = s.fieldsMap[field]; !ok {
if fieldIDPlus1, ok = s.fieldsMap[field]; !ok {
continue
}
// find the chunkNumber where the docValues are stored
docInChunk := localDocNum / uint64(s.chunkFactor)
if dvIter, exists := s.fieldDvIterMap[fieldID-1]; exists &&
if dvIter, exists := s.fieldDvIterMap[fieldIDPlus1-1]; exists &&
dvIter != nil {
// check if the chunk is already loaded
if docInChunk != dvIter.curChunkNumber() {

View File

@ -29,10 +29,10 @@ import (
"github.com/golang/snappy"
)
// Merge takes a slice of zap segments, bit masks describing which documents
// from the may be dropped, and creates a new segment containing the remaining
// data. This new segment is built at the specified path, with the provided
// chunkFactor.
// Merge takes a slice of zap segments and bit masks describing which
// documents may be dropped, and creates a new segment containing the
// remaining data. This new segment is built at the specified path,
// with the provided chunkFactor.
func Merge(segments []*Segment, drops []*roaring.Bitmap, path string,
chunkFactor uint32) ([][]uint64, error) {
flag := os.O_RDWR | os.O_CREATE
@ -42,6 +42,11 @@ func Merge(segments []*Segment, drops []*roaring.Bitmap, path string,
return nil, err
}
cleanup := func() {
_ = f.Close()
_ = os.Remove(path)
}
// buffer the output
br := bufio.NewWriter(f)
@ -50,52 +55,59 @@ func Merge(segments []*Segment, drops []*roaring.Bitmap, path string,
fieldsInv := mergeFields(segments)
fieldsMap := mapFields(fieldsInv)
newSegDocCount := computeNewDocCount(segments, drops)
var newDocNums [][]uint64
var storedIndexOffset uint64
fieldDvLocsOffset := uint64(fieldNotUninverted)
var dictLocs []uint64
newSegDocCount := computeNewDocCount(segments, drops)
if newSegDocCount > 0 {
storedIndexOffset, newDocNums, err = mergeStoredAndRemap(segments, drops,
fieldsMap, fieldsInv, newSegDocCount, cr)
if err != nil {
cleanup()
return nil, err
}
dictLocs, fieldDvLocsOffset, err = persistMergedRest(segments, drops, fieldsInv, fieldsMap,
newDocNums, newSegDocCount, chunkFactor, cr)
if err != nil {
cleanup()
return nil, err
}
} else {
dictLocs = make([]uint64, len(fieldsInv))
}
var fieldsIndexOffset uint64
fieldsIndexOffset, err = persistFields(fieldsInv, cr, dictLocs)
fieldsIndexOffset, err := persistFields(fieldsInv, cr, dictLocs)
if err != nil {
cleanup()
return nil, err
}
err = persistFooter(newSegDocCount, storedIndexOffset,
fieldsIndexOffset, fieldDvLocsOffset, chunkFactor, cr)
fieldsIndexOffset, fieldDvLocsOffset, chunkFactor, cr.Sum32(), cr)
if err != nil {
cleanup()
return nil, err
}
err = br.Flush()
if err != nil {
cleanup()
return nil, err
}
err = f.Sync()
if err != nil {
cleanup()
return nil, err
}
err = f.Close()
if err != nil {
cleanup()
return nil, err
}
@ -104,7 +116,7 @@ func Merge(segments []*Segment, drops []*roaring.Bitmap, path string,
// mapFields takes the fieldsInv list and builds the map
func mapFields(fields []string) map[string]uint16 {
rv := make(map[string]uint16)
rv := make(map[string]uint16, len(fields))
for i, fieldName := range fields {
rv[fieldName] = uint16(i)
}
@ -114,15 +126,14 @@ func mapFields(fields []string) map[string]uint16 {
// computeNewDocCount determines how many documents will be in the newly
// merged segment when obsoleted docs are dropped
func computeNewDocCount(segments []*Segment, drops []*roaring.Bitmap) uint64 {
var newSegDocCount uint64
var newDocCount uint64
for segI, segment := range segments {
segIAfterDrop := segment.NumDocs()
newDocCount += segment.NumDocs()
if drops[segI] != nil {
segIAfterDrop -= drops[segI].GetCardinality()
newDocCount -= drops[segI].GetCardinality()
}
newSegDocCount += segIAfterDrop
}
return newSegDocCount
return newDocCount
}
func persistMergedRest(segments []*Segment, drops []*roaring.Bitmap,
@ -130,14 +141,18 @@ func persistMergedRest(segments []*Segment, drops []*roaring.Bitmap,
newSegDocCount uint64, chunkFactor uint32,
w *CountHashWriter) ([]uint64, uint64, error) {
var bufReuse bytes.Buffer
var bufMaxVarintLen64 []byte = make([]byte, binary.MaxVarintLen64)
var bufLoc []uint64
rv1 := make([]uint64, len(fieldsInv))
rv := make([]uint64, len(fieldsInv))
fieldDvLocs := make([]uint64, len(fieldsInv))
fieldDvLocsOffset := uint64(fieldNotUninverted)
var docNumbers docIDRange
var vellumBuf bytes.Buffer
// for each field
for fieldID, fieldName := range fieldsInv {
if fieldID != 0 {
@ -177,13 +192,15 @@ func persistMergedRest(segments []*Segment, drops []*roaring.Bitmap,
tfEncoder := newChunkedIntCoder(uint64(chunkFactor), newSegDocCount-1)
locEncoder := newChunkedIntCoder(uint64(chunkFactor), newSegDocCount-1)
fdvEncoder := newChunkedContentCoder(uint64(chunkFactor), newSegDocCount-1)
docTermMap := make(map[uint64][]byte, 0)
docTermMap := make(map[uint64][]byte, newSegDocCount)
for err == nil {
term, _ := mergeItr.Current()
newRoaring := roaring.NewBitmap()
newRoaringLocs := roaring.NewBitmap()
tfEncoder.Reset()
locEncoder.Reset()
@ -193,7 +210,7 @@ func persistMergedRest(segments []*Segment, drops []*roaring.Bitmap,
if dict == nil {
continue
}
postings, err2 := dict.postingsList(string(term), drops[dictI])
postings, err2 := dict.postingsList(term, drops[dictI])
if err2 != nil {
return nil, 0, err2
}
@ -209,9 +226,9 @@ func persistMergedRest(segments []*Segment, drops []*roaring.Bitmap,
// encode norm bits
norm := next.Norm()
normBits := math.Float32bits(float32(norm))
err3 := tfEncoder.Add(hitNewDocNum, next.Frequency(), uint64(normBits))
if err3 != nil {
return nil, 0, err3
err = tfEncoder.Add(hitNewDocNum, next.Frequency(), uint64(normBits))
if err != nil {
return nil, 0, err
}
locs := next.Locations()
if len(locs) > 0 {
@ -234,15 +251,16 @@ func persistMergedRest(segments []*Segment, drops []*roaring.Bitmap,
}
}
docTermMap[hitNewDocNum] = append(docTermMap[hitNewDocNum], []byte(term)...)
docTermMap[hitNewDocNum] = append(docTermMap[hitNewDocNum], termSeparator)
docTermMap[hitNewDocNum] =
append(append(docTermMap[hitNewDocNum], term...), termSeparator)
next, err2 = postItr.Next()
}
if err != nil {
return nil, 0, err
if err2 != nil {
return nil, 0, err2
}
}
tfEncoder.Close()
locEncoder.Close()
@ -259,7 +277,7 @@ func persistMergedRest(segments []*Segment, drops []*roaring.Bitmap,
return nil, 0, err
}
postingLocOffset := uint64(w.Count())
_, err = writeRoaringWithLen(newRoaringLocs, w)
_, err = writeRoaringWithLen(newRoaringLocs, w, &bufReuse, bufMaxVarintLen64)
if err != nil {
return nil, 0, err
}
@ -285,7 +303,7 @@ func persistMergedRest(segments []*Segment, drops []*roaring.Bitmap,
if err != nil {
return nil, 0, err
}
_, err = writeRoaringWithLen(newRoaring, w)
_, err = writeRoaringWithLen(newRoaring, w, &bufReuse, bufMaxVarintLen64)
if err != nil {
return nil, 0, err
}
@ -303,6 +321,7 @@ func persistMergedRest(segments []*Segment, drops []*roaring.Bitmap,
}
dictOffset := uint64(w.Count())
err = newVellum.Close()
if err != nil {
return nil, 0, err
@ -310,10 +329,8 @@ func persistMergedRest(segments []*Segment, drops []*roaring.Bitmap,
vellumData := vellumBuf.Bytes()
// write out the length of the vellum data
buf := bufMaxVarintLen64
// write out the number of chunks
n := binary.PutUvarint(buf, uint64(len(vellumData)))
_, err = w.Write(buf[:n])
n := binary.PutUvarint(bufMaxVarintLen64, uint64(len(vellumData)))
_, err = w.Write(bufMaxVarintLen64[:n])
if err != nil {
return nil, 0, err
}
@ -324,27 +341,33 @@ func persistMergedRest(segments []*Segment, drops []*roaring.Bitmap,
return nil, 0, err
}
rv1[fieldID] = dictOffset
rv[fieldID] = dictOffset
// update the doc value
var docNumbers docIDRange
// update the doc nums
if cap(docNumbers) < len(docTermMap) {
docNumbers = make(docIDRange, 0, len(docTermMap))
}
docNumbers = docNumbers[:0]
for k := range docTermMap {
docNumbers = append(docNumbers, k)
}
sort.Sort(docNumbers)
fdvEncoder := newChunkedContentCoder(uint64(chunkFactor), newSegDocCount-1)
for _, docNum := range docNumbers {
err = fdvEncoder.Add(docNum, docTermMap[docNum])
if err != nil {
return nil, 0, err
}
}
// get the field doc value offset
fieldDvLocs[fieldID] = uint64(w.Count())
err = fdvEncoder.Close()
if err != nil {
return nil, 0, err
}
// get the field doc value offset
fieldDvLocs[fieldID] = uint64(w.Count())
// persist the doc value details for this field
_, err = fdvEncoder.Write(w)
if err != nil {
@ -353,7 +376,8 @@ func persistMergedRest(segments []*Segment, drops []*roaring.Bitmap,
}
fieldDvLocsOffset = uint64(w.Count())
buf := make([]byte, binary.MaxVarintLen64)
buf := bufMaxVarintLen64
for _, offset := range fieldDvLocs {
n := binary.PutUvarint(buf, uint64(offset))
_, err := w.Write(buf[:n])
@ -362,7 +386,7 @@ func persistMergedRest(segments []*Segment, drops []*roaring.Bitmap,
}
}
return rv1, fieldDvLocsOffset, nil
return rv, fieldDvLocsOffset, nil
}
const docDropped = math.MaxUint64
@ -370,13 +394,16 @@ const docDropped = math.MaxUint64
func mergeStoredAndRemap(segments []*Segment, drops []*roaring.Bitmap,
fieldsMap map[string]uint16, fieldsInv []string, newSegDocCount uint64,
w *CountHashWriter) (uint64, [][]uint64, error) {
var rv [][]uint64
var newDocNum int
var rv [][]uint64 // The remapped or newDocNums for each segment.
var newDocNum uint64
var curr int
var metaBuf bytes.Buffer
var data, compressed []byte
metaEncoder := govarint.NewU64Base128Encoder(&metaBuf)
vals := make([][][]byte, len(fieldsInv))
typs := make([][]byte, len(fieldsInv))
poss := make([][][]uint64, len(fieldsInv))
@ -385,118 +412,121 @@ func mergeStoredAndRemap(segments []*Segment, drops []*roaring.Bitmap,
// for each segment
for segI, segment := range segments {
var segNewDocNums []uint64
segNewDocNums := make([]uint64, segment.numDocs)
// for each doc num
for docNum := uint64(0); docNum < segment.numDocs; docNum++ {
// TODO: roaring's API limits docNums to 32-bits?
if drops[segI] != nil && drops[segI].Contains(uint32(docNum)) {
segNewDocNums[docNum] = docDropped
continue
}
segNewDocNums[docNum] = newDocNum
curr = 0
metaBuf.Reset()
data = data[:0]
compressed = compressed[:0]
curr = 0
metaEncoder := govarint.NewU64Base128Encoder(&metaBuf)
if drops[segI] != nil && drops[segI].Contains(uint32(docNum)) {
segNewDocNums = append(segNewDocNums, docDropped)
} else {
segNewDocNums = append(segNewDocNums, uint64(newDocNum))
// collect all the data
for i := 0; i < len(fieldsInv); i++ {
vals[i] = vals[i][:0]
typs[i] = typs[i][:0]
poss[i] = poss[i][:0]
}
err := segment.VisitDocument(docNum, func(field string, typ byte, value []byte, pos []uint64) bool {
fieldID := int(fieldsMap[field])
vals[fieldID] = append(vals[fieldID], value)
typs[fieldID] = append(typs[fieldID], typ)
poss[fieldID] = append(poss[fieldID], pos)
return true
})
if err != nil {
return 0, nil, err
}
// now walk the fields in order
for fieldID := range fieldsInv {
storedFieldValues := vals[int(fieldID)]
// has stored values for this field
num := len(storedFieldValues)
// process each value
for i := 0; i < num; i++ {
// encode field
_, err2 := metaEncoder.PutU64(uint64(fieldID))
if err2 != nil {
return 0, nil, err2
}
// encode type
_, err2 = metaEncoder.PutU64(uint64(typs[int(fieldID)][i]))
if err2 != nil {
return 0, nil, err2
}
// encode start offset
_, err2 = metaEncoder.PutU64(uint64(curr))
if err2 != nil {
return 0, nil, err2
}
// end len
_, err2 = metaEncoder.PutU64(uint64(len(storedFieldValues[i])))
if err2 != nil {
return 0, nil, err2
}
// encode number of array pos
_, err2 = metaEncoder.PutU64(uint64(len(poss[int(fieldID)][i])))
if err2 != nil {
return 0, nil, err2
}
// encode all array positions
for j := 0; j < len(poss[int(fieldID)][i]); j++ {
_, err2 = metaEncoder.PutU64(poss[int(fieldID)][i][j])
if err2 != nil {
return 0, nil, err2
}
}
// append data
data = append(data, storedFieldValues[i]...)
// update curr
curr += len(storedFieldValues[i])
}
}
metaEncoder.Close()
metaBytes := metaBuf.Bytes()
compressed = snappy.Encode(compressed, data)
// record where we're about to start writing
docNumOffsets[newDocNum] = uint64(w.Count())
// write out the meta len and compressed data len
_, err = writeUvarints(w,
uint64(len(metaBytes)), uint64(len(compressed)))
if err != nil {
return 0, nil, err
}
// now write the meta
_, err = w.Write(metaBytes)
if err != nil {
return 0, nil, err
}
// now write the compressed data
_, err = w.Write(compressed)
if err != nil {
return 0, nil, err
}
newDocNum++
// collect all the data
for i := 0; i < len(fieldsInv); i++ {
vals[i] = vals[i][:0]
typs[i] = typs[i][:0]
poss[i] = poss[i][:0]
}
err := segment.VisitDocument(docNum, func(field string, typ byte, value []byte, pos []uint64) bool {
fieldID := int(fieldsMap[field])
vals[fieldID] = append(vals[fieldID], value)
typs[fieldID] = append(typs[fieldID], typ)
poss[fieldID] = append(poss[fieldID], pos)
return true
})
if err != nil {
return 0, nil, err
}
// now walk the fields in order
for fieldID := range fieldsInv {
storedFieldValues := vals[int(fieldID)]
// has stored values for this field
num := len(storedFieldValues)
// process each value
for i := 0; i < num; i++ {
// encode field
_, err2 := metaEncoder.PutU64(uint64(fieldID))
if err2 != nil {
return 0, nil, err2
}
// encode type
_, err2 = metaEncoder.PutU64(uint64(typs[int(fieldID)][i]))
if err2 != nil {
return 0, nil, err2
}
// encode start offset
_, err2 = metaEncoder.PutU64(uint64(curr))
if err2 != nil {
return 0, nil, err2
}
// end len
_, err2 = metaEncoder.PutU64(uint64(len(storedFieldValues[i])))
if err2 != nil {
return 0, nil, err2
}
// encode number of array pos
_, err2 = metaEncoder.PutU64(uint64(len(poss[int(fieldID)][i])))
if err2 != nil {
return 0, nil, err2
}
// encode all array positions
for j := 0; j < len(poss[int(fieldID)][i]); j++ {
_, err2 = metaEncoder.PutU64(poss[int(fieldID)][i][j])
if err2 != nil {
return 0, nil, err2
}
}
// append data
data = append(data, storedFieldValues[i]...)
// update curr
curr += len(storedFieldValues[i])
}
}
metaEncoder.Close()
metaBytes := metaBuf.Bytes()
compressed = snappy.Encode(compressed, data)
// record where we're about to start writing
docNumOffsets[newDocNum] = uint64(w.Count())
// write out the meta len and compressed data len
_, err = writeUvarints(w, uint64(len(metaBytes)), uint64(len(compressed)))
if err != nil {
return 0, nil, err
}
// now write the meta
_, err = w.Write(metaBytes)
if err != nil {
return 0, nil, err
}
// now write the compressed data
_, err = w.Write(compressed)
if err != nil {
return 0, nil, err
}
newDocNum++
}
rv = append(rv, segNewDocNums)
}
// return value is the start of the stored index
offset := uint64(w.Count())
// now write out the stored doc index
for docNum := range docNumOffsets {
err := binary.Write(w, binary.BigEndian, docNumOffsets[docNum])
@ -511,13 +541,13 @@ func mergeStoredAndRemap(segments []*Segment, drops []*roaring.Bitmap,
// mergeFields builds a unified list of fields used across all the input segments
func mergeFields(segments []*Segment) []string {
fieldsMap := map[string]struct{}{}
for _, segment := range segments {
fields := segment.Fields()
for _, field := range fields {
fieldsMap[field] = struct{}{}
}
}
rv := make([]string, 0, len(fieldsMap))
// ensure _id stays first
rv = append(rv, "_id")
@ -526,6 +556,5 @@ func mergeFields(segments []*Segment) []string {
rv = append(rv, k)
}
}
return rv
}

View File

@ -27,8 +27,8 @@ import (
// PostingsList is an in-memory represenation of a postings list
type PostingsList struct {
dictionary *Dictionary
term string
sb *SegmentBase
term []byte
postingsOffset uint64
freqOffset uint64
locOffset uint64
@ -48,11 +48,11 @@ func (p *PostingsList) Iterator() segment.PostingsIterator {
var n uint64
var read int
var numFreqChunks uint64
numFreqChunks, read = binary.Uvarint(p.dictionary.segment.mm[p.freqOffset+n : p.freqOffset+n+binary.MaxVarintLen64])
numFreqChunks, read = binary.Uvarint(p.sb.mem[p.freqOffset+n : p.freqOffset+n+binary.MaxVarintLen64])
n += uint64(read)
rv.freqChunkLens = make([]uint64, int(numFreqChunks))
for i := 0; i < int(numFreqChunks); i++ {
rv.freqChunkLens[i], read = binary.Uvarint(p.dictionary.segment.mm[p.freqOffset+n : p.freqOffset+n+binary.MaxVarintLen64])
rv.freqChunkLens[i], read = binary.Uvarint(p.sb.mem[p.freqOffset+n : p.freqOffset+n+binary.MaxVarintLen64])
n += uint64(read)
}
rv.freqChunkStart = p.freqOffset + n
@ -60,11 +60,11 @@ func (p *PostingsList) Iterator() segment.PostingsIterator {
// prepare the loc chunk details
n = 0
var numLocChunks uint64
numLocChunks, read = binary.Uvarint(p.dictionary.segment.mm[p.locOffset+n : p.locOffset+n+binary.MaxVarintLen64])
numLocChunks, read = binary.Uvarint(p.sb.mem[p.locOffset+n : p.locOffset+n+binary.MaxVarintLen64])
n += uint64(read)
rv.locChunkLens = make([]uint64, int(numLocChunks))
for i := 0; i < int(numLocChunks); i++ {
rv.locChunkLens[i], read = binary.Uvarint(p.dictionary.segment.mm[p.locOffset+n : p.locOffset+n+binary.MaxVarintLen64])
rv.locChunkLens[i], read = binary.Uvarint(p.sb.mem[p.locOffset+n : p.locOffset+n+binary.MaxVarintLen64])
n += uint64(read)
}
rv.locChunkStart = p.locOffset + n
@ -133,7 +133,7 @@ func (i *PostingsIterator) loadChunk(chunk int) error {
start += i.freqChunkLens[j]
}
end := start + i.freqChunkLens[chunk]
i.currChunkFreqNorm = i.postings.dictionary.segment.mm[start:end]
i.currChunkFreqNorm = i.postings.sb.mem[start:end]
i.freqNormDecoder = govarint.NewU64Base128Decoder(bytes.NewReader(i.currChunkFreqNorm))
start = i.locChunkStart
@ -141,7 +141,7 @@ func (i *PostingsIterator) loadChunk(chunk int) error {
start += i.locChunkLens[j]
}
end = start + i.locChunkLens[chunk]
i.currChunkLoc = i.postings.dictionary.segment.mm[start:end]
i.currChunkLoc = i.postings.sb.mem[start:end]
i.locDecoder = govarint.NewU64Base128Decoder(bytes.NewReader(i.currChunkLoc))
i.currChunk = uint32(chunk)
return nil
@ -192,7 +192,7 @@ func (i *PostingsIterator) readLocation(l *Location) error {
// group these together for less branching
if l != nil {
l.field = i.postings.dictionary.segment.fieldsInv[fieldID]
l.field = i.postings.sb.fieldsInv[fieldID]
l.pos = pos
l.start = start
l.end = end
@ -221,9 +221,9 @@ func (i *PostingsIterator) Next() (segment.Posting, error) {
return nil, nil
}
n := i.actual.Next()
nChunk := n / i.postings.dictionary.segment.chunkFactor
nChunk := n / i.postings.sb.chunkFactor
allN := i.all.Next()
allNChunk := allN / i.postings.dictionary.segment.chunkFactor
allNChunk := allN / i.postings.sb.chunkFactor
// n is the next actual hit (excluding some postings)
// allN is the next hit in the full postings

View File

@ -16,16 +16,16 @@ package zap
import "encoding/binary"
func (s *Segment) getStoredMetaAndCompressed(docNum uint64) ([]byte, []byte) {
func (s *SegmentBase) getDocStoredMetaAndCompressed(docNum uint64) ([]byte, []byte) {
docStoredStartAddr := s.storedIndexOffset + (8 * docNum)
docStoredStart := binary.BigEndian.Uint64(s.mm[docStoredStartAddr : docStoredStartAddr+8])
docStoredStart := binary.BigEndian.Uint64(s.mem[docStoredStartAddr : docStoredStartAddr+8])
var n uint64
metaLen, read := binary.Uvarint(s.mm[docStoredStart : docStoredStart+binary.MaxVarintLen64])
metaLen, read := binary.Uvarint(s.mem[docStoredStart : docStoredStart+binary.MaxVarintLen64])
n += uint64(read)
var dataLen uint64
dataLen, read = binary.Uvarint(s.mm[docStoredStart+n : docStoredStart+n+binary.MaxVarintLen64])
dataLen, read = binary.Uvarint(s.mem[docStoredStart+n : docStoredStart+n+binary.MaxVarintLen64])
n += uint64(read)
meta := s.mm[docStoredStart+n : docStoredStart+n+metaLen]
data := s.mm[docStoredStart+n+metaLen : docStoredStart+n+metaLen+dataLen]
meta := s.mem[docStoredStart+n : docStoredStart+n+metaLen]
data := s.mem[docStoredStart+n+metaLen : docStoredStart+n+metaLen+dataLen]
return meta, data
}

View File

@ -44,12 +44,15 @@ func Open(path string) (segment.Segment, error) {
}
rv := &Segment{
f: f,
mm: mm,
path: path,
fieldsMap: make(map[string]uint16),
fieldDvIterMap: make(map[uint16]*docValueIterator),
refs: 1,
SegmentBase: SegmentBase{
mem: mm[0 : len(mm)-FooterSize],
fieldsMap: make(map[string]uint16),
fieldDvIterMap: make(map[uint16]*docValueIterator),
},
f: f,
mm: mm,
path: path,
refs: 1,
}
err = rv.loadConfig()
@ -73,24 +76,36 @@ func Open(path string) (segment.Segment, error) {
return rv, nil
}
// Segment implements the segment.Segment inteface over top the zap file format
type Segment struct {
f *os.File
mm mmap.MMap
path string
crc uint32
version uint32
// SegmentBase is a memory only, read-only implementation of the
// segment.Segment interface, using zap's data representation.
type SegmentBase struct {
mem []byte
memCRC uint32
chunkFactor uint32
fieldsMap map[string]uint16 // fieldName -> fieldID+1
fieldsInv []string // fieldID -> fieldName
numDocs uint64
storedIndexOffset uint64
fieldsIndexOffset uint64
docValueOffset uint64
dictLocs []uint64
fieldDvIterMap map[uint16]*docValueIterator // naive chunk cache per field
}
fieldsMap map[string]uint16
fieldsInv []string
fieldsOffsets []uint64
func (sb *SegmentBase) AddRef() {}
func (sb *SegmentBase) DecRef() (err error) { return nil }
func (sb *SegmentBase) Close() (err error) { return nil }
docValueOffset uint64
fieldDvIterMap map[uint16]*docValueIterator // naive chunk cache per field
// Segment implements a persisted segment.Segment interface, by
// embedding an mmap()'ed SegmentBase.
type Segment struct {
SegmentBase
f *os.File
mm mmap.MMap
path string
version uint32
crc uint32
m sync.Mutex // Protects the fields that follow.
refs int64
@ -98,17 +113,29 @@ type Segment struct {
func (s *Segment) SizeInBytes() uint64 {
// 8 /* size of file pointer */
// 4 /* size of crc -> uint32 */
// 4 /* size of version -> uint32 */
// 4 /* size of crc -> uint32 */
sizeOfUints := 16
sizeInBytes := (len(s.path) + int(segment.SizeOfString)) + sizeOfUints
// mutex, refs -> int64
sizeInBytes += 16
// do not include the mmap'ed part
return uint64(sizeInBytes) + s.SegmentBase.SizeInBytes() - uint64(len(s.mem))
}
func (s *SegmentBase) SizeInBytes() uint64 {
// 4 /* size of memCRC -> uint32 */
// 4 /* size of chunkFactor -> uint32 */
// 8 /* size of numDocs -> uint64 */
// 8 /* size of storedIndexOffset -> uint64 */
// 8 /* size of fieldsIndexOffset -> uint64 */
// 8 /* size of docValueOffset -> uint64 */
sizeOfUints := 52
sizeInBytes := 40
// Do not include the mmap'ed part
sizeInBytes := (len(s.path) + int(segment.SizeOfString)) + sizeOfUints
sizeInBytes += len(s.mem) + int(segment.SizeOfSlice)
// fieldsMap
for k, _ := range s.fieldsMap {
@ -116,12 +143,12 @@ func (s *Segment) SizeInBytes() uint64 {
}
sizeInBytes += int(segment.SizeOfMap) /* overhead from map */
// fieldsInv, fieldsOffsets
// fieldsInv, dictLocs
for _, entry := range s.fieldsInv {
sizeInBytes += (len(entry) + int(segment.SizeOfString))
}
sizeInBytes += len(s.fieldsOffsets) * 8 /* size of uint64 */
sizeInBytes += int(segment.SizeOfSlice) * 2 /* overhead from slices */
sizeInBytes += len(s.dictLocs) * 8 /* size of uint64 */
sizeInBytes += int(segment.SizeOfSlice) * 3 /* overhead from slices */
// fieldDvIterMap
sizeInBytes += len(s.fieldDvIterMap) *
@ -133,9 +160,6 @@ func (s *Segment) SizeInBytes() uint64 {
}
sizeInBytes += int(segment.SizeOfMap)
// mutex, refs -> int64
sizeInBytes += 16
return uint64(sizeInBytes)
}
@ -158,47 +182,50 @@ func (s *Segment) DecRef() (err error) {
func (s *Segment) loadConfig() error {
crcOffset := len(s.mm) - 4
s.crc = binary.BigEndian.Uint32(s.mm[crcOffset : crcOffset+4])
verOffset := crcOffset - 4
s.version = binary.BigEndian.Uint32(s.mm[verOffset : verOffset+4])
if s.version != version {
return fmt.Errorf("unsupported version %d", s.version)
}
chunkOffset := verOffset - 4
s.chunkFactor = binary.BigEndian.Uint32(s.mm[chunkOffset : chunkOffset+4])
docValueOffset := chunkOffset - 8
s.docValueOffset = binary.BigEndian.Uint64(s.mm[docValueOffset : docValueOffset+8])
fieldsOffset := docValueOffset - 8
s.fieldsIndexOffset = binary.BigEndian.Uint64(s.mm[fieldsOffset : fieldsOffset+8])
storedOffset := fieldsOffset - 8
s.storedIndexOffset = binary.BigEndian.Uint64(s.mm[storedOffset : storedOffset+8])
docNumOffset := storedOffset - 8
s.numDocs = binary.BigEndian.Uint64(s.mm[docNumOffset : docNumOffset+8])
fieldsIndexOffset := docValueOffset - 8
s.fieldsIndexOffset = binary.BigEndian.Uint64(s.mm[fieldsIndexOffset : fieldsIndexOffset+8])
storedIndexOffset := fieldsIndexOffset - 8
s.storedIndexOffset = binary.BigEndian.Uint64(s.mm[storedIndexOffset : storedIndexOffset+8])
numDocsOffset := storedIndexOffset - 8
s.numDocs = binary.BigEndian.Uint64(s.mm[numDocsOffset : numDocsOffset+8])
return nil
}
func (s *Segment) loadFields() error {
// NOTE for now we assume the fields index immediately preceeds the footer
// if this changes, need to adjust accordingly (or store epxlicit length)
fieldsIndexEnd := uint64(len(s.mm) - FooterSize)
func (s *SegmentBase) loadFields() error {
// NOTE for now we assume the fields index immediately preceeds
// the footer, and if this changes, need to adjust accordingly (or
// store explicit length), where s.mem was sliced from s.mm in Open().
fieldsIndexEnd := uint64(len(s.mem))
// iterate through fields index
var fieldID uint64
for s.fieldsIndexOffset+(8*fieldID) < fieldsIndexEnd {
addr := binary.BigEndian.Uint64(s.mm[s.fieldsIndexOffset+(8*fieldID) : s.fieldsIndexOffset+(8*fieldID)+8])
var n uint64
addr := binary.BigEndian.Uint64(s.mem[s.fieldsIndexOffset+(8*fieldID) : s.fieldsIndexOffset+(8*fieldID)+8])
dictLoc, read := binary.Uvarint(s.mm[addr+n : fieldsIndexEnd])
n += uint64(read)
s.fieldsOffsets = append(s.fieldsOffsets, dictLoc)
dictLoc, read := binary.Uvarint(s.mem[addr:fieldsIndexEnd])
n := uint64(read)
s.dictLocs = append(s.dictLocs, dictLoc)
var nameLen uint64
nameLen, read = binary.Uvarint(s.mm[addr+n : fieldsIndexEnd])
nameLen, read = binary.Uvarint(s.mem[addr+n : fieldsIndexEnd])
n += uint64(read)
name := string(s.mm[addr+n : addr+n+nameLen])
name := string(s.mem[addr+n : addr+n+nameLen])
s.fieldsInv = append(s.fieldsInv, name)
s.fieldsMap[name] = uint16(fieldID + 1)
@ -208,7 +235,7 @@ func (s *Segment) loadFields() error {
}
// Dictionary returns the term dictionary for the specified field
func (s *Segment) Dictionary(field string) (segment.TermDictionary, error) {
func (s *SegmentBase) Dictionary(field string) (segment.TermDictionary, error) {
dict, err := s.dictionary(field)
if err == nil && dict == nil {
return &segment.EmptyDictionary{}, nil
@ -216,21 +243,20 @@ func (s *Segment) Dictionary(field string) (segment.TermDictionary, error) {
return dict, err
}
func (s *Segment) dictionary(field string) (rv *Dictionary, err error) {
rv = &Dictionary{
segment: s,
field: field,
}
func (sb *SegmentBase) dictionary(field string) (rv *Dictionary, err error) {
fieldIDPlus1 := sb.fieldsMap[field]
if fieldIDPlus1 > 0 {
rv = &Dictionary{
sb: sb,
field: field,
fieldID: fieldIDPlus1 - 1,
}
rv.fieldID = s.fieldsMap[field]
if rv.fieldID > 0 {
rv.fieldID = rv.fieldID - 1
dictStart := s.fieldsOffsets[rv.fieldID]
dictStart := sb.dictLocs[rv.fieldID]
if dictStart > 0 {
// read the length of the vellum data
vellumLen, read := binary.Uvarint(s.mm[dictStart : dictStart+binary.MaxVarintLen64])
fstBytes := s.mm[dictStart+uint64(read) : dictStart+uint64(read)+vellumLen]
vellumLen, read := binary.Uvarint(sb.mem[dictStart : dictStart+binary.MaxVarintLen64])
fstBytes := sb.mem[dictStart+uint64(read) : dictStart+uint64(read)+vellumLen]
if fstBytes != nil {
rv.fst, err = vellum.Load(fstBytes)
if err != nil {
@ -238,9 +264,6 @@ func (s *Segment) dictionary(field string) (rv *Dictionary, err error) {
}
}
}
} else {
return nil, nil
}
return rv, nil
@ -248,10 +271,10 @@ func (s *Segment) dictionary(field string) (rv *Dictionary, err error) {
// VisitDocument invokes the DocFieldValueVistor for each stored field
// for the specified doc number
func (s *Segment) VisitDocument(num uint64, visitor segment.DocumentFieldValueVisitor) error {
func (s *SegmentBase) VisitDocument(num uint64, visitor segment.DocumentFieldValueVisitor) error {
// first make sure this is a valid number in this segment
if num < s.numDocs {
meta, compressed := s.getStoredMetaAndCompressed(num)
meta, compressed := s.getDocStoredMetaAndCompressed(num)
uncompressed, err := snappy.Decode(nil, compressed)
if err != nil {
return err
@ -305,13 +328,13 @@ func (s *Segment) VisitDocument(num uint64, visitor segment.DocumentFieldValueVi
}
// Count returns the number of documents in this segment.
func (s *Segment) Count() uint64 {
func (s *SegmentBase) Count() uint64 {
return s.numDocs
}
// DocNumbers returns a bitset corresponding to the doc numbers of all the
// provided _id strings
func (s *Segment) DocNumbers(ids []string) (*roaring.Bitmap, error) {
func (s *SegmentBase) DocNumbers(ids []string) (*roaring.Bitmap, error) {
rv := roaring.New()
if len(s.fieldsMap) > 0 {
@ -321,7 +344,7 @@ func (s *Segment) DocNumbers(ids []string) (*roaring.Bitmap, error) {
}
for _, id := range ids {
postings, err := idDict.postingsList(id, nil)
postings, err := idDict.postingsList([]byte(id), nil)
if err != nil {
return nil, err
}
@ -335,7 +358,7 @@ func (s *Segment) DocNumbers(ids []string) (*roaring.Bitmap, error) {
}
// Fields returns the field names used in this segment
func (s *Segment) Fields() []string {
func (s *SegmentBase) Fields() []string {
return s.fieldsInv
}
@ -409,23 +432,22 @@ func (s *Segment) NumDocs() uint64 {
// DictAddr is a helper function to compute the file offset where the
// dictionary is stored for the specified field.
func (s *Segment) DictAddr(field string) (uint64, error) {
var fieldID uint16
var ok bool
if fieldID, ok = s.fieldsMap[field]; !ok {
fieldIDPlus1, ok := s.fieldsMap[field]
if !ok {
return 0, fmt.Errorf("no such field '%s'", field)
}
return s.fieldsOffsets[fieldID-1], nil
return s.dictLocs[fieldIDPlus1-1], nil
}
func (s *Segment) loadDvIterators() error {
func (s *SegmentBase) loadDvIterators() error {
if s.docValueOffset == fieldNotUninverted {
return nil
}
var read uint64
for fieldID, field := range s.fieldsInv {
fieldLoc, n := binary.Uvarint(s.mm[s.docValueOffset+read : s.docValueOffset+read+binary.MaxVarintLen64])
fieldLoc, n := binary.Uvarint(s.mem[s.docValueOffset+read : s.docValueOffset+read+binary.MaxVarintLen64])
if n <= 0 {
return fmt.Errorf("loadDvIterators: failed to read the docvalue offsets for field %d", fieldID)
}

View File

@ -23,42 +23,40 @@ import (
)
// writes out the length of the roaring bitmap in bytes as varint
// then writs out the roaring bitmap itself
func writeRoaringWithLen(r *roaring.Bitmap, w io.Writer) (int, error) {
var buffer bytes.Buffer
// then writes out the roaring bitmap itself
func writeRoaringWithLen(r *roaring.Bitmap, w io.Writer,
reuseBuf *bytes.Buffer, reuseBufVarint []byte) (int, error) {
reuseBuf.Reset()
// write out postings list to memory so we know the len
postingsListLen, err := r.WriteTo(&buffer)
postingsListLen, err := r.WriteTo(reuseBuf)
if err != nil {
return 0, err
}
var tw int
// write out the length of this postings list
buf := make([]byte, binary.MaxVarintLen64)
n := binary.PutUvarint(buf, uint64(postingsListLen))
nw, err := w.Write(buf[:n])
n := binary.PutUvarint(reuseBufVarint, uint64(postingsListLen))
nw, err := w.Write(reuseBufVarint[:n])
tw += nw
if err != nil {
return tw, err
}
// write out the postings list itself
nw, err = w.Write(buffer.Bytes())
nw, err = w.Write(reuseBuf.Bytes())
tw += nw
if err != nil {
return tw, err
}
return tw, nil
}
func persistFields(fieldsInv []string, w *CountHashWriter, dictLocs []uint64) (uint64, error) {
var rv uint64
var fieldsOffsets []uint64
var fieldStarts []uint64
for fieldID, fieldName := range fieldsInv {
// record start of this field
fieldStarts = append(fieldStarts, uint64(w.Count()))
fieldsOffsets = append(fieldsOffsets, uint64(w.Count()))
// write out the dict location and field name length
_, err := writeUvarints(w, dictLocs[fieldID], uint64(len(fieldName)))
@ -76,7 +74,7 @@ func persistFields(fieldsInv []string, w *CountHashWriter, dictLocs []uint64) (u
// now write out the fields index
rv = uint64(w.Count())
for fieldID := range fieldsInv {
err := binary.Write(w, binary.BigEndian, fieldStarts[fieldID])
err := binary.Write(w, binary.BigEndian, fieldsOffsets[fieldID])
if err != nil {
return 0, err
}
@ -89,8 +87,11 @@ func persistFields(fieldsInv []string, w *CountHashWriter, dictLocs []uint64) (u
// crc + ver + chunk + field offset + stored offset + num docs + docValueOffset
const FooterSize = 4 + 4 + 4 + 8 + 8 + 8 + 8
func persistFooter(numDocs, storedIndexOffset, fieldIndexOffset, docValueOffset uint64,
chunkFactor uint32, w *CountHashWriter) error {
func persistFooter(numDocs, storedIndexOffset, fieldsIndexOffset, docValueOffset uint64,
chunkFactor uint32, crcBeforeFooter uint32, writerIn io.Writer) error {
w := NewCountHashWriter(writerIn)
w.crc = crcBeforeFooter
// write out the number of docs
err := binary.Write(w, binary.BigEndian, numDocs)
if err != nil {
@ -102,7 +103,7 @@ func persistFooter(numDocs, storedIndexOffset, fieldIndexOffset, docValueOffset
return err
}
// write out the field index location
err = binary.Write(w, binary.BigEndian, fieldIndexOffset)
err = binary.Write(w, binary.BigEndian, fieldsIndexOffset)
if err != nil {
return err
}
@ -122,7 +123,7 @@ func persistFooter(numDocs, storedIndexOffset, fieldIndexOffset, docValueOffset
return err
}
// write out CRC-32 of everything upto but not including this CRC
err = binary.Write(w, binary.BigEndian, w.Sum32())
err = binary.Write(w, binary.BigEndian, w.crc)
if err != nil {
return err
}