diff --git a/index/scorch/merge.go b/index/scorch/merge.go index 78c27ddb..5ded29b5 100644 --- a/index/scorch/merge.go +++ b/index/scorch/merge.go @@ -54,7 +54,6 @@ OUTER: lastEpochMergePlanned = ourSnapshot.epoch s.fireEvent(EventKindMergerProgress, time.Since(startTime)) - } _ = ourSnapshot.DecRef() @@ -81,6 +80,7 @@ OUTER: // lets get started err := s.planMergeAtSnapshot(ourSnapshot) if err != nil { + s.fireAsyncError(fmt.Errorf("merging err: %v", err)) _ = ourSnapshot.DecRef() continue OUTER } @@ -141,7 +141,7 @@ func (s *Scorch) planMergeAtSnapshot(ourSnapshot *IndexSnapshot) error { filename := zapFileName(newSegmentID) s.markIneligibleForRemoval(filename) path := s.path + string(os.PathSeparator) + filename - newDocNums, err := zap.Merge(segmentsToMerge, docsToDrop, path, 1024) + newDocNums, err := zap.Merge(segmentsToMerge, docsToDrop, path, DefaultChunkFactor) if err != nil { s.unmarkIneligibleForRemoval(filename) return fmt.Errorf("merging failed: %v", err) diff --git a/index/scorch/persister.go b/index/scorch/persister.go index 658e57ae..cdcee37c 100644 --- a/index/scorch/persister.go +++ b/index/scorch/persister.go @@ -28,11 +28,12 @@ import ( "github.com/RoaringBitmap/roaring" "github.com/blevesearch/bleve/index/scorch/segment" - "github.com/blevesearch/bleve/index/scorch/segment/mem" "github.com/blevesearch/bleve/index/scorch/segment/zap" "github.com/boltdb/bolt" ) +var DefaultChunkFactor uint32 = 1024 + type notificationChan chan struct{} func (s *Scorch) persisterLoop() { @@ -178,11 +179,11 @@ func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot) error { return err2 } switch seg := segmentSnapshot.segment.(type) { - case *mem.Segment: + case *zap.SegmentBase: // need to persist this to disk filename := zapFileName(segmentSnapshot.id) path := s.path + string(os.PathSeparator) + filename - err2 := zap.PersistSegment(seg, path, 1024) + err2 := zap.PersistSegmentBase(seg, path) if err2 != nil { return fmt.Errorf("error persisting segment: %v", err2) } diff --git a/index/scorch/scorch.go b/index/scorch/scorch.go index 99d6dcd5..69328d9b 100644 --- a/index/scorch/scorch.go +++ b/index/scorch/scorch.go @@ -28,6 +28,7 @@ import ( "github.com/blevesearch/bleve/index" "github.com/blevesearch/bleve/index/scorch/segment" "github.com/blevesearch/bleve/index/scorch/segment/mem" + "github.com/blevesearch/bleve/index/scorch/segment/zap" "github.com/blevesearch/bleve/index/store" "github.com/blevesearch/bleve/registry" "github.com/boltdb/bolt" @@ -217,7 +218,7 @@ func (s *Scorch) Delete(id string) error { } // Batch applices a batch of changes to the index atomically -func (s *Scorch) Batch(batch *index.Batch) error { +func (s *Scorch) Batch(batch *index.Batch) (err error) { start := time.Now() defer func() { @@ -271,10 +272,13 @@ func (s *Scorch) Batch(batch *index.Batch) error { var newSegment segment.Segment if len(analysisResults) > 0 { - newSegment = mem.NewFromAnalyzedDocs(analysisResults) + newSegment, err = zap.NewSegmentBase(mem.NewFromAnalyzedDocs(analysisResults), DefaultChunkFactor) + if err != nil { + return err + } } - err := s.prepareSegment(newSegment, ids, batch.InternalOps) + err = s.prepareSegment(newSegment, ids, batch.InternalOps) if err != nil { if newSegment != nil { _ = newSegment.Close() diff --git a/index/scorch/scorch_test.go b/index/scorch/scorch_test.go index 6e8ecb0c..87e9bdb2 100644 --- a/index/scorch/scorch_test.go +++ b/index/scorch/scorch_test.go @@ -1395,7 +1395,7 @@ func TestConcurrentUpdate(t *testing.T) { // do some concurrent updates var wg 
sync.WaitGroup - for i := 0; i < 10; i++ { + for i := 0; i < 100; i++ { wg.Add(1) go func(i int) { doc := document.NewDocument("1") diff --git a/index/scorch/segment/mem/build.go b/index/scorch/segment/mem/build.go index 554de890..d3344ce3 100644 --- a/index/scorch/segment/mem/build.go +++ b/index/scorch/segment/mem/build.go @@ -267,15 +267,15 @@ func (s *Segment) processDocument(result *index.AnalysisResult) { } func (s *Segment) getOrDefineField(name string) int { - fieldID, ok := s.FieldsMap[name] + fieldIDPlus1, ok := s.FieldsMap[name] if !ok { - fieldID = uint16(len(s.FieldsInv) + 1) - s.FieldsMap[name] = fieldID + fieldIDPlus1 = uint16(len(s.FieldsInv) + 1) + s.FieldsMap[name] = fieldIDPlus1 s.FieldsInv = append(s.FieldsInv, name) s.Dicts = append(s.Dicts, make(map[string]uint64)) s.DictKeys = append(s.DictKeys, make([]string, 0)) } - return int(fieldID - 1) + return int(fieldIDPlus1 - 1) } func (s *Segment) addDocument() int { diff --git a/index/scorch/segment/mem/segment.go b/index/scorch/segment/mem/segment.go index baa4811a..04bdb368 100644 --- a/index/scorch/segment/mem/segment.go +++ b/index/scorch/segment/mem/segment.go @@ -40,35 +40,38 @@ const idFieldID uint16 = 0 // Segment is an in memory implementation of scorch.Segment type Segment struct { - // FieldsMap name -> id+1 + // FieldsMap adds 1 to field id to avoid zero value issues + // name -> field id + 1 FieldsMap map[string]uint16 - // fields id -> name + + // FieldsInv is the inverse of FieldsMap + // field id -> name FieldsInv []string - // term dictionary + // Term dictionaries for each field // field id -> term -> postings list id + 1 Dicts []map[string]uint64 - // term dictionary keys - // field id -> []dictionary keys + // Terms for each field, where terms are sorted ascending + // field id -> []term DictKeys [][]string // Postings list - // postings list id -> Postings bitmap + // postings list id -> bitmap by docNum Postings []*roaring.Bitmap - // Postings List has locations + // Postings list has locations PostingsLocs []*roaring.Bitmap - // term frequencies + // Term frequencies // postings list id -> Freqs (one for each hit in bitmap) Freqs [][]uint64 - // field Norms + // Field norms // postings list id -> Norms (one for each hit in bitmap) Norms [][]float32 - // field/start/end/pos/locarraypos + // Field/start/end/pos/locarraypos // postings list id -> start/end/pos/locarraypos (one for each freq) Locfields [][]uint16 Locstarts [][]uint64 @@ -80,18 +83,18 @@ type Segment struct { // docNum -> field id -> slice of values (each value []byte) Stored []map[uint16][][]byte - // stored field types + // Stored field types // docNum -> field id -> slice of types (each type byte) StoredTypes []map[uint16][]byte - // stored field array positions + // Stored field array positions // docNum -> field id -> slice of array positions (each is []uint64) StoredPos []map[uint16][][]uint64 - // for storing the docValue persisted fields + // For storing the docValue persisted fields DocValueFields map[uint16]bool - // footprint of the segment, updated when analyzed document mutations + // Footprint of the segment, updated when analyzed document mutations // are added into the segment sizeInBytes uint64 } diff --git a/index/scorch/segment/zap/build.go b/index/scorch/segment/zap/build.go index 1b16b5e3..769c0795 100644 --- a/index/scorch/segment/zap/build.go +++ b/index/scorch/segment/zap/build.go @@ -32,10 +32,8 @@ const version uint32 = 2 const fieldNotUninverted = math.MaxUint64 -// PersistSegment takes the in-memory segment 
and persists it to the specified -// path in the zap file format. -func PersistSegment(memSegment *mem.Segment, path string, chunkFactor uint32) (err error) { - +// PersistSegmentBase persists SegmentBase in the zap file format. +func PersistSegmentBase(sb *SegmentBase, path string) error { flag := os.O_RDWR | os.O_CREATE f, err := os.OpenFile(path, flag, 0600) @@ -43,84 +41,151 @@ func PersistSegment(memSegment *mem.Segment, path string, chunkFactor uint32) (e return err } - // bufer the output - br := bufio.NewWriter(f) - - // wrap it for counting (tracking offsets) - cr := NewCountHashWriter(br) - - var storedIndexOffset uint64 - var dictLocs []uint64 - docValueOffset := uint64(fieldNotUninverted) - if len(memSegment.Stored) > 0 { - - storedIndexOffset, err = persistStored(memSegment, cr) - if err != nil { - return err - } - - var freqOffsets, locOffsets []uint64 - freqOffsets, locOffsets, err = persistPostingDetails(memSegment, cr, chunkFactor) - if err != nil { - return err - } - - var postingsListLocs []uint64 - postingsListLocs, err = persistPostingsLocs(memSegment, cr) - if err != nil { - return err - } - - var postingsLocs []uint64 - postingsLocs, err = persistPostingsLists(memSegment, cr, postingsListLocs, freqOffsets, locOffsets) - if err != nil { - return err - } - - dictLocs, err = persistDictionary(memSegment, cr, postingsLocs) - if err != nil { - return err - } - - docValueOffset, err = persistFieldDocValues(cr, chunkFactor, memSegment) - if err != nil { - return err - } - - } else { - dictLocs = make([]uint64, len(memSegment.FieldsInv)) + cleanup := func() { + _ = f.Close() + _ = os.Remove(path) } - var fieldIndexStart uint64 - fieldIndexStart, err = persistFields(memSegment.FieldsInv, cr, dictLocs) + br := bufio.NewWriter(f) + + _, err = br.Write(sb.mem) if err != nil { + cleanup() return err } - err = persistFooter(uint64(len(memSegment.Stored)), storedIndexOffset, - fieldIndexStart, docValueOffset, chunkFactor, cr) + err = persistFooter(sb.numDocs, sb.storedIndexOffset, sb.fieldsIndexOffset, sb.docValueOffset, + sb.chunkFactor, sb.memCRC, br) if err != nil { + cleanup() return err } err = br.Flush() if err != nil { + cleanup() return err } err = f.Sync() if err != nil { + cleanup() return err } err = f.Close() if err != nil { + cleanup() return err } return nil } +// PersistSegment takes the in-memory segment and persists it to +// the specified path in the zap file format. 
+func PersistSegment(memSegment *mem.Segment, path string, chunkFactor uint32) error { + flag := os.O_RDWR | os.O_CREATE + + f, err := os.OpenFile(path, flag, 0600) + if err != nil { + return err + } + + cleanup := func() { + _ = f.Close() + _ = os.Remove(path) + } + + // buffer the output + br := bufio.NewWriter(f) + + // wrap it for counting (tracking offsets) + cr := NewCountHashWriter(br) + + numDocs, storedIndexOffset, fieldsIndexOffset, docValueOffset, _, err := + persistBase(memSegment, cr, chunkFactor) + if err != nil { + cleanup() + return err + } + + err = persistFooter(numDocs, storedIndexOffset, fieldsIndexOffset, docValueOffset, + chunkFactor, cr.Sum32(), cr) + if err != nil { + cleanup() + return err + } + + err = br.Flush() + if err != nil { + cleanup() + return err + } + + err = f.Sync() + if err != nil { + cleanup() + return err + } + + err = f.Close() + if err != nil { + cleanup() + return err + } + + return nil +} + +func persistBase(memSegment *mem.Segment, cr *CountHashWriter, chunkFactor uint32) ( + numDocs, storedIndexOffset, fieldsIndexOffset, docValueOffset uint64, + dictLocs []uint64, err error) { + docValueOffset = uint64(fieldNotUninverted) + + if len(memSegment.Stored) > 0 { + storedIndexOffset, err = persistStored(memSegment, cr) + if err != nil { + return 0, 0, 0, 0, nil, err + } + + freqOffsets, locOffsets, err := persistPostingDetails(memSegment, cr, chunkFactor) + if err != nil { + return 0, 0, 0, 0, nil, err + } + + postingsListLocs, err := persistPostingsLocs(memSegment, cr) + if err != nil { + return 0, 0, 0, 0, nil, err + } + + postingsLocs, err := persistPostingsLists(memSegment, cr, postingsListLocs, freqOffsets, locOffsets) + if err != nil { + return 0, 0, 0, 0, nil, err + } + + dictLocs, err = persistDictionary(memSegment, cr, postingsLocs) + if err != nil { + return 0, 0, 0, 0, nil, err + } + + docValueOffset, err = persistFieldDocValues(memSegment, cr, chunkFactor) + if err != nil { + return 0, 0, 0, 0, nil, err + } + } else { + dictLocs = make([]uint64, len(memSegment.FieldsInv)) + } + + fieldsIndexOffset, err = persistFields(memSegment.FieldsInv, cr, dictLocs) + if err != nil { + return 0, 0, 0, 0, nil, err + } + + return uint64(len(memSegment.Stored)), storedIndexOffset, fieldsIndexOffset, docValueOffset, + dictLocs, nil +} + func persistStored(memSegment *mem.Segment, w *CountHashWriter) (uint64, error) { var curr int @@ -356,11 +421,13 @@ func persistPostingDetails(memSegment *mem.Segment, w *CountHashWriter, chunkFac func persistPostingsLocs(memSegment *mem.Segment, w *CountHashWriter) (rv []uint64, err error) { rv = make([]uint64, 0, len(memSegment.PostingsLocs)) + var reuseBuf bytes.Buffer + reuseBufVarint := make([]byte, binary.MaxVarintLen64) for postingID := range memSegment.PostingsLocs { // record where we start this posting loc rv = append(rv, uint64(w.Count())) // write out the length and bitmap - _, err = writeRoaringWithLen(memSegment.PostingsLocs[postingID], w) + _, err = writeRoaringWithLen(memSegment.PostingsLocs[postingID], w, &reuseBuf, reuseBufVarint) if err != nil { return nil, err } @@ -371,6 +438,8 @@ func persistPostingsLocs(memSegment *mem.Segment, w *CountHashWriter) (rv []uint func persistPostingsLists(memSegment *mem.Segment, w *CountHashWriter, postingsListLocs, freqOffsets, locOffsets []uint64) (rv []uint64, err error) { rv = make([]uint64, 0, len(memSegment.Postings)) + var reuseBuf bytes.Buffer + reuseBufVarint := make([]byte, binary.MaxVarintLen64) for postingID := range memSegment.Postings { // record where we 
start this posting list rv = append(rv, uint64(w.Count())) @@ -383,7 +452,7 @@ func persistPostingsLists(memSegment *mem.Segment, w *CountHashWriter, } // write out the length and bitmap - _, err = writeRoaringWithLen(memSegment.Postings[postingID], w) + _, err = writeRoaringWithLen(memSegment.Postings[postingID], w, &reuseBuf, reuseBufVarint) if err != nil { return nil, err } @@ -394,6 +463,8 @@ func persistPostingsLists(memSegment *mem.Segment, w *CountHashWriter, func persistDictionary(memSegment *mem.Segment, w *CountHashWriter, postingsLocs []uint64) ([]uint64, error) { rv := make([]uint64, 0, len(memSegment.DictKeys)) + varintBuf := make([]byte, binary.MaxVarintLen64) + var buffer bytes.Buffer for fieldID, fieldTerms := range memSegment.DictKeys { if fieldID != 0 { @@ -427,10 +498,8 @@ func persistDictionary(memSegment *mem.Segment, w *CountHashWriter, postingsLocs vellumData := buffer.Bytes() // write out the length of the vellum data - buf := make([]byte, binary.MaxVarintLen64) - // write out the number of chunks - n := binary.PutUvarint(buf, uint64(len(vellumData))) - _, err = w.Write(buf[:n]) + n := binary.PutUvarint(varintBuf, uint64(len(vellumData))) + _, err = w.Write(varintBuf[:n]) if err != nil { return nil, err } @@ -521,9 +590,8 @@ func persistDocValues(memSegment *mem.Segment, w *CountHashWriter, return fieldChunkOffsets, nil } -func persistFieldDocValues(w *CountHashWriter, chunkFactor uint32, - memSegment *mem.Segment) (uint64, error) { - +func persistFieldDocValues(memSegment *mem.Segment, w *CountHashWriter, + chunkFactor uint32) (uint64, error) { fieldDvOffsets, err := persistDocValues(memSegment, w, chunkFactor) if err != nil { return 0, err @@ -548,3 +616,36 @@ func persistFieldDocValues(w *CountHashWriter, chunkFactor uint32, return fieldDocValuesOffset, nil } + +func NewSegmentBase(memSegment *mem.Segment, chunkFactor uint32) (*SegmentBase, error) { + var br bytes.Buffer + + cr := NewCountHashWriter(&br) + + numDocs, storedIndexOffset, fieldsIndexOffset, docValueOffset, dictLocs, err := + persistBase(memSegment, cr, chunkFactor) + if err != nil { + return nil, err + } + + sb := &SegmentBase{ + mem: br.Bytes(), + memCRC: cr.Sum32(), + chunkFactor: chunkFactor, + fieldsMap: memSegment.FieldsMap, + fieldsInv: memSegment.FieldsInv, + numDocs: numDocs, + storedIndexOffset: storedIndexOffset, + fieldsIndexOffset: fieldsIndexOffset, + docValueOffset: docValueOffset, + dictLocs: dictLocs, + fieldDvIterMap: make(map[uint16]*docValueIterator), + } + + err = sb.loadDvIterators() + if err != nil { + return nil, err + } + + return sb, nil +} diff --git a/index/scorch/segment/zap/count.go b/index/scorch/segment/zap/count.go index 2f0b92de..d75e83c0 100644 --- a/index/scorch/segment/zap/count.go +++ b/index/scorch/segment/zap/count.go @@ -15,32 +15,28 @@ package zap import ( - "hash" "hash/crc32" "io" ) // CountHashWriter is a wrapper around a Writer which counts the number of -// bytes which have been written +// bytes which have been written and computes a crc32 hash type CountHashWriter struct { - w io.Writer - h hash.Hash32 - n int + w io.Writer + crc uint32 + n int } // NewCountHashWriter returns a CountHashWriter which wraps the provided Writer func NewCountHashWriter(w io.Writer) *CountHashWriter { - return &CountHashWriter{ - w: w, - h: crc32.NewIEEE(), - } + return &CountHashWriter{w: w} } // Write writes the provided bytes to the wrapped writer and counts the bytes func (c *CountHashWriter) Write(b []byte) (int, error) { n, err := c.w.Write(b) + c.crc = 
crc32.Update(c.crc, crc32.IEEETable, b[:n]) c.n += n - _, _ = c.h.Write(b) return n, err } @@ -51,5 +47,5 @@ func (c *CountHashWriter) Count() int { // Sum32 returns the CRC-32 hash of the content written to this writer func (c *CountHashWriter) Sum32() uint32 { - return c.h.Sum32() + return c.crc } diff --git a/index/scorch/segment/zap/dict.go b/index/scorch/segment/zap/dict.go index 3221d061..bb6fd947 100644 --- a/index/scorch/segment/zap/dict.go +++ b/index/scorch/segment/zap/dict.go @@ -27,7 +27,7 @@ import ( // Dictionary is the zap representation of the term dictionary type Dictionary struct { - segment *Segment + sb *SegmentBase field string fieldID uint16 fst *vellum.FST @@ -35,18 +35,18 @@ type Dictionary struct { // PostingsList returns the postings list for the specified term func (d *Dictionary) PostingsList(term string, except *roaring.Bitmap) (segment.PostingsList, error) { - return d.postingsList(term, except) + return d.postingsList([]byte(term), except) } -func (d *Dictionary) postingsList(term string, except *roaring.Bitmap) (*PostingsList, error) { +func (d *Dictionary) postingsList(term []byte, except *roaring.Bitmap) (*PostingsList, error) { rv := &PostingsList{ - dictionary: d, - term: term, - except: except, + sb: d.sb, + term: term, + except: except, } if d.fst != nil { - postingsOffset, exists, err := d.fst.Get([]byte(term)) + postingsOffset, exists, err := d.fst.Get(term) if err != nil { return nil, fmt.Errorf("vellum err: %v", err) } @@ -56,19 +56,19 @@ func (d *Dictionary) postingsList(term string, except *roaring.Bitmap) (*Posting var n uint64 var read int - rv.freqOffset, read = binary.Uvarint(d.segment.mm[postingsOffset+n : postingsOffset+binary.MaxVarintLen64]) + rv.freqOffset, read = binary.Uvarint(d.sb.mem[postingsOffset+n : postingsOffset+binary.MaxVarintLen64]) n += uint64(read) - rv.locOffset, read = binary.Uvarint(d.segment.mm[postingsOffset+n : postingsOffset+n+binary.MaxVarintLen64]) + rv.locOffset, read = binary.Uvarint(d.sb.mem[postingsOffset+n : postingsOffset+n+binary.MaxVarintLen64]) n += uint64(read) var locBitmapOffset uint64 - locBitmapOffset, read = binary.Uvarint(d.segment.mm[postingsOffset+n : postingsOffset+n+binary.MaxVarintLen64]) + locBitmapOffset, read = binary.Uvarint(d.sb.mem[postingsOffset+n : postingsOffset+n+binary.MaxVarintLen64]) n += uint64(read) // go ahead and load loc bitmap var locBitmapLen uint64 - locBitmapLen, read = binary.Uvarint(d.segment.mm[locBitmapOffset : locBitmapOffset+binary.MaxVarintLen64]) - locRoaringBytes := d.segment.mm[locBitmapOffset+uint64(read) : locBitmapOffset+uint64(read)+locBitmapLen] + locBitmapLen, read = binary.Uvarint(d.sb.mem[locBitmapOffset : locBitmapOffset+binary.MaxVarintLen64]) + locRoaringBytes := d.sb.mem[locBitmapOffset+uint64(read) : locBitmapOffset+uint64(read)+locBitmapLen] rv.locBitmap = roaring.NewBitmap() _, err := rv.locBitmap.FromBuffer(locRoaringBytes) if err != nil { @@ -76,10 +76,10 @@ func (d *Dictionary) postingsList(term string, except *roaring.Bitmap) (*Posting } var postingsLen uint64 - postingsLen, read = binary.Uvarint(d.segment.mm[postingsOffset+n : postingsOffset+n+binary.MaxVarintLen64]) + postingsLen, read = binary.Uvarint(d.sb.mem[postingsOffset+n : postingsOffset+n+binary.MaxVarintLen64]) n += uint64(read) - roaringBytes := d.segment.mm[postingsOffset+n : postingsOffset+n+postingsLen] + roaringBytes := d.sb.mem[postingsOffset+n : postingsOffset+n+postingsLen] bitmap := roaring.NewBitmap() _, err = bitmap.FromBuffer(roaringBytes) @@ -96,7 +96,6 @@ func (d 
*Dictionary) postingsList(term string, except *roaring.Bitmap) (*Posting // Iterator returns an iterator for this dictionary func (d *Dictionary) Iterator() segment.DictionaryIterator { - rv := &DictionaryIterator{ d: d, } diff --git a/index/scorch/segment/zap/docvalues.go b/index/scorch/segment/zap/docvalues.go index e37ecc74..fb5b348a 100644 --- a/index/scorch/segment/zap/docvalues.go +++ b/index/scorch/segment/zap/docvalues.go @@ -61,17 +61,17 @@ func (di *docValueIterator) curChunkNumber() uint64 { return di.curChunkNum } -func (s *Segment) loadFieldDocValueIterator(field string, +func (s *SegmentBase) loadFieldDocValueIterator(field string, fieldDvLoc uint64) (*docValueIterator, error) { // get the docValue offset for the given fields if fieldDvLoc == fieldNotUninverted { - return nil, fmt.Errorf("loadFieldDocValueConfigs: "+ + return nil, fmt.Errorf("loadFieldDocValueIterator: "+ "no docValues found for field: %s", field) } // read the number of chunks, chunk lengths var offset, clen uint64 - numChunks, read := binary.Uvarint(s.mm[fieldDvLoc : fieldDvLoc+binary.MaxVarintLen64]) + numChunks, read := binary.Uvarint(s.mem[fieldDvLoc : fieldDvLoc+binary.MaxVarintLen64]) if read <= 0 { return nil, fmt.Errorf("failed to read the field "+ "doc values for field %s", field) @@ -84,7 +84,7 @@ func (s *Segment) loadFieldDocValueIterator(field string, chunkLens: make([]uint64, int(numChunks)), } for i := 0; i < int(numChunks); i++ { - clen, read = binary.Uvarint(s.mm[fieldDvLoc+offset : fieldDvLoc+offset+binary.MaxVarintLen64]) + clen, read = binary.Uvarint(s.mem[fieldDvLoc+offset : fieldDvLoc+offset+binary.MaxVarintLen64]) if read <= 0 { return nil, fmt.Errorf("corrupted chunk length during segment load") } @@ -97,7 +97,7 @@ func (s *Segment) loadFieldDocValueIterator(field string, } func (di *docValueIterator) loadDvChunk(chunkNumber, - localDocNum uint64, s *Segment) error { + localDocNum uint64, s *SegmentBase) error { // advance to the chunk where the docValues // reside for the given docID destChunkDataLoc := di.dvDataLoc @@ -107,7 +107,7 @@ func (di *docValueIterator) loadDvChunk(chunkNumber, curChunkSize := di.chunkLens[chunkNumber] // read the number of docs reside in the chunk - numDocs, read := binary.Uvarint(s.mm[destChunkDataLoc : destChunkDataLoc+binary.MaxVarintLen64]) + numDocs, read := binary.Uvarint(s.mem[destChunkDataLoc : destChunkDataLoc+binary.MaxVarintLen64]) if read <= 0 { return fmt.Errorf("failed to read the chunk") } @@ -116,17 +116,17 @@ func (di *docValueIterator) loadDvChunk(chunkNumber, offset := uint64(0) di.curChunkHeader = make([]MetaData, int(numDocs)) for i := 0; i < int(numDocs); i++ { - di.curChunkHeader[i].DocID, read = binary.Uvarint(s.mm[chunkMetaLoc+offset : chunkMetaLoc+offset+binary.MaxVarintLen64]) + di.curChunkHeader[i].DocID, read = binary.Uvarint(s.mem[chunkMetaLoc+offset : chunkMetaLoc+offset+binary.MaxVarintLen64]) offset += uint64(read) - di.curChunkHeader[i].DocDvLoc, read = binary.Uvarint(s.mm[chunkMetaLoc+offset : chunkMetaLoc+offset+binary.MaxVarintLen64]) + di.curChunkHeader[i].DocDvLoc, read = binary.Uvarint(s.mem[chunkMetaLoc+offset : chunkMetaLoc+offset+binary.MaxVarintLen64]) offset += uint64(read) - di.curChunkHeader[i].DocDvLen, read = binary.Uvarint(s.mm[chunkMetaLoc+offset : chunkMetaLoc+offset+binary.MaxVarintLen64]) + di.curChunkHeader[i].DocDvLen, read = binary.Uvarint(s.mem[chunkMetaLoc+offset : chunkMetaLoc+offset+binary.MaxVarintLen64]) offset += uint64(read) } compressedDataLoc := chunkMetaLoc + offset dataLength := 
destChunkDataLoc + curChunkSize - compressedDataLoc - di.curChunkData = s.mm[compressedDataLoc : compressedDataLoc+dataLength] + di.curChunkData = s.mem[compressedDataLoc : compressedDataLoc+dataLength] di.curChunkNum = chunkNumber return nil } @@ -171,18 +171,18 @@ func (di *docValueIterator) getDocValueLocs(docID uint64) (uint64, uint64) { // VisitDocumentFieldTerms is an implementation of the // DocumentFieldTermVisitable interface -func (s *Segment) VisitDocumentFieldTerms(localDocNum uint64, fields []string, +func (s *SegmentBase) VisitDocumentFieldTerms(localDocNum uint64, fields []string, visitor index.DocumentFieldTermVisitor) error { - fieldID := uint16(0) + fieldIDPlus1 := uint16(0) ok := true for _, field := range fields { - if fieldID, ok = s.fieldsMap[field]; !ok { + if fieldIDPlus1, ok = s.fieldsMap[field]; !ok { continue } // find the chunkNumber where the docValues are stored docInChunk := localDocNum / uint64(s.chunkFactor) - if dvIter, exists := s.fieldDvIterMap[fieldID-1]; exists && + if dvIter, exists := s.fieldDvIterMap[fieldIDPlus1-1]; exists && dvIter != nil { // check if the chunk is already loaded if docInChunk != dvIter.curChunkNumber() { diff --git a/index/scorch/segment/zap/merge.go b/index/scorch/segment/zap/merge.go index 16ec848b..8c06f2fe 100644 --- a/index/scorch/segment/zap/merge.go +++ b/index/scorch/segment/zap/merge.go @@ -29,10 +29,10 @@ import ( "github.com/golang/snappy" ) -// Merge takes a slice of zap segments, bit masks describing which documents -// from the may be dropped, and creates a new segment containing the remaining -// data. This new segment is built at the specified path, with the provided -// chunkFactor. +// Merge takes a slice of zap segments and bit masks describing which +// documents may be dropped, and creates a new segment containing the +// remaining data. This new segment is built at the specified path, +// with the provided chunkFactor. 
func Merge(segments []*Segment, drops []*roaring.Bitmap, path string, chunkFactor uint32) ([][]uint64, error) { flag := os.O_RDWR | os.O_CREATE @@ -42,6 +42,11 @@ func Merge(segments []*Segment, drops []*roaring.Bitmap, path string, return nil, err } + cleanup := func() { + _ = f.Close() + _ = os.Remove(path) + } + // buffer the output br := bufio.NewWriter(f) @@ -50,52 +55,59 @@ func Merge(segments []*Segment, drops []*roaring.Bitmap, path string, fieldsInv := mergeFields(segments) fieldsMap := mapFields(fieldsInv) - newSegDocCount := computeNewDocCount(segments, drops) var newDocNums [][]uint64 var storedIndexOffset uint64 fieldDvLocsOffset := uint64(fieldNotUninverted) var dictLocs []uint64 + + newSegDocCount := computeNewDocCount(segments, drops) if newSegDocCount > 0 { storedIndexOffset, newDocNums, err = mergeStoredAndRemap(segments, drops, fieldsMap, fieldsInv, newSegDocCount, cr) if err != nil { + cleanup() return nil, err } dictLocs, fieldDvLocsOffset, err = persistMergedRest(segments, drops, fieldsInv, fieldsMap, newDocNums, newSegDocCount, chunkFactor, cr) if err != nil { + cleanup() return nil, err } } else { dictLocs = make([]uint64, len(fieldsInv)) } - var fieldsIndexOffset uint64 - fieldsIndexOffset, err = persistFields(fieldsInv, cr, dictLocs) + fieldsIndexOffset, err := persistFields(fieldsInv, cr, dictLocs) if err != nil { + cleanup() return nil, err } err = persistFooter(newSegDocCount, storedIndexOffset, - fieldsIndexOffset, fieldDvLocsOffset, chunkFactor, cr) + fieldsIndexOffset, fieldDvLocsOffset, chunkFactor, cr.Sum32(), cr) if err != nil { + cleanup() return nil, err } err = br.Flush() if err != nil { + cleanup() return nil, err } err = f.Sync() if err != nil { + cleanup() return nil, err } err = f.Close() if err != nil { + cleanup() return nil, err } @@ -104,7 +116,7 @@ func Merge(segments []*Segment, drops []*roaring.Bitmap, path string, // mapFields takes the fieldsInv list and builds the map func mapFields(fields []string) map[string]uint16 { - rv := make(map[string]uint16) + rv := make(map[string]uint16, len(fields)) for i, fieldName := range fields { rv[fieldName] = uint16(i) } @@ -114,15 +126,14 @@ func mapFields(fields []string) map[string]uint16 { // computeNewDocCount determines how many documents will be in the newly // merged segment when obsoleted docs are dropped func computeNewDocCount(segments []*Segment, drops []*roaring.Bitmap) uint64 { - var newSegDocCount uint64 + var newDocCount uint64 for segI, segment := range segments { - segIAfterDrop := segment.NumDocs() + newDocCount += segment.NumDocs() if drops[segI] != nil { - segIAfterDrop -= drops[segI].GetCardinality() + newDocCount -= drops[segI].GetCardinality() } - newSegDocCount += segIAfterDrop } - return newSegDocCount + return newDocCount } func persistMergedRest(segments []*Segment, drops []*roaring.Bitmap, @@ -130,14 +141,18 @@ func persistMergedRest(segments []*Segment, drops []*roaring.Bitmap, newSegDocCount uint64, chunkFactor uint32, w *CountHashWriter) ([]uint64, uint64, error) { + var bufReuse bytes.Buffer var bufMaxVarintLen64 []byte = make([]byte, binary.MaxVarintLen64) var bufLoc []uint64 - rv1 := make([]uint64, len(fieldsInv)) + rv := make([]uint64, len(fieldsInv)) fieldDvLocs := make([]uint64, len(fieldsInv)) fieldDvLocsOffset := uint64(fieldNotUninverted) + var docNumbers docIDRange + var vellumBuf bytes.Buffer + // for each field for fieldID, fieldName := range fieldsInv { if fieldID != 0 { @@ -177,13 +192,15 @@ func persistMergedRest(segments []*Segment, drops 
[]*roaring.Bitmap, tfEncoder := newChunkedIntCoder(uint64(chunkFactor), newSegDocCount-1) locEncoder := newChunkedIntCoder(uint64(chunkFactor), newSegDocCount-1) - fdvEncoder := newChunkedContentCoder(uint64(chunkFactor), newSegDocCount-1) - docTermMap := make(map[uint64][]byte, 0) + + docTermMap := make(map[uint64][]byte, newSegDocCount) + for err == nil { term, _ := mergeItr.Current() newRoaring := roaring.NewBitmap() newRoaringLocs := roaring.NewBitmap() + tfEncoder.Reset() locEncoder.Reset() @@ -193,7 +210,7 @@ func persistMergedRest(segments []*Segment, drops []*roaring.Bitmap, if dict == nil { continue } - postings, err2 := dict.postingsList(string(term), drops[dictI]) + postings, err2 := dict.postingsList(term, drops[dictI]) if err2 != nil { return nil, 0, err2 } @@ -209,9 +226,9 @@ func persistMergedRest(segments []*Segment, drops []*roaring.Bitmap, // encode norm bits norm := next.Norm() normBits := math.Float32bits(float32(norm)) - err3 := tfEncoder.Add(hitNewDocNum, next.Frequency(), uint64(normBits)) - if err3 != nil { - return nil, 0, err3 + err = tfEncoder.Add(hitNewDocNum, next.Frequency(), uint64(normBits)) + if err != nil { + return nil, 0, err } locs := next.Locations() if len(locs) > 0 { @@ -234,15 +251,16 @@ func persistMergedRest(segments []*Segment, drops []*roaring.Bitmap, } } - docTermMap[hitNewDocNum] = append(docTermMap[hitNewDocNum], []byte(term)...) - docTermMap[hitNewDocNum] = append(docTermMap[hitNewDocNum], termSeparator) + docTermMap[hitNewDocNum] = + append(append(docTermMap[hitNewDocNum], term...), termSeparator) + next, err2 = postItr.Next() } - if err != nil { - return nil, 0, err + if err2 != nil { + return nil, 0, err2 } - } + tfEncoder.Close() locEncoder.Close() @@ -259,7 +277,7 @@ func persistMergedRest(segments []*Segment, drops []*roaring.Bitmap, return nil, 0, err } postingLocOffset := uint64(w.Count()) - _, err = writeRoaringWithLen(newRoaringLocs, w) + _, err = writeRoaringWithLen(newRoaringLocs, w, &bufReuse, bufMaxVarintLen64) if err != nil { return nil, 0, err } @@ -285,7 +303,7 @@ func persistMergedRest(segments []*Segment, drops []*roaring.Bitmap, if err != nil { return nil, 0, err } - _, err = writeRoaringWithLen(newRoaring, w) + _, err = writeRoaringWithLen(newRoaring, w, &bufReuse, bufMaxVarintLen64) if err != nil { return nil, 0, err } @@ -303,6 +321,7 @@ func persistMergedRest(segments []*Segment, drops []*roaring.Bitmap, } dictOffset := uint64(w.Count()) + err = newVellum.Close() if err != nil { return nil, 0, err @@ -310,10 +329,8 @@ func persistMergedRest(segments []*Segment, drops []*roaring.Bitmap, vellumData := vellumBuf.Bytes() // write out the length of the vellum data - buf := bufMaxVarintLen64 - // write out the number of chunks - n := binary.PutUvarint(buf, uint64(len(vellumData))) - _, err = w.Write(buf[:n]) + n := binary.PutUvarint(bufMaxVarintLen64, uint64(len(vellumData))) + _, err = w.Write(bufMaxVarintLen64[:n]) if err != nil { return nil, 0, err } @@ -324,27 +341,33 @@ func persistMergedRest(segments []*Segment, drops []*roaring.Bitmap, return nil, 0, err } - rv1[fieldID] = dictOffset + rv[fieldID] = dictOffset - // update the doc value - var docNumbers docIDRange + // update the doc nums + if cap(docNumbers) < len(docTermMap) { + docNumbers = make(docIDRange, 0, len(docTermMap)) + } + docNumbers = docNumbers[:0] for k := range docTermMap { docNumbers = append(docNumbers, k) } sort.Sort(docNumbers) + fdvEncoder := newChunkedContentCoder(uint64(chunkFactor), newSegDocCount-1) for _, docNum := range docNumbers { err = 
fdvEncoder.Add(docNum, docTermMap[docNum]) if err != nil { return nil, 0, err } } - // get the field doc value offset - fieldDvLocs[fieldID] = uint64(w.Count()) err = fdvEncoder.Close() if err != nil { return nil, 0, err } + + // get the field doc value offset + fieldDvLocs[fieldID] = uint64(w.Count()) + // persist the doc value details for this field _, err = fdvEncoder.Write(w) if err != nil { @@ -353,7 +376,8 @@ func persistMergedRest(segments []*Segment, drops []*roaring.Bitmap, } fieldDvLocsOffset = uint64(w.Count()) - buf := make([]byte, binary.MaxVarintLen64) + + buf := bufMaxVarintLen64 for _, offset := range fieldDvLocs { n := binary.PutUvarint(buf, uint64(offset)) _, err := w.Write(buf[:n]) @@ -362,7 +386,7 @@ func persistMergedRest(segments []*Segment, drops []*roaring.Bitmap, } } - return rv1, fieldDvLocsOffset, nil + return rv, fieldDvLocsOffset, nil } const docDropped = math.MaxUint64 @@ -370,13 +394,16 @@ const docDropped = math.MaxUint64 func mergeStoredAndRemap(segments []*Segment, drops []*roaring.Bitmap, fieldsMap map[string]uint16, fieldsInv []string, newSegDocCount uint64, w *CountHashWriter) (uint64, [][]uint64, error) { - var rv [][]uint64 - var newDocNum int + var rv [][]uint64 // The remapped or newDocNums for each segment. + + var newDocNum uint64 var curr int var metaBuf bytes.Buffer var data, compressed []byte + metaEncoder := govarint.NewU64Base128Encoder(&metaBuf) + vals := make([][][]byte, len(fieldsInv)) typs := make([][]byte, len(fieldsInv)) poss := make([][][]uint64, len(fieldsInv)) @@ -385,118 +412,121 @@ func mergeStoredAndRemap(segments []*Segment, drops []*roaring.Bitmap, // for each segment for segI, segment := range segments { - var segNewDocNums []uint64 + segNewDocNums := make([]uint64, segment.numDocs) // for each doc num for docNum := uint64(0); docNum < segment.numDocs; docNum++ { + // TODO: roaring's API limits docNums to 32-bits? 
+ if drops[segI] != nil && drops[segI].Contains(uint32(docNum)) { + segNewDocNums[docNum] = docDropped + continue + } + + segNewDocNums[docNum] = newDocNum + + curr = 0 metaBuf.Reset() data = data[:0] compressed = compressed[:0] - curr = 0 - metaEncoder := govarint.NewU64Base128Encoder(&metaBuf) - - if drops[segI] != nil && drops[segI].Contains(uint32(docNum)) { - segNewDocNums = append(segNewDocNums, docDropped) - } else { - segNewDocNums = append(segNewDocNums, uint64(newDocNum)) - // collect all the data - for i := 0; i < len(fieldsInv); i++ { - vals[i] = vals[i][:0] - typs[i] = typs[i][:0] - poss[i] = poss[i][:0] - } - err := segment.VisitDocument(docNum, func(field string, typ byte, value []byte, pos []uint64) bool { - fieldID := int(fieldsMap[field]) - vals[fieldID] = append(vals[fieldID], value) - typs[fieldID] = append(typs[fieldID], typ) - poss[fieldID] = append(poss[fieldID], pos) - return true - }) - if err != nil { - return 0, nil, err - } - - // now walk the fields in order - for fieldID := range fieldsInv { - - storedFieldValues := vals[int(fieldID)] - - // has stored values for this field - num := len(storedFieldValues) - - // process each value - for i := 0; i < num; i++ { - // encode field - _, err2 := metaEncoder.PutU64(uint64(fieldID)) - if err2 != nil { - return 0, nil, err2 - } - // encode type - _, err2 = metaEncoder.PutU64(uint64(typs[int(fieldID)][i])) - if err2 != nil { - return 0, nil, err2 - } - // encode start offset - _, err2 = metaEncoder.PutU64(uint64(curr)) - if err2 != nil { - return 0, nil, err2 - } - // end len - _, err2 = metaEncoder.PutU64(uint64(len(storedFieldValues[i]))) - if err2 != nil { - return 0, nil, err2 - } - // encode number of array pos - _, err2 = metaEncoder.PutU64(uint64(len(poss[int(fieldID)][i]))) - if err2 != nil { - return 0, nil, err2 - } - // encode all array positions - for j := 0; j < len(poss[int(fieldID)][i]); j++ { - _, err2 = metaEncoder.PutU64(poss[int(fieldID)][i][j]) - if err2 != nil { - return 0, nil, err2 - } - } - // append data - data = append(data, storedFieldValues[i]...) 
- // update curr - curr += len(storedFieldValues[i]) - } - } - - metaEncoder.Close() - metaBytes := metaBuf.Bytes() - compressed = snappy.Encode(compressed, data) - // record where we're about to start writing - docNumOffsets[newDocNum] = uint64(w.Count()) - - // write out the meta len and compressed data len - _, err = writeUvarints(w, - uint64(len(metaBytes)), uint64(len(compressed))) - if err != nil { - return 0, nil, err - } - // now write the meta - _, err = w.Write(metaBytes) - if err != nil { - return 0, nil, err - } - // now write the compressed data - _, err = w.Write(compressed) - if err != nil { - return 0, nil, err - } - - newDocNum++ + // collect all the data + for i := 0; i < len(fieldsInv); i++ { + vals[i] = vals[i][:0] + typs[i] = typs[i][:0] + poss[i] = poss[i][:0] } + err := segment.VisitDocument(docNum, func(field string, typ byte, value []byte, pos []uint64) bool { + fieldID := int(fieldsMap[field]) + vals[fieldID] = append(vals[fieldID], value) + typs[fieldID] = append(typs[fieldID], typ) + poss[fieldID] = append(poss[fieldID], pos) + return true + }) + if err != nil { + return 0, nil, err + } + + // now walk the fields in order + for fieldID := range fieldsInv { + storedFieldValues := vals[int(fieldID)] + + // has stored values for this field + num := len(storedFieldValues) + + // process each value + for i := 0; i < num; i++ { + // encode field + _, err2 := metaEncoder.PutU64(uint64(fieldID)) + if err2 != nil { + return 0, nil, err2 + } + // encode type + _, err2 = metaEncoder.PutU64(uint64(typs[int(fieldID)][i])) + if err2 != nil { + return 0, nil, err2 + } + // encode start offset + _, err2 = metaEncoder.PutU64(uint64(curr)) + if err2 != nil { + return 0, nil, err2 + } + // end len + _, err2 = metaEncoder.PutU64(uint64(len(storedFieldValues[i]))) + if err2 != nil { + return 0, nil, err2 + } + // encode number of array pos + _, err2 = metaEncoder.PutU64(uint64(len(poss[int(fieldID)][i]))) + if err2 != nil { + return 0, nil, err2 + } + // encode all array positions + for j := 0; j < len(poss[int(fieldID)][i]); j++ { + _, err2 = metaEncoder.PutU64(poss[int(fieldID)][i][j]) + if err2 != nil { + return 0, nil, err2 + } + } + // append data + data = append(data, storedFieldValues[i]...) 
+ // update curr + curr += len(storedFieldValues[i]) + } + } + + metaEncoder.Close() + metaBytes := metaBuf.Bytes() + + compressed = snappy.Encode(compressed, data) + + // record where we're about to start writing + docNumOffsets[newDocNum] = uint64(w.Count()) + + // write out the meta len and compressed data len + _, err = writeUvarints(w, uint64(len(metaBytes)), uint64(len(compressed))) + if err != nil { + return 0, nil, err + } + // now write the meta + _, err = w.Write(metaBytes) + if err != nil { + return 0, nil, err + } + // now write the compressed data + _, err = w.Write(compressed) + if err != nil { + return 0, nil, err + } + + newDocNum++ } + rv = append(rv, segNewDocNums) } // return value is the start of the stored index offset := uint64(w.Count()) + // now write out the stored doc index for docNum := range docNumOffsets { err := binary.Write(w, binary.BigEndian, docNumOffsets[docNum]) @@ -511,13 +541,13 @@ func mergeStoredAndRemap(segments []*Segment, drops []*roaring.Bitmap, // mergeFields builds a unified list of fields used across all the input segments func mergeFields(segments []*Segment) []string { fieldsMap := map[string]struct{}{} - for _, segment := range segments { fields := segment.Fields() for _, field := range fields { fieldsMap[field] = struct{}{} } } + rv := make([]string, 0, len(fieldsMap)) // ensure _id stays first rv = append(rv, "_id") @@ -526,6 +556,5 @@ func mergeFields(segments []*Segment) []string { rv = append(rv, k) } } - return rv } diff --git a/index/scorch/segment/zap/posting.go b/index/scorch/segment/zap/posting.go index 1b7a0a58..e8533a12 100644 --- a/index/scorch/segment/zap/posting.go +++ b/index/scorch/segment/zap/posting.go @@ -27,8 +27,8 @@ import ( // PostingsList is an in-memory represenation of a postings list type PostingsList struct { - dictionary *Dictionary - term string + sb *SegmentBase + term []byte postingsOffset uint64 freqOffset uint64 locOffset uint64 @@ -48,11 +48,11 @@ func (p *PostingsList) Iterator() segment.PostingsIterator { var n uint64 var read int var numFreqChunks uint64 - numFreqChunks, read = binary.Uvarint(p.dictionary.segment.mm[p.freqOffset+n : p.freqOffset+n+binary.MaxVarintLen64]) + numFreqChunks, read = binary.Uvarint(p.sb.mem[p.freqOffset+n : p.freqOffset+n+binary.MaxVarintLen64]) n += uint64(read) rv.freqChunkLens = make([]uint64, int(numFreqChunks)) for i := 0; i < int(numFreqChunks); i++ { - rv.freqChunkLens[i], read = binary.Uvarint(p.dictionary.segment.mm[p.freqOffset+n : p.freqOffset+n+binary.MaxVarintLen64]) + rv.freqChunkLens[i], read = binary.Uvarint(p.sb.mem[p.freqOffset+n : p.freqOffset+n+binary.MaxVarintLen64]) n += uint64(read) } rv.freqChunkStart = p.freqOffset + n @@ -60,11 +60,11 @@ func (p *PostingsList) Iterator() segment.PostingsIterator { // prepare the loc chunk details n = 0 var numLocChunks uint64 - numLocChunks, read = binary.Uvarint(p.dictionary.segment.mm[p.locOffset+n : p.locOffset+n+binary.MaxVarintLen64]) + numLocChunks, read = binary.Uvarint(p.sb.mem[p.locOffset+n : p.locOffset+n+binary.MaxVarintLen64]) n += uint64(read) rv.locChunkLens = make([]uint64, int(numLocChunks)) for i := 0; i < int(numLocChunks); i++ { - rv.locChunkLens[i], read = binary.Uvarint(p.dictionary.segment.mm[p.locOffset+n : p.locOffset+n+binary.MaxVarintLen64]) + rv.locChunkLens[i], read = binary.Uvarint(p.sb.mem[p.locOffset+n : p.locOffset+n+binary.MaxVarintLen64]) n += uint64(read) } rv.locChunkStart = p.locOffset + n @@ -133,7 +133,7 @@ func (i *PostingsIterator) loadChunk(chunk int) error { start += 
i.freqChunkLens[j] } end := start + i.freqChunkLens[chunk] - i.currChunkFreqNorm = i.postings.dictionary.segment.mm[start:end] + i.currChunkFreqNorm = i.postings.sb.mem[start:end] i.freqNormDecoder = govarint.NewU64Base128Decoder(bytes.NewReader(i.currChunkFreqNorm)) start = i.locChunkStart @@ -141,7 +141,7 @@ func (i *PostingsIterator) loadChunk(chunk int) error { start += i.locChunkLens[j] } end = start + i.locChunkLens[chunk] - i.currChunkLoc = i.postings.dictionary.segment.mm[start:end] + i.currChunkLoc = i.postings.sb.mem[start:end] i.locDecoder = govarint.NewU64Base128Decoder(bytes.NewReader(i.currChunkLoc)) i.currChunk = uint32(chunk) return nil @@ -192,7 +192,7 @@ func (i *PostingsIterator) readLocation(l *Location) error { // group these together for less branching if l != nil { - l.field = i.postings.dictionary.segment.fieldsInv[fieldID] + l.field = i.postings.sb.fieldsInv[fieldID] l.pos = pos l.start = start l.end = end @@ -221,9 +221,9 @@ func (i *PostingsIterator) Next() (segment.Posting, error) { return nil, nil } n := i.actual.Next() - nChunk := n / i.postings.dictionary.segment.chunkFactor + nChunk := n / i.postings.sb.chunkFactor allN := i.all.Next() - allNChunk := allN / i.postings.dictionary.segment.chunkFactor + allNChunk := allN / i.postings.sb.chunkFactor // n is the next actual hit (excluding some postings) // allN is the next hit in the full postings diff --git a/index/scorch/segment/zap/read.go b/index/scorch/segment/zap/read.go index c9b3e772..0c5b9e17 100644 --- a/index/scorch/segment/zap/read.go +++ b/index/scorch/segment/zap/read.go @@ -16,16 +16,16 @@ package zap import "encoding/binary" -func (s *Segment) getStoredMetaAndCompressed(docNum uint64) ([]byte, []byte) { +func (s *SegmentBase) getDocStoredMetaAndCompressed(docNum uint64) ([]byte, []byte) { docStoredStartAddr := s.storedIndexOffset + (8 * docNum) - docStoredStart := binary.BigEndian.Uint64(s.mm[docStoredStartAddr : docStoredStartAddr+8]) + docStoredStart := binary.BigEndian.Uint64(s.mem[docStoredStartAddr : docStoredStartAddr+8]) var n uint64 - metaLen, read := binary.Uvarint(s.mm[docStoredStart : docStoredStart+binary.MaxVarintLen64]) + metaLen, read := binary.Uvarint(s.mem[docStoredStart : docStoredStart+binary.MaxVarintLen64]) n += uint64(read) var dataLen uint64 - dataLen, read = binary.Uvarint(s.mm[docStoredStart+n : docStoredStart+n+binary.MaxVarintLen64]) + dataLen, read = binary.Uvarint(s.mem[docStoredStart+n : docStoredStart+n+binary.MaxVarintLen64]) n += uint64(read) - meta := s.mm[docStoredStart+n : docStoredStart+n+metaLen] - data := s.mm[docStoredStart+n+metaLen : docStoredStart+n+metaLen+dataLen] + meta := s.mem[docStoredStart+n : docStoredStart+n+metaLen] + data := s.mem[docStoredStart+n+metaLen : docStoredStart+n+metaLen+dataLen] return meta, data } diff --git a/index/scorch/segment/zap/segment.go b/index/scorch/segment/zap/segment.go index 18d4ea56..94268cac 100644 --- a/index/scorch/segment/zap/segment.go +++ b/index/scorch/segment/zap/segment.go @@ -44,12 +44,15 @@ func Open(path string) (segment.Segment, error) { } rv := &Segment{ - f: f, - mm: mm, - path: path, - fieldsMap: make(map[string]uint16), - fieldDvIterMap: make(map[uint16]*docValueIterator), - refs: 1, + SegmentBase: SegmentBase{ + mem: mm[0 : len(mm)-FooterSize], + fieldsMap: make(map[string]uint16), + fieldDvIterMap: make(map[uint16]*docValueIterator), + }, + f: f, + mm: mm, + path: path, + refs: 1, } err = rv.loadConfig() @@ -73,24 +76,36 @@ func Open(path string) (segment.Segment, error) { return rv, nil } -// 
Segment implements the segment.Segment inteface over top the zap file format -type Segment struct { - f *os.File - mm mmap.MMap - path string - crc uint32 - version uint32 +// SegmentBase is a memory only, read-only implementation of the +// segment.Segment interface, using zap's data representation. +type SegmentBase struct { + mem []byte + memCRC uint32 chunkFactor uint32 + fieldsMap map[string]uint16 // fieldName -> fieldID+1 + fieldsInv []string // fieldID -> fieldName numDocs uint64 storedIndexOffset uint64 fieldsIndexOffset uint64 + docValueOffset uint64 + dictLocs []uint64 + fieldDvIterMap map[uint16]*docValueIterator // naive chunk cache per field +} - fieldsMap map[string]uint16 - fieldsInv []string - fieldsOffsets []uint64 +func (sb *SegmentBase) AddRef() {} +func (sb *SegmentBase) DecRef() (err error) { return nil } +func (sb *SegmentBase) Close() (err error) { return nil } - docValueOffset uint64 - fieldDvIterMap map[uint16]*docValueIterator // naive chunk cache per field +// Segment implements a persisted segment.Segment interface, by +// embedding an mmap()'ed SegmentBase. +type Segment struct { + SegmentBase + + f *os.File + mm mmap.MMap + path string + version uint32 + crc uint32 m sync.Mutex // Protects the fields that follow. refs int64 @@ -98,17 +113,29 @@ type Segment struct { func (s *Segment) SizeInBytes() uint64 { // 8 /* size of file pointer */ - // 4 /* size of crc -> uint32 */ // 4 /* size of version -> uint32 */ + // 4 /* size of crc -> uint32 */ + sizeOfUints := 16 + + sizeInBytes := (len(s.path) + int(segment.SizeOfString)) + sizeOfUints + + // mutex, refs -> int64 + sizeInBytes += 16 + + // do not include the mmap'ed part + return uint64(sizeInBytes) + s.SegmentBase.SizeInBytes() - uint64(len(s.mem)) +} + +func (s *SegmentBase) SizeInBytes() uint64 { + // 4 /* size of memCRC -> uint32 */ // 4 /* size of chunkFactor -> uint32 */ // 8 /* size of numDocs -> uint64 */ // 8 /* size of storedIndexOffset -> uint64 */ // 8 /* size of fieldsIndexOffset -> uint64 */ // 8 /* size of docValueOffset -> uint64 */ - sizeOfUints := 52 + sizeInBytes := 40 - // Do not include the mmap'ed part - sizeInBytes := (len(s.path) + int(segment.SizeOfString)) + sizeOfUints + sizeInBytes += len(s.mem) + int(segment.SizeOfSlice) // fieldsMap for k, _ := range s.fieldsMap { @@ -116,12 +143,12 @@ func (s *Segment) SizeInBytes() uint64 { } sizeInBytes += int(segment.SizeOfMap) /* overhead from map */ - // fieldsInv, fieldsOffsets + // fieldsInv, dictLocs for _, entry := range s.fieldsInv { sizeInBytes += (len(entry) + int(segment.SizeOfString)) } - sizeInBytes += len(s.fieldsOffsets) * 8 /* size of uint64 */ - sizeInBytes += int(segment.SizeOfSlice) * 2 /* overhead from slices */ + sizeInBytes += len(s.dictLocs) * 8 /* size of uint64 */ + sizeInBytes += int(segment.SizeOfSlice) * 3 /* overhead from slices */ // fieldDvIterMap sizeInBytes += len(s.fieldDvIterMap) * @@ -133,9 +160,6 @@ func (s *Segment) SizeInBytes() uint64 { } sizeInBytes += int(segment.SizeOfMap) - // mutex, refs -> int64 - sizeInBytes += 16 - return uint64(sizeInBytes) } @@ -158,47 +182,50 @@ func (s *Segment) DecRef() (err error) { func (s *Segment) loadConfig() error { crcOffset := len(s.mm) - 4 s.crc = binary.BigEndian.Uint32(s.mm[crcOffset : crcOffset+4]) + verOffset := crcOffset - 4 s.version = binary.BigEndian.Uint32(s.mm[verOffset : verOffset+4]) if s.version != version { return fmt.Errorf("unsupported version %d", s.version) } + chunkOffset := verOffset - 4 s.chunkFactor = binary.BigEndian.Uint32(s.mm[chunkOffset : 
chunkOffset+4]) docValueOffset := chunkOffset - 8 s.docValueOffset = binary.BigEndian.Uint64(s.mm[docValueOffset : docValueOffset+8]) - fieldsOffset := docValueOffset - 8 - s.fieldsIndexOffset = binary.BigEndian.Uint64(s.mm[fieldsOffset : fieldsOffset+8]) - storedOffset := fieldsOffset - 8 - s.storedIndexOffset = binary.BigEndian.Uint64(s.mm[storedOffset : storedOffset+8]) - docNumOffset := storedOffset - 8 - s.numDocs = binary.BigEndian.Uint64(s.mm[docNumOffset : docNumOffset+8]) + fieldsIndexOffset := docValueOffset - 8 + s.fieldsIndexOffset = binary.BigEndian.Uint64(s.mm[fieldsIndexOffset : fieldsIndexOffset+8]) + + storedIndexOffset := fieldsIndexOffset - 8 + s.storedIndexOffset = binary.BigEndian.Uint64(s.mm[storedIndexOffset : storedIndexOffset+8]) + + numDocsOffset := storedIndexOffset - 8 + s.numDocs = binary.BigEndian.Uint64(s.mm[numDocsOffset : numDocsOffset+8]) return nil - } -func (s *Segment) loadFields() error { - // NOTE for now we assume the fields index immediately preceeds the footer - // if this changes, need to adjust accordingly (or store epxlicit length) - fieldsIndexEnd := uint64(len(s.mm) - FooterSize) +func (s *SegmentBase) loadFields() error { + // NOTE for now we assume the fields index immediately preceeds + // the footer, and if this changes, need to adjust accordingly (or + // store explicit length), where s.mem was sliced from s.mm in Open(). + fieldsIndexEnd := uint64(len(s.mem)) // iterate through fields index var fieldID uint64 for s.fieldsIndexOffset+(8*fieldID) < fieldsIndexEnd { - addr := binary.BigEndian.Uint64(s.mm[s.fieldsIndexOffset+(8*fieldID) : s.fieldsIndexOffset+(8*fieldID)+8]) - var n uint64 + addr := binary.BigEndian.Uint64(s.mem[s.fieldsIndexOffset+(8*fieldID) : s.fieldsIndexOffset+(8*fieldID)+8]) - dictLoc, read := binary.Uvarint(s.mm[addr+n : fieldsIndexEnd]) - n += uint64(read) - s.fieldsOffsets = append(s.fieldsOffsets, dictLoc) + dictLoc, read := binary.Uvarint(s.mem[addr:fieldsIndexEnd]) + n := uint64(read) + s.dictLocs = append(s.dictLocs, dictLoc) var nameLen uint64 - nameLen, read = binary.Uvarint(s.mm[addr+n : fieldsIndexEnd]) + nameLen, read = binary.Uvarint(s.mem[addr+n : fieldsIndexEnd]) n += uint64(read) - name := string(s.mm[addr+n : addr+n+nameLen]) + name := string(s.mem[addr+n : addr+n+nameLen]) s.fieldsInv = append(s.fieldsInv, name) s.fieldsMap[name] = uint16(fieldID + 1) @@ -208,7 +235,7 @@ func (s *Segment) loadFields() error { } // Dictionary returns the term dictionary for the specified field -func (s *Segment) Dictionary(field string) (segment.TermDictionary, error) { +func (s *SegmentBase) Dictionary(field string) (segment.TermDictionary, error) { dict, err := s.dictionary(field) if err == nil && dict == nil { return &segment.EmptyDictionary{}, nil @@ -216,21 +243,20 @@ func (s *Segment) Dictionary(field string) (segment.TermDictionary, error) { return dict, err } -func (s *Segment) dictionary(field string) (rv *Dictionary, err error) { - rv = &Dictionary{ - segment: s, - field: field, - } +func (sb *SegmentBase) dictionary(field string) (rv *Dictionary, err error) { + fieldIDPlus1 := sb.fieldsMap[field] + if fieldIDPlus1 > 0 { + rv = &Dictionary{ + sb: sb, + field: field, + fieldID: fieldIDPlus1 - 1, + } - rv.fieldID = s.fieldsMap[field] - if rv.fieldID > 0 { - rv.fieldID = rv.fieldID - 1 - - dictStart := s.fieldsOffsets[rv.fieldID] + dictStart := sb.dictLocs[rv.fieldID] if dictStart > 0 { // read the length of the vellum data - vellumLen, read := binary.Uvarint(s.mm[dictStart : dictStart+binary.MaxVarintLen64]) - 
fstBytes := s.mm[dictStart+uint64(read) : dictStart+uint64(read)+vellumLen] + vellumLen, read := binary.Uvarint(sb.mem[dictStart : dictStart+binary.MaxVarintLen64]) + fstBytes := sb.mem[dictStart+uint64(read) : dictStart+uint64(read)+vellumLen] if fstBytes != nil { rv.fst, err = vellum.Load(fstBytes) if err != nil { @@ -238,9 +264,6 @@ func (s *Segment) dictionary(field string) (rv *Dictionary, err error) { } } } - - } else { - return nil, nil } return rv, nil @@ -248,10 +271,10 @@ func (s *Segment) dictionary(field string) (rv *Dictionary, err error) { // VisitDocument invokes the DocFieldValueVistor for each stored field // for the specified doc number -func (s *Segment) VisitDocument(num uint64, visitor segment.DocumentFieldValueVisitor) error { +func (s *SegmentBase) VisitDocument(num uint64, visitor segment.DocumentFieldValueVisitor) error { // first make sure this is a valid number in this segment if num < s.numDocs { - meta, compressed := s.getStoredMetaAndCompressed(num) + meta, compressed := s.getDocStoredMetaAndCompressed(num) uncompressed, err := snappy.Decode(nil, compressed) if err != nil { return err @@ -305,13 +328,13 @@ func (s *Segment) VisitDocument(num uint64, visitor segment.DocumentFieldValueVi } // Count returns the number of documents in this segment. -func (s *Segment) Count() uint64 { +func (s *SegmentBase) Count() uint64 { return s.numDocs } // DocNumbers returns a bitset corresponding to the doc numbers of all the // provided _id strings -func (s *Segment) DocNumbers(ids []string) (*roaring.Bitmap, error) { +func (s *SegmentBase) DocNumbers(ids []string) (*roaring.Bitmap, error) { rv := roaring.New() if len(s.fieldsMap) > 0 { @@ -321,7 +344,7 @@ func (s *Segment) DocNumbers(ids []string) (*roaring.Bitmap, error) { } for _, id := range ids { - postings, err := idDict.postingsList(id, nil) + postings, err := idDict.postingsList([]byte(id), nil) if err != nil { return nil, err } @@ -335,7 +358,7 @@ func (s *Segment) DocNumbers(ids []string) (*roaring.Bitmap, error) { } // Fields returns the field names used in this segment -func (s *Segment) Fields() []string { +func (s *SegmentBase) Fields() []string { return s.fieldsInv } @@ -409,23 +432,22 @@ func (s *Segment) NumDocs() uint64 { // DictAddr is a helper function to compute the file offset where the // dictionary is stored for the specified field. 
func (s *Segment) DictAddr(field string) (uint64, error) { - var fieldID uint16 - var ok bool - if fieldID, ok = s.fieldsMap[field]; !ok { + fieldIDPlus1, ok := s.fieldsMap[field] + if !ok { return 0, fmt.Errorf("no such field '%s'", field) } - return s.fieldsOffsets[fieldID-1], nil + return s.dictLocs[fieldIDPlus1-1], nil } -func (s *Segment) loadDvIterators() error { +func (s *SegmentBase) loadDvIterators() error { if s.docValueOffset == fieldNotUninverted { return nil } var read uint64 for fieldID, field := range s.fieldsInv { - fieldLoc, n := binary.Uvarint(s.mm[s.docValueOffset+read : s.docValueOffset+read+binary.MaxVarintLen64]) + fieldLoc, n := binary.Uvarint(s.mem[s.docValueOffset+read : s.docValueOffset+read+binary.MaxVarintLen64]) if n <= 0 { return fmt.Errorf("loadDvIterators: failed to read the docvalue offsets for field %d", fieldID) } diff --git a/index/scorch/segment/zap/write.go b/index/scorch/segment/zap/write.go index cfb7e46e..c5316a99 100644 --- a/index/scorch/segment/zap/write.go +++ b/index/scorch/segment/zap/write.go @@ -23,42 +23,40 @@ import ( ) // writes out the length of the roaring bitmap in bytes as varint -// then writs out the roaring bitmap itself -func writeRoaringWithLen(r *roaring.Bitmap, w io.Writer) (int, error) { - var buffer bytes.Buffer +// then writes out the roaring bitmap itself +func writeRoaringWithLen(r *roaring.Bitmap, w io.Writer, + reuseBuf *bytes.Buffer, reuseBufVarint []byte) (int, error) { + reuseBuf.Reset() + // write out postings list to memory so we know the len - postingsListLen, err := r.WriteTo(&buffer) + postingsListLen, err := r.WriteTo(reuseBuf) if err != nil { return 0, err } var tw int // write out the length of this postings list - buf := make([]byte, binary.MaxVarintLen64) - n := binary.PutUvarint(buf, uint64(postingsListLen)) - nw, err := w.Write(buf[:n]) + n := binary.PutUvarint(reuseBufVarint, uint64(postingsListLen)) + nw, err := w.Write(reuseBufVarint[:n]) tw += nw if err != nil { return tw, err } - // write out the postings list itself - nw, err = w.Write(buffer.Bytes()) + nw, err = w.Write(reuseBuf.Bytes()) tw += nw if err != nil { return tw, err } - return tw, nil } func persistFields(fieldsInv []string, w *CountHashWriter, dictLocs []uint64) (uint64, error) { var rv uint64 + var fieldsOffsets []uint64 - var fieldStarts []uint64 for fieldID, fieldName := range fieldsInv { - // record start of this field - fieldStarts = append(fieldStarts, uint64(w.Count())) + fieldsOffsets = append(fieldsOffsets, uint64(w.Count())) // write out the dict location and field name length _, err := writeUvarints(w, dictLocs[fieldID], uint64(len(fieldName))) @@ -76,7 +74,7 @@ func persistFields(fieldsInv []string, w *CountHashWriter, dictLocs []uint64) (u // now write out the fields index rv = uint64(w.Count()) for fieldID := range fieldsInv { - err := binary.Write(w, binary.BigEndian, fieldStarts[fieldID]) + err := binary.Write(w, binary.BigEndian, fieldsOffsets[fieldID]) if err != nil { return 0, err } @@ -89,8 +87,11 @@ func persistFields(fieldsInv []string, w *CountHashWriter, dictLocs []uint64) (u // crc + ver + chunk + field offset + stored offset + num docs + docValueOffset const FooterSize = 4 + 4 + 4 + 8 + 8 + 8 + 8 -func persistFooter(numDocs, storedIndexOffset, fieldIndexOffset, docValueOffset uint64, - chunkFactor uint32, w *CountHashWriter) error { +func persistFooter(numDocs, storedIndexOffset, fieldsIndexOffset, docValueOffset uint64, + chunkFactor uint32, crcBeforeFooter uint32, writerIn io.Writer) error { + w := 
NewCountHashWriter(writerIn) + w.crc = crcBeforeFooter + // write out the number of docs err := binary.Write(w, binary.BigEndian, numDocs) if err != nil { @@ -102,7 +103,7 @@ func persistFooter(numDocs, storedIndexOffset, fieldIndexOffset, docValueOffset return err } // write out the field index location - err = binary.Write(w, binary.BigEndian, fieldIndexOffset) + err = binary.Write(w, binary.BigEndian, fieldsIndexOffset) if err != nil { return err } @@ -122,7 +123,7 @@ func persistFooter(numDocs, storedIndexOffset, fieldIndexOffset, docValueOffset return err } // write out CRC-32 of everything upto but not including this CRC - err = binary.Write(w, binary.BigEndian, w.Sum32()) + err = binary.Write(w, binary.BigEndian, w.crc) if err != nil { return err }
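
The pieces introduced above fit together roughly as follows: Batch now converts the in-memory mem.Segment straight into a zap-encoded SegmentBase via zap.NewSegmentBase, and persisting that segment is just a copy of its already-encoded bytes plus a footer (PersistSegmentBase). Below is a minimal sketch of that flow, not part of the change itself; the helper name is hypothetical, and the literal 1024 simply mirrors the DefaultChunkFactor value added in persister.go (which lives in the scorch package, not zap).

package example

import (
	"github.com/blevesearch/bleve/index"
	"github.com/blevesearch/bleve/index/scorch/segment/mem"
	"github.com/blevesearch/bleve/index/scorch/segment/zap"
)

// buildAndPersist is a hypothetical helper showing the new flow:
// analyzed docs -> mem.Segment -> zap.SegmentBase (zap-encoded, in
// memory) -> PersistSegmentBase, which writes those bytes plus a footer.
func buildAndPersist(results []*index.AnalysisResult, path string) (uint64, error) {
	memSeg := mem.NewFromAnalyzedDocs(results)

	// 1024 mirrors DefaultChunkFactor from persister.go.
	sb, err := zap.NewSegmentBase(memSeg, 1024)
	if err != nil {
		return 0, err
	}

	if err := zap.PersistSegmentBase(sb, path); err != nil {
		return 0, err
	}

	// The persisted file can be opened (mmap'ed) back as a full Segment.
	seg, err := zap.Open(path)
	if err != nil {
		return 0, err
	}
	defer func() { _ = seg.Close() }()

	return seg.Count(), nil
}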
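
CountHashWriter now keeps a bare uint32 and calls crc32.Update on every Write instead of holding a hash.Hash32; that is also what lets persistFooter seed a fresh CountHashWriter with the CRC accumulated so far (crcBeforeFooter) and keep extending it while writing the footer. A small standalone sketch, with illustrative function names, showing the two styles yield the same checksum:

package example

import "hash/crc32"

// rollingCRC mimics the new CountHashWriter: a plain uint32 updated
// with crc32.Update for each chunk written.
func rollingCRC(chunks [][]byte) uint32 {
	var crc uint32
	for _, b := range chunks {
		crc = crc32.Update(crc, crc32.IEEETable, b)
	}
	return crc
}

// streamingCRC is the old approach using a hash.Hash32; for the same
// input it returns the same value as rollingCRC, which is why a later
// writer can be seeded with the CRC of everything written before it.
func streamingCRC(chunks [][]byte) uint32 {
	h := crc32.NewIEEE()
	for _, b := range chunks {
		_, _ = h.Write(b)
	}
	return h.Sum32()
}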
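
persistFooter and Segment.loadConfig agree on a fixed-size trailer: FooterSize = 4 + 4 + 4 + 8 + 8 + 8 + 8 bytes, read back-to-front as crc, version, chunkFactor, docValueOffset, fieldsIndexOffset, storedIndexOffset, numDocs. The sketch below is a hypothetical stand-alone parser for that layout, assuming only the field order visible in loadConfig; the struct and function names are illustrative and not part of the zap package.

package example

import (
	"encoding/binary"
	"fmt"
)

// crc + ver + chunk + docValue + fieldsIndex + storedIndex + numDocs
const footerSize = 4 + 4 + 4 + 8 + 8 + 8 + 8

// zapFooter mirrors the fields written by persistFooter.
type zapFooter struct {
	numDocs           uint64
	storedIndexOffset uint64
	fieldsIndexOffset uint64
	docValueOffset    uint64
	chunkFactor       uint32
	version           uint32
	crc               uint32
}

// parseFooter reads the footer from the tail of a zap file image, in
// the same back-to-front order that loadConfig uses.
func parseFooter(data []byte) (*zapFooter, error) {
	if len(data) < footerSize {
		return nil, fmt.Errorf("data too short for footer: %d bytes", len(data))
	}
	var f zapFooter
	pos := len(data)
	pos -= 4
	f.crc = binary.BigEndian.Uint32(data[pos : pos+4])
	pos -= 4
	f.version = binary.BigEndian.Uint32(data[pos : pos+4])
	pos -= 4
	f.chunkFactor = binary.BigEndian.Uint32(data[pos : pos+4])
	pos -= 8
	f.docValueOffset = binary.BigEndian.Uint64(data[pos : pos+8])
	pos -= 8
	f.fieldsIndexOffset = binary.BigEndian.Uint64(data[pos : pos+8])
	pos -= 8
	f.storedIndexOffset = binary.BigEndian.Uint64(data[pos : pos+8])
	pos -= 8
	f.numDocs = binary.BigEndian.Uint64(data[pos : pos+8])
	return &f, nil
}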