0
0

Merge branch 'sreekanth-cb-scorch_visit_fts' into scorch

This commit is contained in:
Marty Schoch 2017-12-14 10:24:50 -05:00
commit a79b450e0c
4 changed files with 306 additions and 17 deletions

View File

@ -96,6 +96,7 @@ func (s *Scorch) introduceSegment(next *segmentIntroduction) error {
id: s.root.segment[i].id, id: s.root.segment[i].id,
segment: s.root.segment[i].segment, segment: s.root.segment[i].segment,
notify: s.root.segment[i].notify, notify: s.root.segment[i].notify,
cachedDocs: s.root.segment[i].cachedDocs,
} }
s.root.segment[i].segment.AddRef() s.root.segment[i].segment.AddRef()
// apply new obsoletions // apply new obsoletions
@ -113,6 +114,7 @@ func (s *Scorch) introduceSegment(next *segmentIntroduction) error {
newSnapshot.segment[len(s.root.segment)] = &SegmentSnapshot{ newSnapshot.segment[len(s.root.segment)] = &SegmentSnapshot{
id: next.id, id: next.id,
segment: next.data, // Take ownership of next.data's ref-count. segment: next.data, // Take ownership of next.data's ref-count.
cachedDocs: &cachedDocs{cache: nil},
} }
newSnapshot.offsets[len(s.root.segment)] = running newSnapshot.offsets[len(s.root.segment)] = running
if !s.unsafeBatch { if !s.unsafeBatch {

View File

@ -1240,7 +1240,7 @@ func TestIndexDocumentVisitFieldTerms(t *testing.T) {
fieldTerms := make(index.FieldTerms) fieldTerms := make(index.FieldTerms)
internalID, err := indexReader.GetInternal([]byte("1")) internalID, err := indexReader.InternalID("1")
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -1377,3 +1377,155 @@ Mechanism[edit]
This section needs additional citations for verification. Please help improve this article by adding citations to reliable sources. Unsourced material may be challenged and removed. (July 2013) This section needs additional citations for verification. Please help improve this article by adding citations to reliable sources. Unsourced material may be challenged and removed. (July 2013)
There are three characteristics of liquids which are relevant to the discussion of a BLEVE:`) There are three characteristics of liquids which are relevant to the discussion of a BLEVE:`)
func TestIndexDocumentVisitFieldTermsWithMultipleDocs(t *testing.T) {
defer func() {
err := DestroyTest()
if err != nil {
t.Fatal(err)
}
}()
analysisQueue := index.NewAnalysisQueue(1)
idx, err := NewScorch(Name, testConfig, analysisQueue)
if err != nil {
t.Fatal(err)
}
err = idx.Open()
if err != nil {
t.Fatalf("error opening index: %v", err)
}
defer func() {
err := idx.Close()
if err != nil {
t.Fatal(err)
}
}()
doc := document.NewDocument("1")
doc.AddField(document.NewTextFieldWithIndexingOptions("name", []uint64{}, []byte("test"), document.IndexField|document.StoreField|document.IncludeTermVectors))
doc.AddField(document.NewTextFieldWithIndexingOptions("title", []uint64{}, []byte("mister"), document.IndexField|document.StoreField|document.IncludeTermVectors))
err = idx.Update(doc)
if err != nil {
t.Errorf("Error updating index: %v", err)
}
indexReader, err := idx.Reader()
if err != nil {
t.Error(err)
}
fieldTerms := make(index.FieldTerms)
docNumber, err := indexReader.InternalID("1")
if err != nil {
t.Fatal(err)
}
err = indexReader.DocumentVisitFieldTerms(docNumber, []string{"name", "title"}, func(field string, term []byte) {
fieldTerms[field] = append(fieldTerms[field], string(term))
})
if err != nil {
t.Error(err)
}
expectedFieldTerms := index.FieldTerms{
"name": []string{"test"},
"title": []string{"mister"},
}
if !reflect.DeepEqual(fieldTerms, expectedFieldTerms) {
t.Errorf("expected field terms: %#v, got: %#v", expectedFieldTerms, fieldTerms)
}
err = indexReader.Close()
if err != nil {
t.Fatal(err)
}
doc2 := document.NewDocument("2")
doc2.AddField(document.NewTextFieldWithIndexingOptions("name", []uint64{}, []byte("test2"), document.IndexField|document.StoreField|document.IncludeTermVectors))
doc2.AddField(document.NewTextFieldWithIndexingOptions("title", []uint64{}, []byte("mister2"), document.IndexField|document.StoreField|document.IncludeTermVectors))
err = idx.Update(doc2)
if err != nil {
t.Errorf("Error updating index: %v", err)
}
indexReader, err = idx.Reader()
if err != nil {
t.Error(err)
}
fieldTerms = make(index.FieldTerms)
docNumber, err = indexReader.InternalID("2")
if err != nil {
t.Fatal(err)
}
err = indexReader.DocumentVisitFieldTerms(docNumber, []string{"name", "title"}, func(field string, term []byte) {
fieldTerms[field] = append(fieldTerms[field], string(term))
})
if err != nil {
t.Error(err)
}
expectedFieldTerms = index.FieldTerms{
"name": []string{"test2"},
"title": []string{"mister2"},
}
if !reflect.DeepEqual(fieldTerms, expectedFieldTerms) {
t.Errorf("expected field terms: %#v, got: %#v", expectedFieldTerms, fieldTerms)
}
err = indexReader.Close()
if err != nil {
t.Fatal(err)
}
doc3 := document.NewDocument("3")
doc3.AddField(document.NewTextFieldWithIndexingOptions("name3", []uint64{}, []byte("test3"), document.IndexField|document.StoreField|document.IncludeTermVectors))
doc3.AddField(document.NewTextFieldWithIndexingOptions("title3", []uint64{}, []byte("mister3"), document.IndexField|document.StoreField|document.IncludeTermVectors))
err = idx.Update(doc3)
if err != nil {
t.Errorf("Error updating index: %v", err)
}
indexReader, err = idx.Reader()
if err != nil {
t.Error(err)
}
fieldTerms = make(index.FieldTerms)
docNumber, err = indexReader.InternalID("3")
if err != nil {
t.Fatal(err)
}
err = indexReader.DocumentVisitFieldTerms(docNumber, []string{"name3", "title3"}, func(field string, term []byte) {
fieldTerms[field] = append(fieldTerms[field], string(term))
})
if err != nil {
t.Error(err)
}
expectedFieldTerms = index.FieldTerms{
"name3": []string{"test3"},
"title3": []string{"mister3"},
}
if !reflect.DeepEqual(fieldTerms, expectedFieldTerms) {
t.Errorf("expected field terms: %#v, got: %#v", expectedFieldTerms, fieldTerms)
}
fieldTerms = make(index.FieldTerms)
docNumber, err = indexReader.InternalID("1")
if err != nil {
t.Fatal(err)
}
err = indexReader.DocumentVisitFieldTerms(docNumber, []string{"name", "title"}, func(field string, term []byte) {
fieldTerms[field] = append(fieldTerms[field], string(term))
})
if err != nil {
t.Error(err)
}
expectedFieldTerms = index.FieldTerms{
"name": []string{"test"},
"title": []string{"mister"},
}
if !reflect.DeepEqual(fieldTerms, expectedFieldTerms) {
t.Errorf("expected field terms: %#v, got: %#v", expectedFieldTerms, fieldTerms)
}
err = indexReader.Close()
if err != nil {
t.Fatal(err)
}
}

View File

@ -248,7 +248,10 @@ func (i *IndexSnapshot) Document(id string) (rv *document.Document, err error) {
return nil, nil return nil, nil
} }
docNum := docInternalToNumber(next.ID) docNum, err := docInternalToNumber(next.ID)
if err != nil {
return nil, err
}
segmentIndex, localDocNum := i.segmentIndexAndLocalDocNumFromGlobal(docNum) segmentIndex, localDocNum := i.segmentIndexAndLocalDocNumFromGlobal(docNum)
rv = document.NewDocument(id) rv = document.NewDocument(id)
@ -286,12 +289,15 @@ func (i *IndexSnapshot) segmentIndexAndLocalDocNumFromGlobal(docNum uint64) (int
} }
func (i *IndexSnapshot) ExternalID(id index.IndexInternalID) (string, error) { func (i *IndexSnapshot) ExternalID(id index.IndexInternalID) (string, error) {
docNum := docInternalToNumber(id) docNum, err := docInternalToNumber(id)
if err != nil {
return "", err
}
segmentIndex, localDocNum := i.segmentIndexAndLocalDocNumFromGlobal(docNum) segmentIndex, localDocNum := i.segmentIndexAndLocalDocNumFromGlobal(docNum)
var found bool var found bool
var rv string var rv string
err := i.segment[segmentIndex].VisitDocument(localDocNum, func(field string, typ byte, value []byte, pos []uint64) bool { err = i.segment[segmentIndex].VisitDocument(localDocNum, func(field string, typ byte, value []byte, pos []uint64) bool {
if field == "_id" { if field == "_id" {
found = true found = true
rv = string(value) rv = string(value)
@ -377,15 +383,6 @@ func (i *IndexSnapshot) TermFieldReader(term []byte, field string, includeFreq,
return rv, nil return rv, nil
} }
func (i *IndexSnapshot) DocumentVisitFieldTerms(id index.IndexInternalID, fields []string,
visitor index.DocumentFieldTermVisitor) error {
docNum := docInternalToNumber(id)
segmentIndex, localDocNum := i.segmentIndexAndLocalDocNumFromGlobal(docNum)
return i.segment[segmentIndex].DocumentVisitFieldTerms(localDocNum, fields, visitor)
}
func docNumberToBytes(in uint64) []byte { func docNumberToBytes(in uint64) []byte {
buf := new(bytes.Buffer) buf := new(bytes.Buffer)
@ -393,8 +390,50 @@ func docNumberToBytes(in uint64) []byte {
return buf.Bytes() return buf.Bytes()
} }
func docInternalToNumber(in index.IndexInternalID) uint64 { func docInternalToNumber(in index.IndexInternalID) (uint64, error) {
var res uint64 var res uint64
_ = binary.Read(bytes.NewReader(in), binary.BigEndian, &res) err := binary.Read(bytes.NewReader(in), binary.BigEndian, &res)
return res if err != nil {
return res, err
}
return res, nil
}
func (i *IndexSnapshot) DocumentVisitFieldTerms(id index.IndexInternalID,
fields []string, visitor index.DocumentFieldTermVisitor) error {
docNum, err := docInternalToNumber(id)
if err != nil {
return err
}
segmentIndex, localDocNum := i.segmentIndexAndLocalDocNumFromGlobal(docNum)
if segmentIndex >= len(i.segment) {
return nil
}
i.m.Lock()
ss := i.segment[segmentIndex]
if ss.cachedDocs == nil {
ss.cachedDocs = &cachedDocs{cache: nil}
}
i.m.Unlock()
err = ss.cachedDocs.prepareFields(localDocNum, fields, ss)
if err != nil {
return err
}
for _, field := range fields {
if cachedFieldDocs, exists := ss.cachedDocs.cache[field]; exists {
if tlist, exists := cachedFieldDocs.docs[localDocNum]; exists {
terms := bytes.SplitN(tlist, TermSeparatorSplitSlice, -1)
for _, term := range terms {
if len(term) > 0 {
visitor(field, term)
}
}
}
}
}
return nil
} }

View File

@ -15,11 +15,17 @@
package scorch package scorch
import ( import (
"sync"
"github.com/RoaringBitmap/roaring" "github.com/RoaringBitmap/roaring"
"github.com/blevesearch/bleve/index" "github.com/blevesearch/bleve/index"
"github.com/blevesearch/bleve/index/scorch/segment" "github.com/blevesearch/bleve/index/scorch/segment"
) )
var TermSeparator byte = 0xff
var TermSeparatorSplitSlice = []byte{TermSeparator}
type SegmentDictionarySnapshot struct { type SegmentDictionarySnapshot struct {
s *SegmentSnapshot s *SegmentSnapshot
d segment.TermDictionary d segment.TermDictionary
@ -46,7 +52,8 @@ type SegmentSnapshot struct {
segment segment.Segment segment segment.Segment
deleted *roaring.Bitmap deleted *roaring.Bitmap
notify []chan error notify []chan error
cachedDocs *cachedDocs
} }
func (s *SegmentSnapshot) Id() uint64 { func (s *SegmentSnapshot) Id() uint64 {
@ -157,3 +164,92 @@ func (s *SegmentSnapshot) DocNumbersLive() *roaring.Bitmap {
func (s *SegmentSnapshot) Fields() []string { func (s *SegmentSnapshot) Fields() []string {
return s.segment.Fields() return s.segment.Fields()
} }
type cachedFieldDocs struct {
readyCh chan struct{} // closed when the cachedFieldDocs.docs is ready to be used.
err error // Non-nil if there was an error when preparing this cachedFieldDocs.
docs map[uint64][]byte // Keyed by localDocNum, value is a list of terms delimited by 0xFF.
}
func (cfd *cachedFieldDocs) prepareFields(docNum uint64, field string,
ss *SegmentSnapshot) {
defer close(cfd.readyCh)
dict, err := ss.segment.Dictionary(field)
if err != nil {
cfd.err = err
return
}
dictItr := dict.Iterator()
next, err := dictItr.Next()
for next != nil && err == nil {
postings, err1 := dict.PostingsList(next.Term, nil)
if err1 != nil {
cfd.err = err1
return
}
postingsItr := postings.Iterator()
nextPosting, err2 := postingsItr.Next()
for err2 == nil && nextPosting != nil && nextPosting.Number() <= docNum {
if nextPosting.Number() == docNum {
// got what we're looking for
cfd.docs[docNum] = append(cfd.docs[docNum], []byte(next.Term)...)
cfd.docs[docNum] = append(cfd.docs[docNum], TermSeparator)
}
nextPosting, err2 = postingsItr.Next()
}
if err2 != nil {
cfd.err = err2
return
}
next, err = dictItr.Next()
}
if err != nil {
cfd.err = err
return
}
}
type cachedDocs struct {
m sync.Mutex // As the cache is asynchronously prepared, need a lock
cache map[string]*cachedFieldDocs // Keyed by field
}
func (c *cachedDocs) prepareFields(docNum uint64, wantedFields []string,
ss *SegmentSnapshot) error {
c.m.Lock()
if c.cache == nil {
c.cache = make(map[string]*cachedFieldDocs, len(ss.Fields()))
}
for _, field := range wantedFields {
_, exists := c.cache[field]
if !exists {
c.cache[field] = &cachedFieldDocs{
readyCh: make(chan struct{}),
docs: make(map[uint64][]byte),
}
go c.cache[field].prepareFields(docNum, field, ss)
}
}
for _, field := range wantedFields {
cachedFieldDocs := c.cache[field]
c.m.Unlock()
<-cachedFieldDocs.readyCh
if cachedFieldDocs.err != nil {
return cachedFieldDocs.err
}
c.m.Lock()
}
c.m.Unlock()
return nil
}