Merge branch 'scorch', remote-tracking branch 'origin' into docValue_persisted
This commit is contained in:
commit
61ba81e964
|
@ -16,6 +16,7 @@ package scorch
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/RoaringBitmap/roaring"
|
||||
"github.com/blevesearch/bleve/index/scorch/segment"
|
||||
|
@ -38,8 +39,9 @@ type epochWatcher struct {
|
|||
}
|
||||
|
||||
type snapshotReversion struct {
|
||||
snapshot *IndexSnapshot
|
||||
applied chan error
|
||||
snapshot *IndexSnapshot
|
||||
applied chan error
|
||||
persisted chan error
|
||||
}
|
||||
|
||||
func (s *Scorch) mainLoop() {
|
||||
|
@ -141,12 +143,17 @@ func (s *Scorch) introduceSegment(next *segmentIntroduction) error {
|
|||
}
|
||||
// append new segment, if any, to end of the new index snapshot
|
||||
if next.data != nil {
|
||||
newSnapshot.segment = append(newSnapshot.segment, &SegmentSnapshot{
|
||||
newSegmentSnapshot := &SegmentSnapshot{
|
||||
id: next.id,
|
||||
segment: next.data, // take ownership of next.data's ref-count
|
||||
cachedDocs: &cachedDocs{cache: nil},
|
||||
})
|
||||
}
|
||||
newSnapshot.segment = append(newSnapshot.segment, newSegmentSnapshot)
|
||||
newSnapshot.offsets = append(newSnapshot.offsets, running)
|
||||
|
||||
// increment numItemsIntroduced which tracks the number of items
|
||||
// queued for persistence.
|
||||
atomic.AddUint64(&s.stats.numItemsIntroduced, newSegmentSnapshot.Count())
|
||||
}
|
||||
// copy old values
|
||||
for key, oldVal := range s.root.internal {
|
||||
|
@ -285,6 +292,10 @@ func (s *Scorch) revertToSnapshot(revertTo *snapshotReversion) error {
|
|||
segmentSnapshot.segment.AddRef()
|
||||
}
|
||||
|
||||
if revertTo.persisted != nil {
|
||||
s.rootPersisted = append(s.rootPersisted, revertTo.persisted)
|
||||
}
|
||||
|
||||
// swap in new snapshot
|
||||
rootPrev := s.root
|
||||
s.root = newSnapshot
|
||||
|
|
|
@ -19,6 +19,7 @@ import (
|
|||
"log"
|
||||
"os"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/RoaringBitmap/roaring"
|
||||
"github.com/blevesearch/bleve/index/scorch/mergeplan"
|
||||
|
@ -42,6 +43,8 @@ OUTER:
|
|||
s.rootLock.RUnlock()
|
||||
|
||||
if ourSnapshot.epoch != lastEpochMergePlanned {
|
||||
startTime := time.Now()
|
||||
|
||||
// lets get started
|
||||
err := s.planMergeAtSnapshot(ourSnapshot)
|
||||
if err != nil {
|
||||
|
@ -50,6 +53,9 @@ OUTER:
|
|||
continue OUTER
|
||||
}
|
||||
lastEpochMergePlanned = ourSnapshot.epoch
|
||||
|
||||
s.fireEvent(EventKindMergerProgress, time.Since(startTime))
|
||||
|
||||
}
|
||||
_ = ourSnapshot.DecRef()
|
||||
|
||||
|
@ -71,6 +77,8 @@ OUTER:
|
|||
s.rootLock.RUnlock()
|
||||
|
||||
if ourSnapshot.epoch != lastEpochMergePlanned {
|
||||
startTime := time.Now()
|
||||
|
||||
// lets get started
|
||||
err := s.planMergeAtSnapshot(ourSnapshot)
|
||||
if err != nil {
|
||||
|
@ -78,6 +86,8 @@ OUTER:
|
|||
continue OUTER
|
||||
}
|
||||
lastEpochMergePlanned = ourSnapshot.epoch
|
||||
|
||||
s.fireEvent(EventKindMergerProgress, time.Since(startTime))
|
||||
}
|
||||
_ = ourSnapshot.DecRef()
|
||||
|
||||
|
|
|
@ -24,6 +24,8 @@ import (
|
|||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/RoaringBitmap/roaring"
|
||||
"github.com/blevesearch/bleve/index/scorch/segment"
|
||||
|
@ -63,6 +65,8 @@ OUTER:
|
|||
s.rootLock.Unlock()
|
||||
|
||||
if ourSnapshot != nil {
|
||||
startTime := time.Now()
|
||||
|
||||
err := s.persistSnapshot(ourSnapshot)
|
||||
for _, ch := range ourPersisted {
|
||||
if err != nil {
|
||||
|
@ -75,6 +79,7 @@ OUTER:
|
|||
_ = ourSnapshot.DecRef()
|
||||
continue OUTER
|
||||
}
|
||||
|
||||
lastPersistedEpoch = ourSnapshot.epoch
|
||||
for _, notifyCh := range notifyChs {
|
||||
close(notifyCh)
|
||||
|
@ -88,6 +93,9 @@ OUTER:
|
|||
changed = true
|
||||
}
|
||||
s.rootLock.RUnlock()
|
||||
|
||||
s.fireEvent(EventKindPersisterProgress, time.Since(startTime))
|
||||
|
||||
if changed {
|
||||
continue OUTER
|
||||
}
|
||||
|
@ -243,6 +251,8 @@ func (s *Scorch) persistSnapshot(snapshot *IndexSnapshot) error {
|
|||
cachedDocs: segmentSnapshot.cachedDocs,
|
||||
}
|
||||
newIndexSnapshot.segment[i] = newSegmentSnapshot
|
||||
// update items persisted incase of a new segment snapshot
|
||||
atomic.AddUint64(&s.stats.numItemsPersisted, newSegmentSnapshot.Count())
|
||||
} else {
|
||||
newIndexSnapshot.segment[i] = s.root.segment[i]
|
||||
newIndexSnapshot.segment[i].segment.AddRef()
|
||||
|
|
|
@ -72,9 +72,44 @@ type Scorch struct {
|
|||
|
||||
eligibleForRemoval []uint64 // Index snapshot epochs that are safe to GC.
|
||||
ineligibleForRemoval map[string]bool // Filenames that should not be GC'ed yet.
|
||||
|
||||
onEvent func(event Event)
|
||||
}
|
||||
|
||||
func NewScorch(storeName string, config map[string]interface{}, analysisQueue *index.AnalysisQueue) (index.Index, error) {
|
||||
// Event represents the information provided in an OnEvent() callback.
|
||||
type Event struct {
|
||||
Kind EventKind
|
||||
Scorch *Scorch
|
||||
Duration time.Duration
|
||||
}
|
||||
|
||||
// EventKind represents an event code for OnEvent() callbacks.
|
||||
type EventKind int
|
||||
|
||||
// EventKindCLoseStart is fired when a Scorch.Close() has begun.
|
||||
var EventKindCloseStart = EventKind(1)
|
||||
|
||||
// EventKindClose is fired when a scorch index has been fully closed.
|
||||
var EventKindClose = EventKind(2)
|
||||
|
||||
// EventKindMergerProgress is fired when the merger has completed a
|
||||
// round of merge processing.
|
||||
var EventKindMergerProgress = EventKind(3)
|
||||
|
||||
// EventKindPersisterProgress is fired when the persister has completed
|
||||
// a round of persistence processing.
|
||||
var EventKindPersisterProgress = EventKind(4)
|
||||
|
||||
// EventKindBatchIntroductionStart is fired when Batch() is invoked which
|
||||
// introduces a new segment.
|
||||
var EventKindBatchIntroductionStart = EventKind(5)
|
||||
|
||||
// EventKindBatchIntroduction is fired when Batch() completes.
|
||||
var EventKindBatchIntroduction = EventKind(6)
|
||||
|
||||
func NewScorch(storeName string,
|
||||
config map[string]interface{},
|
||||
analysisQueue *index.AnalysisQueue) (index.Index, error) {
|
||||
rv := &Scorch{
|
||||
version: Version,
|
||||
config: config,
|
||||
|
@ -96,6 +131,16 @@ func NewScorch(storeName string, config map[string]interface{}, analysisQueue *i
|
|||
return rv, nil
|
||||
}
|
||||
|
||||
func (s *Scorch) SetEventCallback(f func(Event)) {
|
||||
s.onEvent = f
|
||||
}
|
||||
|
||||
func (s *Scorch) fireEvent(kind EventKind, dur time.Duration) {
|
||||
if s.onEvent != nil {
|
||||
s.onEvent(Event{Kind: kind, Scorch: s, Duration: dur})
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Scorch) Open() error {
|
||||
var ok bool
|
||||
s.path, ok = s.config["path"].(string)
|
||||
|
@ -163,6 +208,13 @@ func (s *Scorch) Open() error {
|
|||
}
|
||||
|
||||
func (s *Scorch) Close() (err error) {
|
||||
startTime := time.Now()
|
||||
defer func() {
|
||||
s.fireEvent(EventKindClose, time.Since(startTime))
|
||||
}()
|
||||
|
||||
s.fireEvent(EventKindCloseStart, 0)
|
||||
|
||||
// signal to async tasks we want to close
|
||||
close(s.closeCh)
|
||||
// wait for them to close
|
||||
|
@ -195,11 +247,16 @@ func (s *Scorch) Delete(id string) error {
|
|||
|
||||
// Batch applices a batch of changes to the index atomically
|
||||
func (s *Scorch) Batch(batch *index.Batch) error {
|
||||
analysisStart := time.Now()
|
||||
start := time.Now()
|
||||
|
||||
defer func() {
|
||||
s.fireEvent(EventKindBatchIntroduction, time.Since(start))
|
||||
}()
|
||||
|
||||
resultChan := make(chan *index.AnalysisResult, len(batch.IndexOps))
|
||||
|
||||
var numUpdates uint64
|
||||
var numDeletes uint64
|
||||
var numPlainTextBytes uint64
|
||||
var ids []string
|
||||
for docID, doc := range batch.IndexOps {
|
||||
|
@ -208,6 +265,8 @@ func (s *Scorch) Batch(batch *index.Batch) error {
|
|||
doc.AddField(document.NewTextFieldCustom("_id", nil, []byte(doc.ID), document.IndexField|document.StoreField, nil))
|
||||
numUpdates++
|
||||
numPlainTextBytes += doc.NumPlainTextBytes()
|
||||
} else {
|
||||
numDeletes++
|
||||
}
|
||||
ids = append(ids, docID)
|
||||
}
|
||||
|
@ -234,7 +293,10 @@ func (s *Scorch) Batch(batch *index.Batch) error {
|
|||
}
|
||||
close(resultChan)
|
||||
|
||||
atomic.AddUint64(&s.stats.analysisTime, uint64(time.Since(analysisStart)))
|
||||
atomic.AddUint64(&s.stats.analysisTime, uint64(time.Since(start)))
|
||||
|
||||
// notify handlers that we're about to introduce a segment
|
||||
s.fireEvent(EventKindBatchIntroductionStart, 0)
|
||||
|
||||
var newSegment segment.Segment
|
||||
if len(analysisResults) > 0 {
|
||||
|
@ -242,8 +304,16 @@ func (s *Scorch) Batch(batch *index.Batch) error {
|
|||
}
|
||||
|
||||
err := s.prepareSegment(newSegment, ids, batch.InternalOps)
|
||||
if err != nil && newSegment != nil {
|
||||
_ = newSegment.Close()
|
||||
if err != nil {
|
||||
if newSegment != nil {
|
||||
_ = newSegment.Close()
|
||||
}
|
||||
atomic.AddUint64(&s.stats.errors, 1)
|
||||
} else {
|
||||
atomic.AddUint64(&s.stats.updates, numUpdates)
|
||||
atomic.AddUint64(&s.stats.deletes, numDeletes)
|
||||
atomic.AddUint64(&s.stats.batches, 1)
|
||||
atomic.AddUint64(&s.stats.numPlainTextBytesIndexed, numPlainTextBytes)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
@ -358,6 +428,21 @@ func (s *Scorch) AddEligibleForRemoval(epoch uint64) {
|
|||
s.rootLock.Unlock()
|
||||
}
|
||||
|
||||
func (s *Scorch) MemoryUsed() uint64 {
|
||||
var memUsed uint64
|
||||
s.rootLock.RLock()
|
||||
for _, segmentSnapshot := range s.root.segment {
|
||||
memUsed += 8 /* size of id -> uint64 */ +
|
||||
segmentSnapshot.segment.SizeInBytes()
|
||||
if segmentSnapshot.deleted != nil {
|
||||
memUsed += segmentSnapshot.deleted.GetSizeInBytes()
|
||||
}
|
||||
memUsed += segmentSnapshot.cachedDocs.sizeInBytes()
|
||||
}
|
||||
s.rootLock.RUnlock()
|
||||
return memUsed
|
||||
}
|
||||
|
||||
func (s *Scorch) markIneligibleForRemoval(filename string) {
|
||||
s.rootLock.Lock()
|
||||
s.ineligibleForRemoval[filename] = true
|
||||
|
|
|
@ -41,6 +41,9 @@ func NewFromAnalyzedDocs(results []*index.AnalysisResult) *Segment {
|
|||
sort.Strings(dict)
|
||||
}
|
||||
|
||||
// compute memory usage of segment
|
||||
s.updateSizeInBytes()
|
||||
|
||||
// professional debugging
|
||||
//
|
||||
// log.Printf("fields: %v\n", s.FieldsMap)
|
||||
|
|
|
@ -91,6 +91,10 @@ type Segment struct {
|
|||
// for storing the docValue persisted fields
|
||||
// field id
|
||||
DocValueFields []uint16
|
||||
|
||||
// footprint of the segment, updated when analyzed document mutations
|
||||
// are added into the segment
|
||||
sizeInBytes uint64
|
||||
}
|
||||
|
||||
// New builds a new empty Segment
|
||||
|
@ -100,6 +104,70 @@ func New() *Segment {
|
|||
}
|
||||
}
|
||||
|
||||
func (s *Segment) updateSizeInBytes() {
|
||||
var sizeInBytes uint64
|
||||
|
||||
for k, _ := range s.FieldsMap {
|
||||
sizeInBytes += uint64(len(k)*2 /* FieldsMap + FieldsInv */ +
|
||||
2 /* size of uint16 */)
|
||||
}
|
||||
|
||||
for _, entry := range s.Dicts {
|
||||
for k, _ := range entry {
|
||||
sizeInBytes += uint64(len(k)*2 /* Dicts + DictKeys */ +
|
||||
8 /* size of uint64 */)
|
||||
}
|
||||
}
|
||||
|
||||
for i := 0; i < len(s.Postings); i++ {
|
||||
sizeInBytes += s.Postings[i].GetSizeInBytes() + s.PostingsLocs[i].GetSizeInBytes()
|
||||
}
|
||||
|
||||
for i := 0; i < len(s.Freqs); i++ {
|
||||
sizeInBytes += uint64(len(s.Freqs[i])*8 /* size of uint64 */ +
|
||||
len(s.Norms[i])*4 /* size of float32 */)
|
||||
}
|
||||
|
||||
for i := 0; i < len(s.Locfields); i++ {
|
||||
sizeInBytes += uint64(len(s.Locfields[i])*2 /* size of uint16 */ +
|
||||
len(s.Locstarts[i])*8 /* size of uint64 */ +
|
||||
len(s.Locends[i])*8 /* size of uint64 */ +
|
||||
len(s.Locpos[i])*8 /* size of uint64 */)
|
||||
|
||||
for j := 0; j < len(s.Locarraypos[i]); j++ {
|
||||
sizeInBytes += uint64(len(s.Locarraypos[i][j]) * 8 /* size of uint64 */)
|
||||
}
|
||||
}
|
||||
|
||||
for i := 0; i < len(s.Stored); i++ {
|
||||
for _, v := range s.Stored[i] {
|
||||
sizeInBytes += uint64(2 /* size of uint16 */)
|
||||
for _, arr := range v {
|
||||
sizeInBytes += uint64(len(arr))
|
||||
}
|
||||
}
|
||||
|
||||
for _, v := range s.StoredTypes[i] {
|
||||
sizeInBytes += uint64(2 /* size of uint16 */ + len(v))
|
||||
}
|
||||
|
||||
for _, v := range s.StoredPos[i] {
|
||||
sizeInBytes += uint64(2 /* size of uint16 */)
|
||||
for _, arr := range v {
|
||||
sizeInBytes += uint64(len(arr) * 8 /* size of uint64 */)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
sizeInBytes += uint64(8 /* size of sizeInBytes -> uint64*/)
|
||||
|
||||
s.sizeInBytes = sizeInBytes
|
||||
}
|
||||
|
||||
func (s *Segment) SizeInBytes() uint64 {
|
||||
return s.sizeInBytes
|
||||
}
|
||||
|
||||
func (s *Segment) AddRef() {
|
||||
}
|
||||
|
||||
|
|
|
@ -169,6 +169,10 @@ func TestSingle(t *testing.T) {
|
|||
t.Fatalf("segment nil, not expected")
|
||||
}
|
||||
|
||||
if segment.SizeInBytes() <= 0 {
|
||||
t.Fatalf("segment size not updated")
|
||||
}
|
||||
|
||||
expectFields := map[string]struct{}{
|
||||
"_id": struct{}{},
|
||||
"_all": struct{}{},
|
||||
|
|
|
@ -36,6 +36,8 @@ type Segment interface {
|
|||
|
||||
Close() error
|
||||
|
||||
SizeInBytes() uint64
|
||||
|
||||
AddRef()
|
||||
DecRef() error
|
||||
}
|
||||
|
|
|
@ -96,6 +96,31 @@ type Segment struct {
|
|||
refs int64
|
||||
}
|
||||
|
||||
func (s *Segment) SizeInBytes() uint64 {
|
||||
// 4 /* size of crc -> uint32 */ +
|
||||
// 4 /* size of version -> uint32 */ +
|
||||
// 4 /* size of chunkFactor -> uint32 */ +
|
||||
// 8 /* size of numDocs -> uint64 */ +
|
||||
// 8 /* size of storedIndexOffset -> uint64 */ +
|
||||
// 8 /* size of fieldsIndexOffset -> uint64 */
|
||||
sizeOfUints := 36
|
||||
|
||||
sizeInBytes := len(s.mm) + len(s.path) + sizeOfUints
|
||||
|
||||
for k, _ := range s.fieldsMap {
|
||||
sizeInBytes += len(k) + 2 /* size of uint16 */
|
||||
}
|
||||
|
||||
for _, entry := range s.fieldsInv {
|
||||
sizeInBytes += len(entry)
|
||||
}
|
||||
|
||||
sizeInBytes += len(s.fieldsOffsets) * 8 /* size of uint64 */
|
||||
sizeInBytes += 8 /* size of refs -> int64 */
|
||||
|
||||
return uint64(sizeInBytes)
|
||||
}
|
||||
|
||||
func (s *Segment) AddRef() {
|
||||
s.m.Lock()
|
||||
s.refs++
|
||||
|
|
|
@ -21,6 +21,7 @@ import (
|
|||
"fmt"
|
||||
"sort"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/RoaringBitmap/roaring"
|
||||
"github.com/blevesearch/bleve/document"
|
||||
|
@ -363,6 +364,7 @@ func (i *IndexSnapshot) TermFieldReader(term []byte, field string, includeFreq,
|
|||
rv.postings[i] = pl
|
||||
rv.iterators[i] = pl.Iterator()
|
||||
}
|
||||
atomic.AddUint64(&i.parent.stats.termSearchersStarted, uint64(1))
|
||||
return rv, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -16,6 +16,7 @@ package scorch
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/blevesearch/bleve/index"
|
||||
"github.com/blevesearch/bleve/index/scorch/segment"
|
||||
|
@ -124,5 +125,8 @@ func (i *IndexSnapshotTermFieldReader) Count() uint64 {
|
|||
}
|
||||
|
||||
func (i *IndexSnapshotTermFieldReader) Close() error {
|
||||
if i.snapshot != nil {
|
||||
atomic.AddUint64(&i.snapshot.parent.stats.termSearchersFinished, uint64(1))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -95,8 +95,21 @@ func (s *Scorch) SnapshotRevert(revertTo *IndexSnapshot) error {
|
|||
applied: make(chan error),
|
||||
}
|
||||
|
||||
if !s.unsafeBatch {
|
||||
revert.persisted = make(chan error)
|
||||
}
|
||||
|
||||
s.revertToSnapshots <- revert
|
||||
|
||||
// block until this IndexSnapshot is applied
|
||||
return <-revert.applied
|
||||
err := <-revert.applied
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if revert.persisted != nil {
|
||||
err = <-revert.persisted
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -92,22 +92,23 @@ func TestIndexRollback(t *testing.T) {
|
|||
t.Error(err)
|
||||
}
|
||||
|
||||
err = sh.SnapshotRevert(prev)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if prev != nil {
|
||||
err = sh.SnapshotRevert(prev)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
newRoot, err := sh.PreviousPersistedSnapshot(nil)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
if newRoot == nil {
|
||||
t.Errorf("Failed to retrieve latest persisted snapshot")
|
||||
}
|
||||
|
||||
newRoot := sh.root
|
||||
if newRoot != nil && prev != nil {
|
||||
if newRoot.epoch <= prev.epoch {
|
||||
t.Errorf("Unexpected epoch, %v <= %v", newRoot.epoch, prev.epoch)
|
||||
}
|
||||
} else {
|
||||
if prev == nil {
|
||||
t.Errorf("The last persisted snapshot before the revert was nil!")
|
||||
}
|
||||
if newRoot == nil {
|
||||
t.Errorf("The new root has been set to nil?")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -249,3 +249,18 @@ func (c *cachedDocs) prepareFields(wantedFields []string, ss *SegmentSnapshot) e
|
|||
c.m.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *cachedDocs) sizeInBytes() uint64 {
|
||||
sizeInBytes := 0
|
||||
c.m.Lock()
|
||||
for k, v := range c.cache { // cachedFieldDocs
|
||||
sizeInBytes += len(k)
|
||||
if v != nil {
|
||||
for _, entry := range v.docs { // docs
|
||||
sizeInBytes += 8 /* size of uint64 */ + len(entry)
|
||||
}
|
||||
}
|
||||
}
|
||||
c.m.Unlock()
|
||||
return uint64(sizeInBytes)
|
||||
}
|
||||
|
|
|
@ -21,21 +21,28 @@ import (
|
|||
|
||||
// Stats tracks statistics about the index
|
||||
type Stats struct {
|
||||
analysisTime, indexTime uint64
|
||||
updates, deletes, batches, errors uint64
|
||||
analysisTime, indexTime uint64
|
||||
termSearchersStarted uint64
|
||||
termSearchersFinished uint64
|
||||
numPlainTextBytesIndexed uint64
|
||||
numItemsIntroduced uint64
|
||||
numItemsPersisted uint64
|
||||
}
|
||||
|
||||
// FIXME wire up these other stats again
|
||||
func (s *Stats) statsMap() map[string]interface{} {
|
||||
m := map[string]interface{}{}
|
||||
// m["updates"] = atomic.LoadUint64(&i.updates)
|
||||
// m["deletes"] = atomic.LoadUint64(&i.deletes)
|
||||
// m["batches"] = atomic.LoadUint64(&i.batches)
|
||||
// m["errors"] = atomic.LoadUint64(&i.errors)
|
||||
m["updates"] = atomic.LoadUint64(&s.updates)
|
||||
m["deletes"] = atomic.LoadUint64(&s.deletes)
|
||||
m["batches"] = atomic.LoadUint64(&s.batches)
|
||||
m["errors"] = atomic.LoadUint64(&s.errors)
|
||||
m["analysis_time"] = atomic.LoadUint64(&s.analysisTime)
|
||||
m["index_time"] = atomic.LoadUint64(&s.indexTime)
|
||||
// m["term_searchers_started"] = atomic.LoadUint64(&i.termSearchersStarted)
|
||||
// m["term_searchers_finished"] = atomic.LoadUint64(&i.termSearchersFinished)
|
||||
// m["num_plain_text_bytes_indexed"] = atomic.LoadUint64(&i.numPlainTextBytesIndexed)
|
||||
m["term_searchers_started"] = atomic.LoadUint64(&s.termSearchersStarted)
|
||||
m["term_searchers_finished"] = atomic.LoadUint64(&s.termSearchersFinished)
|
||||
m["num_plain_text_bytes_indexed"] = atomic.LoadUint64(&s.numPlainTextBytesIndexed)
|
||||
m["num_items_introduced"] = atomic.LoadUint64(&s.numItemsIntroduced)
|
||||
m["num_items_persisted"] = atomic.LoadUint64(&s.numItemsPersisted)
|
||||
|
||||
return m
|
||||
}
|
||||
|
|
|
@ -179,6 +179,7 @@ OUTER:
|
|||
continue OUTER
|
||||
}
|
||||
}
|
||||
break
|
||||
}
|
||||
return current
|
||||
}
|
||||
|
|
|
@ -991,3 +991,26 @@ func TestMappingForNilTextMarshaler(t *testing.T) {
|
|||
}
|
||||
|
||||
}
|
||||
|
||||
func TestClosestDocDynamicMapping(t *testing.T) {
|
||||
mapping := NewIndexMapping()
|
||||
mapping.IndexDynamic = false
|
||||
mapping.DefaultMapping = NewDocumentStaticMapping()
|
||||
mapping.DefaultMapping.AddFieldMappingsAt("foo", NewTextFieldMapping())
|
||||
|
||||
doc := document.NewDocument("x")
|
||||
err := mapping.MapDocument(doc, map[string]interface{}{
|
||||
"foo": "value",
|
||||
"bar": map[string]string{
|
||||
"foo": "value2",
|
||||
"baz": "value3",
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if len(doc.Fields) != 1 {
|
||||
t.Fatalf("expected 1 field, got: %d", len(doc.Fields))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -319,6 +319,9 @@ func (s *PhraseSearcher) Advance(ctx *search.SearchContext, ID index.IndexIntern
|
|||
}
|
||||
ctx.DocumentMatchPool.Put(s.currMust)
|
||||
}
|
||||
if s.currMust == nil {
|
||||
return nil, nil
|
||||
}
|
||||
var err error
|
||||
s.currMust, err = s.mustSearcher.Advance(ctx, ID)
|
||||
if err != nil {
|
||||
|
|
Loading…
Reference in New Issue