diff --git a/index/scorch/introducer.go b/index/scorch/introducer.go
index a80e5ac1..56c95143 100644
--- a/index/scorch/introducer.go
+++ b/index/scorch/introducer.go
@@ -96,6 +96,7 @@ func (s *Scorch) introduceSegment(next *segmentIntroduction) error {
 			id:      s.root.segment[i].id,
 			segment: s.root.segment[i].segment,
 			notify:  s.root.segment[i].notify,
+			cachedDocs: s.root.segment[i].cachedDocs,
 		}
 		s.root.segment[i].segment.AddRef()
 		// apply new obsoletions
@@ -113,6 +114,7 @@ func (s *Scorch) introduceSegment(next *segmentIntroduction) error {
 	newSnapshot.segment[len(s.root.segment)] = &SegmentSnapshot{
 		id:      next.id,
 		segment: next.data, // Take ownership of next.data's ref-count.
+		cachedDocs: &cachedDocs{cache: nil},
 	}
 	newSnapshot.offsets[len(s.root.segment)] = running
 	if !s.unsafeBatch {
diff --git a/index/scorch/scorch_test.go b/index/scorch/scorch_test.go
index b46a5ffd..6feabb30 100644
--- a/index/scorch/scorch_test.go
+++ b/index/scorch/scorch_test.go
@@ -1240,7 +1240,7 @@ func TestIndexDocumentVisitFieldTerms(t *testing.T) {
 
 	fieldTerms := make(index.FieldTerms)
 
-	internalID, err := indexReader.GetInternal([]byte("1"))
+	internalID, err := indexReader.InternalID("1")
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -1377,3 +1377,107 @@ Mechanism[edit]
 
 This section needs additional citations for verification. Please help improve this article by adding citations to reliable sources. Unsourced material may be challenged and removed. (July 2013)
 There are three characteristics of liquids which are relevant to the discussion of a BLEVE:`)
+
+// TestIndexDocumentVisitFieldTermsWithMultipleDocs indexes three documents one
+// update at a time and checks, through a fresh reader after each update, that
+// DocumentVisitFieldTerms reports the expected terms for the requested fields.
+func TestIndexDocumentVisitFieldTermsWithMultipleDocs(t *testing.T) {
+	defer func() {
+		err := DestroyTest()
+		if err != nil {
+			t.Fatal(err)
+		}
+	}()
+
+	analysisQueue := index.NewAnalysisQueue(1)
+	idx, err := NewScorch(Name, testConfig, analysisQueue)
+	if err != nil {
+		t.Fatal(err)
+	}
+	err = idx.Open()
+	if err != nil {
+		t.Fatalf("error opening index: %v", err)
+	}
+	defer func() {
+		err := idx.Close()
+		if err != nil {
+			t.Fatal(err)
+		}
+	}()
+
+	// indexDoc adds a document holding two text fields. A failed update aborts
+	// the test, since every later lookup of that document would fail anyway.
+	indexDoc := func(id, nameField, name, titleField, title string) {
+		opts := document.IndexField | document.StoreField | document.IncludeTermVectors
+		doc := document.NewDocument(id)
+		doc.AddField(document.NewTextFieldWithIndexingOptions(nameField, []uint64{}, []byte(name), opts))
+		doc.AddField(document.NewTextFieldWithIndexingOptions(titleField, []uint64{}, []byte(title), opts))
+		if err := idx.Update(doc); err != nil {
+			t.Fatalf("Error updating index: %v", err)
+		}
+	}
+
+	// openReader aborts the test when no reader can be obtained; continuing
+	// with t.Error would go on to call methods on a nil reader.
+	openReader := func() index.IndexReader {
+		reader, err := idx.Reader()
+		if err != nil {
+			t.Fatal(err)
+		}
+		return reader
+	}
+
+	closeReader := func(reader index.IndexReader) {
+		if err := reader.Close(); err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	// checkFieldTerms collects the terms visited for the document with the
+	// given external id and compares them with expected.
+	checkFieldTerms := func(reader index.IndexReader, id string, fields []string, expected index.FieldTerms) {
+		docNumber, err := reader.InternalID(id)
+		if err != nil {
+			t.Fatal(err)
+		}
+		fieldTerms := make(index.FieldTerms)
+		err = reader.DocumentVisitFieldTerms(docNumber, fields, func(field string, term []byte) {
+			fieldTerms[field] = append(fieldTerms[field], string(term))
+		})
+		if err != nil {
+			t.Error(err)
+		}
+		if !reflect.DeepEqual(fieldTerms, expected) {
+			t.Errorf("expected field terms: %#v, got: %#v", expected, fieldTerms)
+		}
+	}
+
+	indexDoc("1", "name", "test", "title", "mister")
+	reader := openReader()
+	checkFieldTerms(reader, "1", []string{"name", "title"}, index.FieldTerms{
+		"name":  []string{"test"},
+		"title": []string{"mister"},
+	})
+	closeReader(reader)
+
+	indexDoc("2", "name", "test2", "title", "mister2")
+	reader = openReader()
+	checkFieldTerms(reader, "2", []string{"name", "title"}, index.FieldTerms{
+		"name":  []string{"test2"},
+		"title": []string{"mister2"},
+	})
+	closeReader(reader)
+
+	indexDoc("3", "name3", "test3", "title3", "mister3")
+	reader = openReader()
+	checkFieldTerms(reader, "3", []string{"name3", "title3"}, index.FieldTerms{
+		"name3":  []string{"test3"},
+		"title3": []string{"mister3"},
+	})
+	// The first document must still be visible through the newest reader.
+	checkFieldTerms(reader, "1", []string{"name", "title"}, index.FieldTerms{
+		"name":  []string{"test"},
+		"title": []string{"mister"},
+	})
+	closeReader(reader)
+}
diff --git a/index/scorch/snapshot_index.go b/index/scorch/snapshot_index.go
index 6dd77ff4..cb25efc6 100644
--- a/index/scorch/snapshot_index.go
+++ b/index/scorch/snapshot_index.go
@@ -248,7 +248,10 @@ func (i *IndexSnapshot) Document(id string) (rv *document.Document, err error) {
 		return nil, nil
 	}
 
-	docNum := docInternalToNumber(next.ID)
+	docNum, err := docInternalToNumber(next.ID)
+	if err != nil {
+		return nil, err
+	}
 	segmentIndex, localDocNum := i.segmentIndexAndLocalDocNumFromGlobal(docNum)
 
 	rv = document.NewDocument(id)
@@ -286,12 +289,15 @@ func (i *IndexSnapshot) segmentIndexAndLocalDocNumFromGlobal(docNum uint64) (int
 }
 
 func (i *IndexSnapshot) ExternalID(id index.IndexInternalID) (string, error) {
-	docNum := docInternalToNumber(id)
+	docNum, err := docInternalToNumber(id)
+	if err != nil {
+		return "", err
+	}
 	segmentIndex, localDocNum := i.segmentIndexAndLocalDocNumFromGlobal(docNum)
 
 	var found bool
 	var rv string
-	err := i.segment[segmentIndex].VisitDocument(localDocNum, func(field string, typ byte, value []byte, pos []uint64) bool {
+	err = i.segment[segmentIndex].VisitDocument(localDocNum, func(field string, typ byte, value []byte, pos []uint64) bool {
 		if field
== "_id" {
 			found = true
 			rv = string(value)
@@ -377,15 +383,6 @@ func (i *IndexSnapshot) TermFieldReader(term []byte, field string, includeFreq,
 	return rv, nil
 }
 
-func (i *IndexSnapshot) DocumentVisitFieldTerms(id index.IndexInternalID, fields []string,
-	visitor index.DocumentFieldTermVisitor) error {
-
-	docNum := docInternalToNumber(id)
-	segmentIndex, localDocNum := i.segmentIndexAndLocalDocNumFromGlobal(docNum)
-
-	return i.segment[segmentIndex].DocumentVisitFieldTerms(localDocNum, fields, visitor)
-}
-
 func docNumberToBytes(in uint64) []byte {
 
 	buf := new(bytes.Buffer)
@@ -393,8 +390,70 @@ func docNumberToBytes(in uint64) []byte {
 	return buf.Bytes()
 }
 
-func docInternalToNumber(in index.IndexInternalID) uint64 {
+// docInternalToNumber decodes the leading eight bytes of in as a big-endian
+// uint64. It returns the read error when in holds fewer than eight bytes,
+// rather than silently reporting document number zero.
+func docInternalToNumber(in index.IndexInternalID) (uint64, error) {
 	var res uint64
-	_ = binary.Read(bytes.NewReader(in), binary.BigEndian, &res)
-	return res
+	if err := binary.Read(bytes.NewReader(in), binary.BigEndian, &res); err != nil {
+		return 0, err
+	}
+	return res, nil
+}
+
+// DocumentVisitFieldTerms calls visitor with each term that the document
+// identified by id has in each of the requested fields. The terms come from a
+// per-segment cache that is filled, one field at a time, on first use.
+//
+// An id that maps past the last segment of this snapshot is ignored and nil
+// is returned; a malformed id or a failure to build the cache is returned as
+// an error.
+func (i *IndexSnapshot) DocumentVisitFieldTerms(id index.IndexInternalID,
+	fields []string, visitor index.DocumentFieldTermVisitor) error {
+	docNum, err := docInternalToNumber(id)
+	if err != nil {
+		return err
+	}
+	segmentIndex, localDocNum := i.segmentIndexAndLocalDocNumFromGlobal(docNum)
+	if segmentIndex >= len(i.segment) {
+		return nil
+	}
+
+	// Give the segment a cache if it was created without one.
+	i.m.Lock()
+	ss := i.segment[segmentIndex]
+	if ss.cachedDocs == nil {
+		ss.cachedDocs = &cachedDocs{cache: nil}
+	}
+	i.m.Unlock()
+
+	err = ss.cachedDocs.prepareFields(localDocNum, fields, ss)
+	if err != nil {
+		return err
+	}
+
+	for _, field := range fields {
+		// prepareFields inserts into the cache map under cachedDocs.m, and
+		// other callers may be doing so right now, so the map lookup must
+		// hold the same lock. The entry's docs are safe to read without it:
+		// prepareFields has already waited for every requested field.
+		ss.cachedDocs.m.Lock()
+		cfd, exists := ss.cachedDocs.cache[field]
+		ss.cachedDocs.m.Unlock()
+		if !exists {
+			continue
+		}
+
+		tlist, exists := cfd.docs[localDocNum]
+		if !exists {
+			continue
+		}
+		for _, term := range bytes.Split(tlist, TermSeparatorSplitSlice) {
+			if len(term) > 0 {
+				visitor(field, term)
+			}
+		}
+	}
+
+	return nil
 }
diff --git a/index/scorch/snapshot_segment.go
b/index/scorch/snapshot_segment.go
index ffd38cac..836ec6fa 100644
--- a/index/scorch/snapshot_segment.go
+++ b/index/scorch/snapshot_segment.go
@@ -15,11 +15,21 @@
 package scorch
 
 import (
+	"sync"
+
 	"github.com/RoaringBitmap/roaring"
 	"github.com/blevesearch/bleve/index"
 	"github.com/blevesearch/bleve/index/scorch/segment"
 )
 
+// TermSeparator is the byte appended after each term in a cached
+// per-document term list (see cachedFieldDocs.docs).
+var TermSeparator byte = 0xff
+
+// TermSeparatorSplitSlice is TermSeparator wrapped in a slice so that it can
+// be handed directly to bytes.Split when decoding a cached term list.
+var TermSeparatorSplitSlice = []byte{TermSeparator}
+
 type SegmentDictionarySnapshot struct {
 	s *SegmentSnapshot
 	d segment.TermDictionary
@@ -46,7 +56,8 @@ type SegmentSnapshot struct {
 	segment segment.Segment
 	deleted *roaring.Bitmap
 
-	notify []chan error
+	notify     []chan error
+	cachedDocs *cachedDocs
 }
 
 func (s *SegmentSnapshot) Id() uint64 {
@@ -157,3 +168,105 @@ func (s *SegmentSnapshot) DocNumbersLive() *roaring.Bitmap {
 func (s *SegmentSnapshot) Fields() []string {
 	return s.segment.Fields()
 }
+
+// cachedFieldDocs holds the terms of one field for the documents of a single
+// segment. It is filled in by prepareFields, normally on its own goroutine;
+// readers must wait for readyCh to be closed before looking at err or docs.
+type cachedFieldDocs struct {
+	readyCh chan struct{}     // closed when the cachedFieldDocs.docs is ready to be used.
+	err     error             // Non-nil if there was an error when preparing this cachedFieldDocs.
+	docs    map[uint64][]byte // Keyed by localDocNum, value is a list of terms delimited by 0xFF.
+}
+
+// prepareFields fills cfd.docs with the terms of field for every document in
+// the segment and then closes cfd.readyCh. A failure is recorded in cfd.err
+// before the channel is closed.
+//
+// The entry for a field is built only once, so it has to cover all documents:
+// caching just the first requested docNum would make every later lookup of a
+// different document in the same segment silently report no terms. docNum is
+// therefore unused and kept only so that the signature stays the same.
+func (cfd *cachedFieldDocs) prepareFields(docNum uint64, field string,
+	ss *SegmentSnapshot) {
+	defer close(cfd.readyCh)
+
+	dict, err := ss.segment.Dictionary(field)
+	if err != nil {
+		cfd.err = err
+		return
+	}
+
+	dictItr := dict.Iterator()
+	next, err := dictItr.Next()
+	for err == nil && next != nil {
+		postings, err1 := dict.PostingsList(next.Term, nil)
+		if err1 != nil {
+			cfd.err = err1
+			return
+		}
+
+		postingsItr := postings.Iterator()
+		nextPosting, err2 := postingsItr.Next()
+		for err2 == nil && nextPosting != nil {
+			num := nextPosting.Number()
+			cfd.docs[num] = append(cfd.docs[num], []byte(next.Term)...)
+			cfd.docs[num] = append(cfd.docs[num], TermSeparator)
+			nextPosting, err2 = postingsItr.Next()
+		}
+		if err2 != nil {
+			cfd.err = err2
+			return
+		}
+
+		next, err = dictItr.Next()
+	}
+
+	if err != nil {
+		cfd.err = err
+	}
+}
+
+// cachedDocs is the per-segment cache of field terms. Entries are created
+// lazily, one field at a time, by prepareFields.
+type cachedDocs struct {
+	m     sync.Mutex                  // As the cache is asynchronously prepared, need a lock
+	cache map[string]*cachedFieldDocs // Keyed by field
+}
+
+// prepareFields makes sure a cache entry exists for each of wantedFields,
+// starting a goroutine to build any that is missing, and blocks until all of
+// them are ready. It returns the first preparation error in wantedFields
+// order, or nil when every field was prepared successfully.
+func (c *cachedDocs) prepareFields(docNum uint64, wantedFields []string,
+	ss *SegmentSnapshot) error {
+	// Register all wanted fields in one critical section and remember their
+	// entries, so the lock is never juggled while waiting below.
+	c.m.Lock()
+	if c.cache == nil {
+		c.cache = make(map[string]*cachedFieldDocs, len(ss.Fields()))
+	}
+	pending := make([]*cachedFieldDocs, 0, len(wantedFields))
+	for _, field := range wantedFields {
+		cfd, exists := c.cache[field]
+		if !exists {
+			cfd = &cachedFieldDocs{
+				readyCh: make(chan struct{}),
+				docs:    make(map[uint64][]byte),
+			}
+			c.cache[field] = cfd
+			go cfd.prepareFields(docNum, field, ss)
+		}
+		pending = append(pending, cfd)
+	}
+	c.m.Unlock()
+
+	// Wait without the lock held so that callers interested in other fields
+	// are not blocked behind a slow preparation.
+	for _, cfd := range pending {
+		<-cfd.readyCh
+		if cfd.err != nil {
+			return cfd.err
+		}
+	}
+	return nil
+}