0
0
Fork 0

DocumentVisitFieldTerms Scorch implementation level1

This commit is contained in:
Sreekanth Sivasankaran 2017-12-14 12:38:29 +05:30
parent 2b92e5ff99
commit 1066ee7d22
4 changed files with 305 additions and 16 deletions

View File

@ -96,6 +96,7 @@ func (s *Scorch) introduceSegment(next *segmentIntroduction) error {
id: s.root.segment[i].id,
segment: s.root.segment[i].segment,
notify: s.root.segment[i].notify,
cachedDocs: s.root.segment[i].cachedDocs,
}
s.root.segment[i].segment.AddRef()
// apply new obsoletions
@ -113,6 +114,7 @@ func (s *Scorch) introduceSegment(next *segmentIntroduction) error {
newSnapshot.segment[len(s.root.segment)] = &SegmentSnapshot{
id: next.id,
segment: next.data, // Take ownership of next.data's ref-count.
cachedDocs: &cachedDocs{cache: nil},
}
newSnapshot.offsets[len(s.root.segment)] = running
if !s.unsafeBatch {

View File

@ -1377,3 +1377,155 @@ Mechanism[edit]
This section needs additional citations for verification. Please help improve this article by adding citations to reliable sources. Unsourced material may be challenged and removed. (July 2013)
There are three characteristics of liquids which are relevant to the discussion of a BLEVE:`)
// TestIndexDocumentVisitFieldTermsWithMultipleDocs indexes three documents,
// one Update call at a time, and after each update opens a fresh reader and
// checks that DocumentVisitFieldTerms reports exactly the terms that were
// indexed for the new document. The reader opened after the last update is
// additionally used to re-check the first document.
func TestIndexDocumentVisitFieldTermsWithMultipleDocs(t *testing.T) {
	defer func() {
		err := DestroyTest()
		if err != nil {
			t.Fatal(err)
		}
	}()

	analysisQueue := index.NewAnalysisQueue(1)
	idx, err := NewScorch(Name, testConfig, analysisQueue)
	if err != nil {
		t.Fatal(err)
	}
	err = idx.Open()
	if err != nil {
		t.Fatalf("error opening index: %v", err)
	}
	defer func() {
		err := idx.Close()
		if err != nil {
			t.Fatal(err)
		}
	}()

	fieldOpts := document.IndexField | document.StoreField | document.IncludeTermVectors

	// indexDoc builds a document with two text fields and updates the index
	// with it.
	indexDoc := func(id, nameField, nameValue, titleField, titleValue string) {
		doc := document.NewDocument(id)
		doc.AddField(document.NewTextFieldWithIndexingOptions(nameField, []uint64{}, []byte(nameValue), fieldOpts))
		doc.AddField(document.NewTextFieldWithIndexingOptions(titleField, []uint64{}, []byte(titleValue), fieldOpts))
		if err := idx.Update(doc); err != nil {
			t.Errorf("Error updating index: %v", err)
		}
	}

	indexDoc("1", "name", "test", "title", "mister")

	// A failed Reader() must stop the test: continuing would call methods on
	// a reader that was never obtained.
	indexReader, err := idx.Reader()
	if err != nil {
		t.Fatal(err)
	}

	// checkFieldTerms collects the terms visited for docID through whichever
	// reader indexReader currently refers to (the closure sees later
	// reassignments) and compares them with expected.
	checkFieldTerms := func(docID string, fields []string, expected index.FieldTerms) {
		fieldTerms := make(index.FieldTerms)
		docNumber, err := indexReader.InternalID(docID)
		if err != nil {
			t.Fatal(err)
		}
		err = indexReader.DocumentVisitFieldTerms(docNumber, fields, func(field string, term []byte) {
			fieldTerms[field] = append(fieldTerms[field], string(term))
		})
		if err != nil {
			t.Error(err)
		}
		if !reflect.DeepEqual(fieldTerms, expected) {
			t.Errorf("expected field terms: %#v, got: %#v", expected, fieldTerms)
		}
	}

	checkFieldTerms("1", []string{"name", "title"}, index.FieldTerms{
		"name":  []string{"test"},
		"title": []string{"mister"},
	})
	err = indexReader.Close()
	if err != nil {
		t.Fatal(err)
	}

	indexDoc("2", "name", "test2", "title", "mister2")
	indexReader, err = idx.Reader()
	if err != nil {
		t.Fatal(err)
	}
	checkFieldTerms("2", []string{"name", "title"}, index.FieldTerms{
		"name":  []string{"test2"},
		"title": []string{"mister2"},
	})
	err = indexReader.Close()
	if err != nil {
		t.Fatal(err)
	}

	indexDoc("3", "name3", "test3", "title3", "mister3")
	indexReader, err = idx.Reader()
	if err != nil {
		t.Fatal(err)
	}
	checkFieldTerms("3", []string{"name3", "title3"}, index.FieldTerms{
		"name3":  []string{"test3"},
		"title3": []string{"mister3"},
	})
	// The first document must still be visible, unchanged, through the
	// latest reader.
	checkFieldTerms("1", []string{"name", "title"}, index.FieldTerms{
		"name":  []string{"test"},
		"title": []string{"mister"},
	})
	err = indexReader.Close()
	if err != nil {
		t.Fatal(err)
	}
}

View File

@ -248,7 +248,10 @@ func (i *IndexSnapshot) Document(id string) (rv *document.Document, err error) {
return nil, nil
}
docNum := docInternalToNumber(next.ID)
docNum, err := docInternalToNumber(next.ID)
if err != nil {
return nil, err
}
segmentIndex, localDocNum := i.segmentIndexAndLocalDocNumFromGlobal(docNum)
rv = document.NewDocument(id)
@ -286,12 +289,15 @@ func (i *IndexSnapshot) segmentIndexAndLocalDocNumFromGlobal(docNum uint64) (int
}
func (i *IndexSnapshot) ExternalID(id index.IndexInternalID) (string, error) {
docNum := docInternalToNumber(id)
docNum, err := docInternalToNumber(id)
if err != nil {
return "", err
}
segmentIndex, localDocNum := i.segmentIndexAndLocalDocNumFromGlobal(docNum)
var found bool
var rv string
err := i.segment[segmentIndex].VisitDocument(localDocNum, func(field string, typ byte, value []byte, pos []uint64) bool {
err = i.segment[segmentIndex].VisitDocument(localDocNum, func(field string, typ byte, value []byte, pos []uint64) bool {
if field == "_id" {
found = true
rv = string(value)
@ -377,15 +383,6 @@ func (i *IndexSnapshot) TermFieldReader(term []byte, field string, includeFreq,
return rv, nil
}
// DocumentVisitFieldTerms decodes id into a global document number, maps it
// to a segment index and a segment-local document number, and delegates to
// that segment's DocumentVisitFieldTerms with the same fields and visitor.
// The segment's result is returned as-is.
func (i *IndexSnapshot) DocumentVisitFieldTerms(id index.IndexInternalID, fields []string,
visitor index.DocumentFieldTermVisitor) error {
docNum := docInternalToNumber(id)
segmentIndex, localDocNum := i.segmentIndexAndLocalDocNumFromGlobal(docNum)
return i.segment[segmentIndex].DocumentVisitFieldTerms(localDocNum, fields, visitor)
}
func docNumberToBytes(in uint64) []byte {
buf := new(bytes.Buffer)
@ -393,8 +390,50 @@ func docNumberToBytes(in uint64) []byte {
return buf.Bytes()
}
func docInternalToNumber(in index.IndexInternalID) uint64 {
func docInternalToNumber(in index.IndexInternalID) (uint64, error) {
var res uint64
_ = binary.Read(bytes.NewReader(in), binary.BigEndian, &res)
return res
err := binary.Read(bytes.NewReader(in), binary.BigEndian, &res)
if err != nil {
return res, err
}
return res, nil
}
// DocumentVisitFieldTerms calls visitor once for every non-empty term cached
// for the document identified by id, for each of the requested fields.
//
// id is decoded into a global document number, which is mapped to a segment
// index and a segment-local document number. The terms are served from the
// segment snapshot's cachedDocs, which is created here on first use and
// filled by cachedDocs.prepareFields. A field with no cache entry, or with no
// entry for this document, is skipped.
//
// It returns an error when id cannot be decoded or when preparing a field
// fails. It returns nil without visiting anything when the computed segment
// index lies beyond the snapshot's segments.
func (i *IndexSnapshot) DocumentVisitFieldTerms(id index.IndexInternalID,
	fields []string, visitor index.DocumentFieldTermVisitor) error {
	docNum, err := docInternalToNumber(id)
	if err != nil {
		return err
	}

	segmentIndex, localDocNum := i.segmentIndexAndLocalDocNumFromGlobal(docNum)
	if segmentIndex >= len(i.segment) {
		return nil
	}

	i.m.Lock()
	ss := i.segment[segmentIndex]
	if ss.cachedDocs == nil {
		ss.cachedDocs = &cachedDocs{cache: nil}
	}
	cd := ss.cachedDocs
	i.m.Unlock()

	err = cd.prepareFields(localDocNum, fields, ss)
	if err != nil {
		return err
	}

	for _, field := range fields {
		// The cache map is written under cd.m by prepareFields, possibly
		// from another goroutine serving a different request, so the lookup
		// must hold the same lock. The lock is released before the visitor
		// runs so that user code never executes while it is held.
		cd.m.Lock()
		fieldDocs, exists := cd.cache[field]
		cd.m.Unlock()
		if !exists {
			continue
		}

		// prepareFields returned only after fieldDocs.readyCh was closed,
		// so fieldDocs.docs is no longer being written.
		tlist, exists := fieldDocs.docs[localDocNum]
		if !exists {
			continue
		}

		for _, term := range bytes.Split(tlist, TermSeparatorSplitSlice) {
			// Every term is followed by a separator, so the split yields a
			// trailing empty element that must not be reported.
			if len(term) > 0 {
				visitor(field, term)
			}
		}
	}
	return nil
}

View File

@ -15,11 +15,17 @@
package scorch
import (
"sync"
"github.com/RoaringBitmap/roaring"
"github.com/blevesearch/bleve/index"
"github.com/blevesearch/bleve/index/scorch/segment"
)
// TermSeparator is the byte appended after each term in a cached
// per-document term list.
var TermSeparator byte = 0xff
// TermSeparatorSplitSlice is TermSeparator wrapped in a byte slice, the form
// in which it is passed as the separator when a cached term list is split.
var TermSeparatorSplitSlice = []byte{TermSeparator}
type SegmentDictionarySnapshot struct {
s *SegmentSnapshot
d segment.TermDictionary
@ -46,7 +52,8 @@ type SegmentSnapshot struct {
segment segment.Segment
deleted *roaring.Bitmap
notify []chan error
notify []chan error
cachedDocs *cachedDocs
}
func (s *SegmentSnapshot) Id() uint64 {
@ -157,3 +164,92 @@ func (s *SegmentSnapshot) DocNumbersLive() *roaring.Bitmap {
// Fields returns the field names reported by the underlying segment.
func (s *SegmentSnapshot) Fields() []string {
	fieldNames := s.segment.Fields()
	return fieldNames
}
// cachedFieldDocs is the cache entry for a single field: a map from
// segment-local document number to that document's terms for the field.
// It is filled by prepareFields, which closes readyCh when it returns;
// readers wait on readyCh and then check err before using docs.
type cachedFieldDocs struct {
readyCh chan struct{} // closed when the cachedFieldDocs.docs is ready to be used.
err error // Non-nil if there was an error when preparing this cachedFieldDocs.
docs map[uint64][]byte // Keyed by localDocNum, value is a list of terms delimited by 0xFF.
}
// prepareFields fills cfd.docs for the given field. It walks the field's term
// dictionary in ss.segment and, for every term, the term's postings list;
// each document found in a postings list gets the term followed by
// TermSeparator appended to its entry in cfd.docs.
//
// Terms are recorded for every document in the segment, not only for docNum.
// The owning cachedDocs keeps one cachedFieldDocs per field and does not
// rebuild it once it exists, so an entry prepared for a single document
// would silently return no terms for any other document of the same segment.
// docNum is therefore unused and is retained only to keep the signature
// unchanged for existing callers.
//
// readyCh is closed when the method returns, whether it succeeded or not.
// On failure the error is stored in cfd.err and cfd.docs may be partially
// filled.
func (cfd *cachedFieldDocs) prepareFields(docNum uint64, field string,
	ss *SegmentSnapshot) {
	defer close(cfd.readyCh)

	dict, err := ss.segment.Dictionary(field)
	if err != nil {
		cfd.err = err
		return
	}

	dictItr := dict.Iterator()
	next, err := dictItr.Next()
	for next != nil && err == nil {
		postings, err1 := dict.PostingsList(next.Term, nil)
		if err1 != nil {
			cfd.err = err1
			return
		}

		postingsItr := postings.Iterator()
		nextPosting, err2 := postingsItr.Next()
		for err2 == nil && nextPosting != nil {
			num := nextPosting.Number()
			cfd.docs[num] = append(cfd.docs[num], []byte(next.Term)...)
			cfd.docs[num] = append(cfd.docs[num], TermSeparator)
			nextPosting, err2 = postingsItr.Next()
		}
		if err2 != nil {
			cfd.err = err2
			return
		}

		next, err = dictItr.Next()
	}
	if err != nil {
		cfd.err = err
	}
}
// cachedDocs is a per-field cache of document term lists. Entries are
// created and looked up by prepareFields while holding m; each entry is
// filled by its own goroutine.
type cachedDocs struct {
m sync.Mutex // As the cache is asynchronously prepared, need a lock
cache map[string]*cachedFieldDocs // Keyed by field
}
// prepareFields makes sure the cache holds an entry for each wanted field and
// blocks until all of those entries are ready.
//
// The cache map is allocated on first use. For every wanted field that has no
// entry yet, a new cachedFieldDocs is stored and a goroutine is started to
// fill it. The fields are then awaited in wantedFields order; the mutex is
// released while waiting on an entry and re-acquired before the next lookup.
// The first entry found with a non-nil err ends the wait and that error is
// returned; otherwise the result is nil.
func (c *cachedDocs) prepareFields(docNum uint64, wantedFields []string,
	ss *SegmentSnapshot) error {
	c.m.Lock()
	if c.cache == nil {
		c.cache = make(map[string]*cachedFieldDocs, len(ss.Fields()))
	}

	// Create the missing entries and start filling them.
	for _, name := range wantedFields {
		if _, present := c.cache[name]; present {
			continue
		}
		c.cache[name] = &cachedFieldDocs{
			readyCh: make(chan struct{}),
			docs:    make(map[uint64][]byte),
		}
		go c.cache[name].prepareFields(docNum, name, ss)
	}

	// Wait for each wanted entry without holding the lock.
	for _, name := range wantedFields {
		entry := c.cache[name]
		c.m.Unlock()

		<-entry.readyCh
		if prepErr := entry.err; prepErr != nil {
			return prepErr
		}

		c.m.Lock()
	}

	c.m.Unlock()
	return nil
}