From 1066ee7d221cc6f5b6e57f2fdc99d4fb1a2a26a1 Mon Sep 17 00:00:00 2001 From: Sreekanth Sivasankaran Date: Thu, 14 Dec 2017 12:38:29 +0530 Subject: [PATCH 1/2] DocumentVisitFieldTerms Scorch implementation level1 --- index/scorch/introducer.go | 2 + index/scorch/scorch_test.go | 152 +++++++++++++++++++++++++++++++ index/scorch/snapshot_index.go | 69 +++++++++++--- index/scorch/snapshot_segment.go | 98 +++++++++++++++++++- 4 files changed, 305 insertions(+), 16 deletions(-) diff --git a/index/scorch/introducer.go b/index/scorch/introducer.go index a80e5ac1..56c95143 100644 --- a/index/scorch/introducer.go +++ b/index/scorch/introducer.go @@ -96,6 +96,7 @@ func (s *Scorch) introduceSegment(next *segmentIntroduction) error { id: s.root.segment[i].id, segment: s.root.segment[i].segment, notify: s.root.segment[i].notify, + cachedDocs: s.root.segment[i].cachedDocs, } s.root.segment[i].segment.AddRef() // apply new obsoletions @@ -113,6 +114,7 @@ func (s *Scorch) introduceSegment(next *segmentIntroduction) error { newSnapshot.segment[len(s.root.segment)] = &SegmentSnapshot{ id: next.id, segment: next.data, // Take ownership of next.data's ref-count. + cachedDocs: &cachedDocs{cache: nil}, } newSnapshot.offsets[len(s.root.segment)] = running if !s.unsafeBatch { diff --git a/index/scorch/scorch_test.go b/index/scorch/scorch_test.go index b46a5ffd..8d2db4d2 100644 --- a/index/scorch/scorch_test.go +++ b/index/scorch/scorch_test.go @@ -1377,3 +1377,155 @@ Mechanism[edit] This section needs additional citations for verification. Please help improve this article by adding citations to reliable sources. Unsourced material may be challenged and removed. (July 2013) There are three characteristics of liquids which are relevant to the discussion of a BLEVE:`) + +func TestIndexDocumentVisitFieldTermsWithMultipleDocs(t *testing.T) { + defer func() { + err := DestroyTest() + if err != nil { + t.Fatal(err) + } + }() + + analysisQueue := index.NewAnalysisQueue(1) + idx, err := NewScorch(Name, testConfig, analysisQueue) + if err != nil { + t.Fatal(err) + } + err = idx.Open() + if err != nil { + t.Fatalf("error opening index: %v", err) + } + defer func() { + err := idx.Close() + if err != nil { + t.Fatal(err) + } + }() + + doc := document.NewDocument("1") + doc.AddField(document.NewTextFieldWithIndexingOptions("name", []uint64{}, []byte("test"), document.IndexField|document.StoreField|document.IncludeTermVectors)) + doc.AddField(document.NewTextFieldWithIndexingOptions("title", []uint64{}, []byte("mister"), document.IndexField|document.StoreField|document.IncludeTermVectors)) + + err = idx.Update(doc) + if err != nil { + t.Errorf("Error updating index: %v", err) + } + + indexReader, err := idx.Reader() + if err != nil { + t.Error(err) + } + + fieldTerms := make(index.FieldTerms) + docNumber, err := indexReader.InternalID("1") + if err != nil { + t.Fatal(err) + } + err = indexReader.DocumentVisitFieldTerms(docNumber, []string{"name", "title"}, func(field string, term []byte) { + fieldTerms[field] = append(fieldTerms[field], string(term)) + }) + if err != nil { + t.Error(err) + } + expectedFieldTerms := index.FieldTerms{ + "name": []string{"test"}, + "title": []string{"mister"}, + } + if !reflect.DeepEqual(fieldTerms, expectedFieldTerms) { + t.Errorf("expected field terms: %#v, got: %#v", expectedFieldTerms, fieldTerms) + } + err = indexReader.Close() + if err != nil { + t.Fatal(err) + } + + doc2 := document.NewDocument("2") + doc2.AddField(document.NewTextFieldWithIndexingOptions("name", []uint64{}, []byte("test2"), document.IndexField|document.StoreField|document.IncludeTermVectors)) + doc2.AddField(document.NewTextFieldWithIndexingOptions("title", []uint64{}, []byte("mister2"), document.IndexField|document.StoreField|document.IncludeTermVectors)) + err = idx.Update(doc2) + if err != nil { + t.Errorf("Error updating index: %v", err) + } + indexReader, err = idx.Reader() + if err != nil { + t.Error(err) + } + + fieldTerms = make(index.FieldTerms) + docNumber, err = indexReader.InternalID("2") + if err != nil { + t.Fatal(err) + } + err = indexReader.DocumentVisitFieldTerms(docNumber, []string{"name", "title"}, func(field string, term []byte) { + fieldTerms[field] = append(fieldTerms[field], string(term)) + }) + if err != nil { + t.Error(err) + } + expectedFieldTerms = index.FieldTerms{ + "name": []string{"test2"}, + "title": []string{"mister2"}, + } + if !reflect.DeepEqual(fieldTerms, expectedFieldTerms) { + t.Errorf("expected field terms: %#v, got: %#v", expectedFieldTerms, fieldTerms) + } + err = indexReader.Close() + if err != nil { + t.Fatal(err) + } + + doc3 := document.NewDocument("3") + doc3.AddField(document.NewTextFieldWithIndexingOptions("name3", []uint64{}, []byte("test3"), document.IndexField|document.StoreField|document.IncludeTermVectors)) + doc3.AddField(document.NewTextFieldWithIndexingOptions("title3", []uint64{}, []byte("mister3"), document.IndexField|document.StoreField|document.IncludeTermVectors)) + err = idx.Update(doc3) + if err != nil { + t.Errorf("Error updating index: %v", err) + } + indexReader, err = idx.Reader() + if err != nil { + t.Error(err) + } + + fieldTerms = make(index.FieldTerms) + docNumber, err = indexReader.InternalID("3") + if err != nil { + t.Fatal(err) + } + err = indexReader.DocumentVisitFieldTerms(docNumber, []string{"name3", "title3"}, func(field string, term []byte) { + fieldTerms[field] = append(fieldTerms[field], string(term)) + }) + if err != nil { + t.Error(err) + } + expectedFieldTerms = index.FieldTerms{ + "name3": []string{"test3"}, + "title3": []string{"mister3"}, + } + if !reflect.DeepEqual(fieldTerms, expectedFieldTerms) { + t.Errorf("expected field terms: %#v, got: %#v", expectedFieldTerms, fieldTerms) + } + + fieldTerms = make(index.FieldTerms) + docNumber, err = indexReader.InternalID("1") + if err != nil { + t.Fatal(err) + } + err = indexReader.DocumentVisitFieldTerms(docNumber, []string{"name", "title"}, func(field string, term []byte) { + fieldTerms[field] = append(fieldTerms[field], string(term)) + }) + if err != nil { + t.Error(err) + } + expectedFieldTerms = index.FieldTerms{ + "name": []string{"test"}, + "title": []string{"mister"}, + } + if !reflect.DeepEqual(fieldTerms, expectedFieldTerms) { + t.Errorf("expected field terms: %#v, got: %#v", expectedFieldTerms, fieldTerms) + } + err = indexReader.Close() + if err != nil { + t.Fatal(err) + } + +} diff --git a/index/scorch/snapshot_index.go b/index/scorch/snapshot_index.go index 6dd77ff4..cb25efc6 100644 --- a/index/scorch/snapshot_index.go +++ b/index/scorch/snapshot_index.go @@ -248,7 +248,10 @@ func (i *IndexSnapshot) Document(id string) (rv *document.Document, err error) { return nil, nil } - docNum := docInternalToNumber(next.ID) + docNum, err := docInternalToNumber(next.ID) + if err != nil { + return nil, err + } segmentIndex, localDocNum := i.segmentIndexAndLocalDocNumFromGlobal(docNum) rv = document.NewDocument(id) @@ -286,12 +289,15 @@ func (i *IndexSnapshot) segmentIndexAndLocalDocNumFromGlobal(docNum uint64) (int } func (i *IndexSnapshot) ExternalID(id index.IndexInternalID) (string, error) { - docNum := docInternalToNumber(id) + docNum, err := docInternalToNumber(id) + if err != nil { + return "", err + } segmentIndex, localDocNum := i.segmentIndexAndLocalDocNumFromGlobal(docNum) var found bool var rv string - err := i.segment[segmentIndex].VisitDocument(localDocNum, func(field string, typ byte, value []byte, pos []uint64) bool { + err = i.segment[segmentIndex].VisitDocument(localDocNum, func(field string, typ byte, value []byte, pos []uint64) bool { if field == "_id" { found = true rv = string(value) @@ -377,15 +383,6 @@ func (i *IndexSnapshot) TermFieldReader(term []byte, field string, includeFreq, return rv, nil } -func (i *IndexSnapshot) DocumentVisitFieldTerms(id index.IndexInternalID, fields []string, - visitor index.DocumentFieldTermVisitor) error { - - docNum := docInternalToNumber(id) - segmentIndex, localDocNum := i.segmentIndexAndLocalDocNumFromGlobal(docNum) - - return i.segment[segmentIndex].DocumentVisitFieldTerms(localDocNum, fields, visitor) -} - func docNumberToBytes(in uint64) []byte { buf := new(bytes.Buffer) @@ -393,8 +390,50 @@ func docNumberToBytes(in uint64) []byte { return buf.Bytes() } -func docInternalToNumber(in index.IndexInternalID) uint64 { +func docInternalToNumber(in index.IndexInternalID) (uint64, error) { var res uint64 - _ = binary.Read(bytes.NewReader(in), binary.BigEndian, &res) - return res + err := binary.Read(bytes.NewReader(in), binary.BigEndian, &res) + if err != nil { + return res, err + } + return res, nil +} + +func (i *IndexSnapshot) DocumentVisitFieldTerms(id index.IndexInternalID, + fields []string, visitor index.DocumentFieldTermVisitor) error { + docNum, err := docInternalToNumber(id) + if err != nil { + return err + } + segmentIndex, localDocNum := i.segmentIndexAndLocalDocNumFromGlobal(docNum) + if segmentIndex >= len(i.segment) { + return nil + } + + i.m.Lock() + ss := i.segment[segmentIndex] + if ss.cachedDocs == nil { + ss.cachedDocs = &cachedDocs{cache: nil} + } + i.m.Unlock() + + err = ss.cachedDocs.prepareFields(localDocNum, fields, ss) + if err != nil { + return err + } + + for _, field := range fields { + if cachedFieldDocs, exists := ss.cachedDocs.cache[field]; exists { + if tlist, exists := cachedFieldDocs.docs[localDocNum]; exists { + terms := bytes.SplitN(tlist, TermSeparatorSplitSlice, -1) + for _, term := range terms { + if len(term) > 0 { + visitor(field, term) + } + } + } + } + } + + return nil } diff --git a/index/scorch/snapshot_segment.go b/index/scorch/snapshot_segment.go index ffd38cac..836ec6fa 100644 --- a/index/scorch/snapshot_segment.go +++ b/index/scorch/snapshot_segment.go @@ -15,11 +15,17 @@ package scorch import ( + "sync" + "github.com/RoaringBitmap/roaring" "github.com/blevesearch/bleve/index" "github.com/blevesearch/bleve/index/scorch/segment" ) +var TermSeparator byte = 0xff + +var TermSeparatorSplitSlice = []byte{TermSeparator} + type SegmentDictionarySnapshot struct { s *SegmentSnapshot d segment.TermDictionary @@ -46,7 +52,8 @@ type SegmentSnapshot struct { segment segment.Segment deleted *roaring.Bitmap - notify []chan error + notify []chan error + cachedDocs *cachedDocs } func (s *SegmentSnapshot) Id() uint64 { @@ -157,3 +164,92 @@ func (s *SegmentSnapshot) DocNumbersLive() *roaring.Bitmap { func (s *SegmentSnapshot) Fields() []string { return s.segment.Fields() } + +type cachedFieldDocs struct { + readyCh chan struct{} // closed when the cachedFieldDocs.docs is ready to be used. + err error // Non-nil if there was an error when preparing this cachedFieldDocs. + docs map[uint64][]byte // Keyed by localDocNum, value is a list of terms delimited by 0xFF. +} + +func (cfd *cachedFieldDocs) prepareFields(docNum uint64, field string, + ss *SegmentSnapshot) { + defer close(cfd.readyCh) + + dict, err := ss.segment.Dictionary(field) + if err != nil { + cfd.err = err + return + } + + dictItr := dict.Iterator() + next, err := dictItr.Next() + for next != nil && err == nil { + postings, err1 := dict.PostingsList(next.Term, nil) + if err1 != nil { + cfd.err = err1 + return + } + + postingsItr := postings.Iterator() + nextPosting, err2 := postingsItr.Next() + for err2 == nil && nextPosting != nil && nextPosting.Number() <= docNum { + if nextPosting.Number() == docNum { + // got what we're looking for + cfd.docs[docNum] = append(cfd.docs[docNum], []byte(next.Term)...) + cfd.docs[docNum] = append(cfd.docs[docNum], TermSeparator) + } + nextPosting, err2 = postingsItr.Next() + } + + if err2 != nil { + cfd.err = err2 + return + } + + next, err = dictItr.Next() + } + + if err != nil { + cfd.err = err + return + } +} + +type cachedDocs struct { + m sync.Mutex // As the cache is asynchronously prepared, need a lock + cache map[string]*cachedFieldDocs // Keyed by field +} + +func (c *cachedDocs) prepareFields(docNum uint64, wantedFields []string, + ss *SegmentSnapshot) error { + c.m.Lock() + if c.cache == nil { + c.cache = make(map[string]*cachedFieldDocs, len(ss.Fields())) + } + + for _, field := range wantedFields { + _, exists := c.cache[field] + if !exists { + c.cache[field] = &cachedFieldDocs{ + readyCh: make(chan struct{}), + docs: make(map[uint64][]byte), + } + + go c.cache[field].prepareFields(docNum, field, ss) + } + } + + for _, field := range wantedFields { + cachedFieldDocs := c.cache[field] + c.m.Unlock() + <-cachedFieldDocs.readyCh + + if cachedFieldDocs.err != nil { + return cachedFieldDocs.err + } + c.m.Lock() + } + + c.m.Unlock() + return nil +} From 95b65ade3e266db161f900aa847f36f912f8abd1 Mon Sep 17 00:00:00 2001 From: Sreekanth Sivasankaran Date: Thu, 14 Dec 2017 17:16:47 +0530 Subject: [PATCH 2/2] getting right internalID for doc in UT --- index/scorch/scorch_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/index/scorch/scorch_test.go b/index/scorch/scorch_test.go index 8d2db4d2..6feabb30 100644 --- a/index/scorch/scorch_test.go +++ b/index/scorch/scorch_test.go @@ -1240,7 +1240,7 @@ func TestIndexDocumentVisitFieldTerms(t *testing.T) { fieldTerms := make(index.FieldTerms) - internalID, err := indexReader.GetInternal([]byte("1")) + internalID, err := indexReader.InternalID("1") if err != nil { t.Fatal(err) }