diff --git a/index/analysis.go b/index/analysis.go
index b626b9f3..840dad97 100644
--- a/index/analysis.go
+++ b/index/analysis.go
@@ -14,7 +14,10 @@ package index
 
-import "github.com/blevesearch/bleve/document"
+import (
+	"github.com/blevesearch/bleve/analysis"
+	"github.com/blevesearch/bleve/document"
+)
 
 type IndexRow interface {
 	KeySize() int
@@ -29,6 +32,11 @@ type IndexRow interface {
 type AnalysisResult struct {
 	DocID string
 	Rows  []IndexRow
+
+	// scorch
+	Document *document.Document
+	Analyzed []analysis.TokenFrequencies
+	Length   []int
 }
 
 type AnalysisWork struct {
diff --git a/index/scorch/README.md b/index/scorch/README.md
new file mode 100644
index 00000000..cec982eb
--- /dev/null
+++ b/index/scorch/README.md
@@ -0,0 +1,420 @@
+# scorch
+
+## Definitions
+
+Batch
+- A collection of Documents to mutate in the index.
+
+Document
+- Has a unique identifier (arbitrary bytes).
+- Is composed of a list of fields.
+
+Field
+- Has a name (string).
+- Has a type (text, number, date, geopoint).
+- Has a value (depending on type).
+- Can be indexed, stored, or both.
+- If indexed, can be analyzed.
+- If indexed, can optionally store term vectors.
+
+## Scope
+
+Scorch *MUST* implement the bleve.index API without requiring any changes to this API.
+
+Scorch *MAY* introduce new interfaces, which callers can discover, to allow use of new capabilities not in the current API.
+
+## Implementation
+
+The scorch implementation starts with the concept of a segmented index.
+
+A segment is simply a slice, subset, or portion of the entire index. A segmented index is one which is composed of one or more segments. Although segments are created in a particular order, knowing this ordering is not required to achieve correct semantics when querying. Because no ordering is required, when searching an index you can (and should) search all the segments concurrently.
+
+### Internal Wrapper
+
+In order to accommodate the existing APIs while also improving the implementation, the scorch implementation includes some wrapper functionality that must be described.
+
+#### \_id field
+
+In scorch, field 0 is prearranged to be named \_id. All documents have a value for this field, which is the document's external identifier. In this version the field *MUST* be both indexed AND stored. The scorch wrapper adds this field, as it will not be present in the Document from the calling bleve code.
+
+NOTE: If a document already contains a field \_id, it will be replaced. If this is problematic, the caller must ensure such a scenario does not happen.
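+Concretely, the wrapper injects the field at batch time; the snippet below mirrors what the Batch implementation in this change (scorch.go) does before analysis:
+
+```
+// insert the _id field (indexed AND stored) before analysis; any
+// caller-supplied _id field is shadowed by this one
+doc.AddField(document.NewTextFieldCustom("_id", nil, []byte(doc.ID),
+	document.IndexField|document.StoreField, nil))
+```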
+### Proposed Structures
+
+```
+type Segment interface {
+
+  Dictionary(field string) TermDictionary
+
+}
+
+type TermDictionary interface {
+
+  PostingsList(term string, excluding PostingsList) PostingsList
+
+}
+
+type PostingsList interface {
+
+  Next() Posting
+
+  And(other PostingsList) PostingsList
+  Or(other PostingsList) PostingsList
+
+}
+
+type Posting interface {
+  Number() uint64
+
+  Frequency() uint64
+  Norm() float64
+
+  Locations() Locations
+}
+
+type Locations interface {
+  Start() uint64
+  End() uint64
+  Pos() uint64
+  ArrayPositions() ...
+}
+
+type DeletedDocs struct {
+
+}
+
+type SegmentSnapshot struct {
+  segment Segment
+  deleted PostingsList
+}
+
+type IndexSnapshot struct {
+  segment []SegmentSnapshot
+}
+```
+
+**What about errors?**
+
+**What about memory management or context?**
+
+**Postings List: separate iterator to separate stateful from stateless**
+
+### Mutating the Index
+
+The bleve.index API has methods for directly making individual mutations (Update/Delete/SetInternal/DeleteInternal); however, for this first implementation we assume that all of these calls can simply be turned into a Batch of size 1. This may be highly inefficient, but it will be correct. This decision is made based on the fact that Couchbase FTS always uses Batches.
+
+NOTE: As a side-effect of this decision, it should be clear that performance tuning may depend on the batch size, which may in turn require changes in FTS.
+
+From this point forward, only Batch mutations will be discussed.
+
+Sequence of Operations:
+
+1. For each document in the batch, search through all existing segments. The goal is to build up a per-segment bitset which tells us which documents in that segment are obsoleted by the addition of the new segment we're currently building. NOTE: we're not ready for this change to take effect yet, so rather than mutating anything, these operations simply return bitsets, which we can apply later. Logically, this is something like:
+
+   ```
+   foreach segment {
+     dict := segment.Dictionary("_id")
+     postings := empty postings list
+     foreach docID {
+       postings = postings.Or(dict.PostingsList(docID, nil))
+     }
+   }
+   ```
+
+   NOTE: it is illustrated above as nested for loops, but some or all of these could be done concurrently. The end result is that for each segment, we have a (possibly empty) bitset.
+
+2. Concurrently with step 1, the documents in the batch are analyzed. This analysis proceeds using the existing analyzer pool.
+
+3. (after 2 completes) Analyzed documents are fed into a function which builds a new Segment representing this information.
+
+4. We now have everything we need to update the state of the system to include this new snapshot.
+
+   - Acquire a lock
+   - Create a new IndexSnapshot
+   - For each SegmentSnapshot in the current IndexSnapshot, take its deleted PostingsList and OR it with the postings list computed for that segment in step 1. Construct a new SegmentSnapshot for the segment using this new deleted PostingsList. Append this SegmentSnapshot to the new IndexSnapshot.
+   - Create a new SegmentSnapshot wrapping our new segment with nil deleted docs.
+   - Append the new SegmentSnapshot to the IndexSnapshot
+   - Release the lock
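+
+The introducer goroutine added in this change (introducer.go) implements exactly this step. A condensed sketch in the style of the proposed structures above; batchDocIDs, docNumbers, and newSegment are stand-in names for illustration:
+
+```
+s.rootLock.Lock()
+next := &IndexSnapshot{} // will hold len(old)+1 segment snapshots
+for _, ss := range s.root.segment {
+  // docs in this segment obsoleted by the incoming batch
+  delta := docNumbers(ss.segment, batchDocIDs)
+  if ss.deleted != nil {
+    delta = ss.deleted.Or(delta) // fold into prior deletions
+  }
+  next.segment = append(next.segment, SegmentSnapshot{segment: ss.segment, deleted: delta})
+}
+// the new segment enters with no deletions
+next.segment = append(next.segment, SegmentSnapshot{segment: newSegment})
+s.root = next // readers holding the old snapshot are unaffected
+s.rootLock.Unlock()
+```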
+
+An ASCII art example:
+
+```
+0 - Empty Index
+
+  No segments
+
+  IndexSnapshot
+    segments []
+    deleted  []
+
+
+1 - Index Batch [ A B C ]
+
+  segment 0
+  numbers [ 1 2 3 ]
+  _id     [ A B C ]
+
+  IndexSnapshot
+    segments [ 0 ]
+    deleted  [ nil ]
+
+
+2 - Index Batch [ B' ]
+
+  segment 0          1
+  numbers [ 1 2 3 ]  [ 1 ]
+  _id     [ A B C ]  [ B ]
+
+  Compute bitset segment-0-deleted-by-1:
+    [ 0 1 0 ]
+
+  OR it with previous (nil) (call it 0-1)
+    [ 0 1 0 ]
+
+  IndexSnapshot
+    segments [ 0    1 ]
+    deleted  [ 0-1  nil ]
+
+3 - Index Batch [ C' ]
+
+  segment 0          1      2
+  numbers [ 1 2 3 ]  [ 1 ]  [ 1 ]
+  _id     [ A B C ]  [ B ]  [ C ]
+
+  Compute bitset segment-0-deleted-by-2:
+    [ 0 0 1 ]
+
+  OR it with previous ([ 0 1 0 ]) (call it 0-12)
+    [ 0 1 1 ]
+
+  Compute bitset segment-1-deleted-by-2:
+    [ 0 0 0 ]
+
+  OR it with previous (nil)
+    still just nil
+
+
+  IndexSnapshot
+    segments [ 0     1    2 ]
+    deleted  [ 0-12  nil  nil ]
+```
+
+**Is there an opportunity to stop early when a doc is found in one segment?**
+
+**Also, is there a more efficient way to find bits for long lists of ids?**
+
+### Searching
+
+In the bleve.index API all searching starts by getting an IndexReader, which represents a snapshot of the index at a point in time.
+
+As described in the section above, our index implementation maintains a pointer to the current IndexSnapshot. When a caller gets an IndexReader, they get a copy of this pointer, and can use it as long as they like. The IndexSnapshot contains SegmentSnapshots, which only contain pointers to immutable segments. The deleted posting lists associated with a segment change over time, but the particular deleted posting list in YOUR snapshot is immutable. This gives a stable view of the data.
+
+#### Term Search
+
+Term search is the only searching primitive exposed in today's bleve.index API. This ultimately could limit our ability to take advantage of the indexing improvements, but it also means it will be easier to get a first version of this working.
+
+A term search for term T in field F will look something like this:
+
+```
+  searchResultPostings = empty
+  foreach segment {
+    dict := segment.Dictionary(F)
+    segmentResultPostings = dict.PostingsList(T, segmentSnapshotDeleted)
+    // make segment-local numbers into global numbers, and flip bits in searchResultPostings
+  }
+```
+
+The searchResultPostings will be a new implementation of the TermFieldReader interface.
+
+As a reminder, this interface is:
+
+```
+// TermFieldReader is the interface exposing the enumeration of documents
+// containing a given term in a given field. Documents are returned in byte
+// lexicographic order over their identifiers.
+type TermFieldReader interface {
+	// Next returns the next document containing the term in this field, or nil
+	// when it reaches the end of the enumeration. The preAlloced TermFieldDoc
+	// is optional, and when non-nil, will be used instead of allocating memory.
+	Next(preAlloced *TermFieldDoc) (*TermFieldDoc, error)
+
+	// Advance resets the enumeration at specified document or its immediate
+	// follower.
+	Advance(ID IndexInternalID, preAlloced *TermFieldDoc) (*TermFieldDoc, error)
+
+	// Count returns the number of documents containing the term in this field.
+	Count() uint64
+	Close() error
+}
+```
+
+At first glance this appears problematic: we have no way to return documents in order of their identifiers. But it turns out this wording is perhaps too strong, or a bit ambiguous.
+Originally, this referred to the external identifiers, but with the introduction of a distinction between internal/external identifiers, returning them in order of their internal identifiers is also acceptable. **ASIDE**: the reason for this is that most callers just use Next() and don't care what the order is; the hits could come in any order and it would be fine. There is only one search that cares, and that is the ConjunctionSearcher, which relies on Next/Advance having very specific semantics. Later in this document we will have a proposal to split this into multiple interfaces:
+
+- The weakest interface supports only Next(), with no ordering at all.
+- Ordered, supporting Advance().
+- And/Or-able, capable of internally and efficiently doing these ops with like interfaces (if not capable, it can always fall back to external walking).
+
+But, the good news is that we don't even have to do that for our first implementation. As long as the global numbers we use for internal identifiers are consistent within this IndexSnapshot, then Next() will be ordered by ascending document number, and Advance() will still work correctly.
+
+NOTE: there is another place where we rely on the ordering of these hits, and that is in the "\_id" sort order. Previously this was the natural order, and a NOOP for the collector; now it must be implemented by actually sorting on the "\_id" field. We probably should introduce at least a marker interface to detect this.
+
+An ASCII art example:
+
+```
+Let's start with the IndexSnapshot we ended with earlier:
+
+3 - Index Batch [ C' ]
+
+  segment 0          1      2
+  numbers [ 1 2 3 ]  [ 1 ]  [ 1 ]
+  _id     [ A B C ]  [ B ]  [ C ]
+
+  Compute bitset segment-0-deleted-by-2:
+    [ 0 0 1 ]
+
+  OR it with previous ([ 0 1 0 ]) (call it 0-12)
+    [ 0 1 1 ]
+
+  Compute bitset segment-1-deleted-by-2:
+    [ 0 0 0 ]
+
+  OR it with previous (nil)
+    still just nil
+
+
+  IndexSnapshot
+    segments [ 0     1    2 ]
+    deleted  [ 0-12  nil  nil ]
+
+Now let's search for the term 'cat' in the field 'desc' and let's assume that Document C (both versions) would match it.
+
+Concurrently:
+
+  - Segment 0
+    - Get Term Dictionary For Field 'desc'
+    - From it get Postings List for term 'cat' EXCLUDING 0-12
+    - raw segment matches [ 0 0 1 ] but excluding [ 0 1 1 ] gives [ 0 0 0 ]
+  - Segment 1
+    - Get Term Dictionary For Field 'desc'
+    - From it get Postings List for term 'cat' excluding nil
+    - [ 0 ]
+  - Segment 2
+    - Get Term Dictionary For Field 'desc'
+    - From it get Postings List for term 'cat' excluding nil
+    - [ 1 ]
+
+Map local bitsets into global number space (global meaning cross-segment but still unique to this snapshot)
+
+IndexSnapshot already should have a mapping something like:
+0 - Offset 0
+1 - Offset 3 (because segment 0 had 3 docs)
+2 - Offset 4 (because segment 1 had 1 doc)
+
+This maps to search result bitset:
+
+[ 0 0 0 0 1 ]
+
+Caller would call Next() and get doc number 5 (assuming 1-based indexing for now)
+
+Caller could then ask to get term locations, stored fields, external doc ID for document number 5. Internally in the IndexSnapshot, we can now convert that back, and realize doc number 5 comes from segment 2; 5-4=1, so we're looking for doc number 1 in segment 2. That happens to be C...
+```
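+
+The global/local conversion at the end of the example is just an offset lookup. A minimal sketch, assuming 0-based numbering and a hypothetical helper name segmentForGlobal; the offsets slice matches the one the IndexSnapshot in this change maintains:
+
+```
+// segmentForGlobal maps a snapshot-global doc number to the segment
+// holding it, plus the segment-local doc number.
+func (i *IndexSnapshot) segmentForGlobal(globalNum uint64) (segIdx int, localNum uint64) {
+	// offsets[x] is the global number of the first doc in segment x,
+	// so we want the last segment whose offset is <= globalNum
+	segIdx = len(i.offsets) - 1
+	for segIdx > 0 && i.offsets[segIdx] > globalNum {
+		segIdx--
+	}
+	return segIdx, globalNum - i.offsets[segIdx]
+}
+```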
+
+#### Future improvements
+
+In the future, interfaces to detect these non-serially operating TermFieldReaders could expose their own And() and Or() up to the higher-level Conjunction/Disjunction searchers. Doing this alone offers some win, but it also places a greater burden on the Searcher code to rewrite logical expressions for maximum performance.
+
+Another related topic is that of peak memory usage. With serially operating TermFieldReaders it was necessary to start them all at the same time and operate in unison. However, with these non-serially operating TermFieldReaders we have the option of doing a few at a time, consolidating them, disposing of the intermediaries, and then doing a few more. For very complex queries with many clauses this could reduce peak memory usage.
+
+
+### Memory Tracking
+
+All segments must be able to produce two statistics: an estimate of their explicit memory usage, and their actual size on disk (if any). For in-memory segments, disk usage could be zero, and the memory usage represents the entire information content. For mmap-based disk segments, the memory usage could be as low as the size of the tracking structure itself (say, just a few pointers).
+
+This would allow the implementation to throttle or block incoming mutations when a threshold memory usage has been (or would be) exceeded.
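+
+In the style of the proposed structures above, this could be as small as two extra methods on Segment; the interface and method names here are assumptions, not part of this change:
+
+```
+type SegmentStats interface {
+  // SizeInBytes estimates the explicit memory held by the segment.
+  SizeInBytes() uint64
+  // SizeOnDisk reports bytes persisted to disk; zero for in-memory segments.
+  SizeOnDisk() uint64
+}
+```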
+### Persistence
+
+Obviously, we want to support (but maybe not require) asynchronous persistence of segments. My expectation is that segments are initially built in memory. At some point they are persisted to disk. This poses some interesting challenges.
+
+At runtime, the state of an index (its IndexSnapshot) is not only the contents of the segments, but also the bitmasks of deleted documents. These bitmasks indirectly encode an ordering in which the segments were added. The reason is that the bitmasks encode which items have been obsoleted by later segments. In the runtime implementation we compute bitmask deltas and then merge them at the same time we bring the new segment in. One idea is that we could take a similar approach on disk. When we persist a segment, we persist the bitmask deltas of segments known to exist at that time, and eventually these can get merged up into a base segment deleted bitmask.
+
+This also relates to the topic of rollback, addressed next...
+
+
+### Rollback
+
+One desirable property in the Couchbase ecosystem is the ability to rollback to some previous (though typically not long ago) state. One idea for keeping this property in this design is to protect some of the most recent segments from merging. Then, if necessary, they could be "undone" to reveal previous states of the system. In these scenarios "undone" has to properly undo the deleted bitmasks on the other segments. Again, the current thinking is that rather than "undo" anything, it could be work that was deferred in the first place, thus making it easier to logically undo.
+
+Another possibly related approach would be to tie this into our existing snapshot mechanism. Perhaps simulating a slow reader (holding onto index snapshots) for some period of time can be the mechanism to achieve the desired end goal.
+
+
+### Internal Storage
+
+The bleve.index API has support for "internal storage": the ability to store information under a separate namespace.
+
+This is not used for high-volume storage, so it is tempting to think we could just put a small k/v store alongside the rest of the index. But the reality is that this storage is used to maintain key information related to the rollback scenario. Because of this, it's crucial that ordering and overwriting of key/value pairs correspond with actual segment persistence in the index. Based on this, I believe it's important to put the internal key/value pairs inside the segments themselves. But this also means that they must follow a similar "deleted" bitmask approach to obsolete values in older segments. This seems to substantially increase the complexity of the solution: because of the separate namespace, it would appear to require its own bitmask. Further, keys aren't numeric, which then implies yet another mapping from internal key to number, etc.
+
+More thought is required here.
+
+### Merging
+
+The segmented index approach requires merging to prevent the number of segments from growing too large.
+
+Recent experience with LSMs has taught us that having the correct merge strategy can make a huge difference in the overall performance of the system. In particular, a simple merge strategy which merges segments too aggressively can lead to high write amplification and can unnecessarily render cached data useless.
+
+A few simple principles have been identified; a sketch of how they might combine follows below.
+
+- Roughly, we merge multiple smaller segments into a single larger one.
+- The larger a segment gets, the less likely we should be to ever merge it.
+- Segments with large numbers of deleted/obsoleted items are good candidates, as the merge will result in a space savings.
+- Segments with all items deleted/obsoleted can be dropped.
+
+Merging of a segment should be able to proceed even if that segment is held by an ongoing snapshot; it should only delay the removal of the segment.
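+
+As a rough illustration only, the principles above might combine into a candidate score like this; the helper and its weighting are assumptions, not part of this change:
+
+```
+// mergeScore ranks a segment as a merge candidate; higher is better.
+// Segments with nothing live should simply be dropped, not merged.
+func mergeScore(liveDocs, deletedDocs uint64) (drop bool, score float64) {
+	total := liveDocs + deletedDocs
+	if total == 0 || liveDocs == 0 {
+		return true, 0
+	}
+	deletedRatio := float64(deletedDocs) / float64(total)
+	// favor segments that are small and/or mostly deleted: merging them
+	// reclaims the most space for the least write amplification
+	return false, deletedRatio + 1/float64(liveDocs)
+}
+```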
+
+## TODO
+
+- need reference counting on the segments, to know when we can safely remove?
+
+- how well will bitmaps perform when large and possibly mmap'd?
+
+
+-----
+thinking out loud on storage
+
+- fields
+  - field name - field id
+- term dictionary
+  - field id - FST (values postings ids)
+- postings
+  - posting id - postings list
+- freqs
+  - posting id - freqs list
+- norms
+  - posting id - norms list
+- stored
+  - docNum
+    - field id - field values
+
+
+
+----
+
+race dialog with steve:
+
+state: 2, 4, 8
+
+- introducing new segment X
+  - deleted bitmasks, 2, 4, 8
+
+- merger, merge 4 and 8
+  new segment Y
+
+
+- merger wins
+
+  state: 2, 9
+
+  introducer: need to recompute bitmask for 9, could lose again and keep losing race
+
+- introducer wins
+
+  state: 2, 4, 8, X
+         2-X, 4-X, 8-X, nil
+
+  merger finishes: new segment Y, is not valid, need to be recomputed
diff --git a/index/scorch/field_dict_test.go b/index/scorch/field_dict_test.go
new file mode 100644
index 00000000..81285e76
--- /dev/null
+++ b/index/scorch/field_dict_test.go
@@ -0,0 +1,164 @@
+package scorch
+
+import (
+	"reflect"
+	"testing"
+
+	"github.com/blevesearch/bleve/document"
+	"github.com/blevesearch/bleve/index"
+)
+
+func TestIndexFieldDict(t *testing.T) {
+
+	analysisQueue := index.NewAnalysisQueue(1)
+	idx, err := NewScorch(Name, nil, analysisQueue)
+	if err != nil {
+		t.Fatal(err)
+	}
+	err = idx.Open()
+	if err != nil {
+		t.Errorf("error opening index: %v", err)
+	}
+	defer func() {
+		err := idx.Close()
+		if err != nil {
+			t.Fatal(err)
+		}
+	}()
+
+	var expectedCount uint64
+	doc := document.NewDocument("1")
+	doc.AddField(document.NewTextField("name", []uint64{}, []byte("test")))
+	err = idx.Update(doc)
+	if err != nil {
+		t.Errorf("Error updating index: %v", err)
+	}
+	expectedCount++
+
+	doc = document.NewDocument("2")
+	doc.AddField(document.NewTextFieldWithAnalyzer("name", []uint64{}, []byte("test test test"), testAnalyzer))
+	doc.AddField(document.NewTextFieldCustom("desc", []uint64{}, []byte("eat more rice"),
+		document.IndexField|document.IncludeTermVectors, testAnalyzer))
+	doc.AddField(document.NewTextFieldCustom("prefix", []uint64{}, []byte("bob cat cats catting dog doggy zoo"), document.IndexField|document.IncludeTermVectors, testAnalyzer))
+	err = idx.Update(doc)
+	if err != nil {
+		t.Errorf("Error updating index: %v", err)
+	}
+	expectedCount++
+
+	indexReader, err := idx.Reader()
+	if err != nil {
+		t.Error(err)
+	}
+	defer func() {
+		err := indexReader.Close()
+		if err != nil {
+			t.Fatal(err)
+		}
+	}()
+
+	dict, err := indexReader.FieldDict("name")
+	if err != nil {
+		t.Errorf("error creating reader: %v", err)
+	}
+	defer func() {
+		err := dict.Close()
+		if err != nil {
+			t.Fatal(err)
+		}
+	}()
+
+	termCount := 0
+	curr, err := dict.Next()
+	for err == nil && curr != nil {
+		termCount++
+		if curr.Term != "test" {
+			t.Errorf("expected term to be 'test', got '%s'", curr.Term)
+		}
+		curr, err = dict.Next()
+	}
+	if termCount != 1 {
+		t.Errorf("expected 1 term for this field, got %d", termCount)
+	}
+
+	dict2, err := indexReader.FieldDict("desc")
+	if err != nil {
+		t.Errorf("error creating reader: %v", err)
+	}
+	defer func() {
+		err := dict2.Close()
+		if err != nil {
+			t.Fatal(err)
+		}
+	}()
+
+	termCount = 0
+	terms := make([]string, 0)
+	curr, err = dict2.Next()
+	for err == nil && curr != nil {
+		termCount++
+		terms = append(terms, curr.Term)
+		curr, err = dict2.Next()
+	}
+	if termCount != 3 {
+		t.Errorf("expected 3 terms for this field, got %d", termCount)
+	}
+	expectedTerms := []string{"eat", "more", "rice"}
+	if !reflect.DeepEqual(expectedTerms, terms) {
+		t.Errorf("expected %#v, got %#v", expectedTerms, terms)
+	}
+	// test start and end range
+	dict3, err := indexReader.FieldDictRange("desc", []byte("fun"), []byte("nice"))
+	if err != nil {
+		t.Errorf("error creating reader: %v", err)
+	}
+	defer func() {
+		err := dict3.Close()
+		if err != nil {
+			t.Fatal(err)
+		}
+	}()
+
+	termCount = 0
+	terms = make([]string, 0)
+	curr, err = dict3.Next()
+	for err == nil && curr != nil {
+		termCount++
+		terms = append(terms, curr.Term)
+		curr, err = dict3.Next()
+	}
+	if termCount != 1 {
+		t.Errorf("expected 1 term for this field, got %d", termCount)
+	}
+	expectedTerms = []string{"more"}
+	if !reflect.DeepEqual(expectedTerms, terms) {
+		t.Errorf("expected %#v, got %#v", expectedTerms, terms)
+	}
+
+	// test use case for prefix
+	dict4, err := indexReader.FieldDictPrefix("prefix", []byte("cat"))
+	if err != nil {
+		t.Errorf("error creating reader: %v", err)
+	}
+	defer func() {
+		err := dict4.Close()
+		if err != nil {
+			t.Fatal(err)
+		}
+	}()
+
+	termCount = 0
+	terms = make([]string, 0)
+	curr, err = dict4.Next()
+	for err == nil && curr != nil {
+		termCount++
+		terms = append(terms, curr.Term)
+		curr, err = dict4.Next()
+	}
+	if termCount != 3 {
+		t.Errorf("expected 3 terms for this field, got %d", termCount)
+	}
+	expectedTerms = []string{"cat", "cats", "catting"}
+	if !reflect.DeepEqual(expectedTerms, terms) {
+		t.Errorf("expected %#v, got %#v", expectedTerms, terms)
+	}
+}
diff --git a/index/scorch/introducer.go b/index/scorch/introducer.go
new file mode 100644
index 00000000..dc748ad8
--- /dev/null
+++ b/index/scorch/introducer.go
@@ -0,0 +1,85 @@
+package scorch
+
+import (
+	"github.com/RoaringBitmap/roaring"
+	"github.com/blevesearch/bleve/index/scorch/segment"
+)
+
+type segmentIntroduction struct {
+	id        uint64
+	data      segment.Segment
+	obsoletes map[uint64]*roaring.Bitmap
+	ids       []string
+	internal  map[string][]byte
+
+	applied chan struct{}
+}
+
+func (s *Scorch) mainLoop() {
+	for {
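+		// each pass applies exactly one event: either shutdown via closeCh,
+		// or a segmentIntroduction, for which we build a successor
+		// IndexSnapshot and atomically swap it in under rootLock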
+ select { + case <-s.closeCh: + return + + case next := <-s.introductions: + + // acquire lock + s.rootLock.Lock() + + // prepare new index snapshot, with curr size + 1 + newSnapshot := &IndexSnapshot{ + segment: make([]*SegmentSnapshot, len(s.root.segment)+1), + offsets: make([]uint64, len(s.root.segment)+1), + internal: make(map[string][]byte, len(s.root.segment)), + } + + // iterate through current segments + var running uint64 + for i := range s.root.segment { + // see if optimistic work included this segment + delta, ok := next.obsoletes[s.root.segment[i].id] + if !ok { + delta = s.root.segment[i].segment.DocNumbers(next.ids) + } + newSnapshot.segment[i] = &SegmentSnapshot{ + id: s.root.segment[i].id, + segment: s.root.segment[i].segment, + } + // apply new obsoletions + if s.root.segment[i].deleted == nil { + newSnapshot.segment[i].deleted = delta + } else { + newSnapshot.segment[i].deleted = s.root.segment[i].deleted.Clone() + newSnapshot.segment[i].deleted.Or(delta) + } + + newSnapshot.offsets[i] = running + running += s.root.segment[i].Count() + + } + // put new segment at end + newSnapshot.segment[len(s.root.segment)] = &SegmentSnapshot{ + id: next.id, + segment: next.data, + } + newSnapshot.offsets[len(s.root.segment)] = running + // copy old values + for key, oldVal := range s.root.internal { + newSnapshot.internal[key] = oldVal + } + // set new values and apply deletes + for key, newVal := range next.internal { + if newVal != nil { + newSnapshot.internal[key] = newVal + } else { + delete(newSnapshot.internal, key) + } + } + // swap in new segment + s.root = newSnapshot + // release lock + s.rootLock.Unlock() + close(next.applied) + } + } +} diff --git a/index/scorch/reader.go b/index/scorch/reader.go new file mode 100644 index 00000000..acb01905 --- /dev/null +++ b/index/scorch/reader.go @@ -0,0 +1,84 @@ +package scorch + +import ( + "github.com/blevesearch/bleve/document" + "github.com/blevesearch/bleve/index" +) + +type Reader struct { + root *IndexSnapshot +} + +func (r *Reader) TermFieldReader(term []byte, field string, includeFreq, + includeNorm, includeTermVectors bool) (index.TermFieldReader, error) { + return r.root.TermFieldReader(term, field, includeFreq, includeNorm, includeTermVectors) +} + +// DocIDReader returns an iterator over all doc ids +// The caller must close returned instance to release associated resources. 
+func (r *Reader) DocIDReaderAll() (index.DocIDReader, error) { + return r.root.DocIDReaderAll() +} + +func (r *Reader) DocIDReaderOnly(ids []string) (index.DocIDReader, error) { + return r.root.DocIDReaderOnly(ids) +} + +func (r *Reader) FieldDict(field string) (index.FieldDict, error) { + return r.root.FieldDict(field) +} + +// FieldDictRange is currently defined to include the start and end terms +func (r *Reader) FieldDictRange(field string, startTerm []byte, + endTerm []byte) (index.FieldDict, error) { + return r.root.FieldDictRange(field, startTerm, endTerm) +} + +func (r *Reader) FieldDictPrefix(field string, + termPrefix []byte) (index.FieldDict, error) { + return r.root.FieldDictPrefix(field, termPrefix) +} + +func (r *Reader) Document(id string) (*document.Document, error) { + return r.root.Document(id) +} +func (r *Reader) DocumentVisitFieldTerms(id index.IndexInternalID, fields []string, + visitor index.DocumentFieldTermVisitor) error { + panic("document visit field terms not implemented") +} + +func (r *Reader) Fields() ([]string, error) { + return r.root.Fields() +} + +func (r *Reader) GetInternal(key []byte) ([]byte, error) { + return r.root.GetInternal(key) +} + +func (r *Reader) DocCount() (uint64, error) { + return r.root.DocCount() +} + +func (r *Reader) ExternalID(id index.IndexInternalID) (string, error) { + return r.root.ExternalID(id) +} + +func (r *Reader) InternalID(id string) (index.IndexInternalID, error) { + return r.root.InternalID(id) +} + +func (r *Reader) DumpAll() chan interface{} { + panic("dumpall") +} + +func (r *Reader) DumpDoc(id string) chan interface{} { + panic("dumpdoc") +} + +func (r *Reader) DumpFields() chan interface{} { + panic("dumpfields") +} + +func (r *Reader) Close() error { + return nil +} diff --git a/index/scorch/reader_test.go b/index/scorch/reader_test.go new file mode 100644 index 00000000..a050bb44 --- /dev/null +++ b/index/scorch/reader_test.go @@ -0,0 +1,511 @@ +package scorch + +import ( + "reflect" + "testing" + + "github.com/blevesearch/bleve/document" + "github.com/blevesearch/bleve/index" +) + +func TestIndexReader(t *testing.T) { + analysisQueue := index.NewAnalysisQueue(1) + idx, err := NewScorch(Name, nil, analysisQueue) + if err != nil { + t.Fatal(err) + } + err = idx.Open() + if err != nil { + t.Errorf("error opening index: %v", err) + } + defer func() { + err := idx.Close() + if err != nil { + t.Fatal(err) + } + }() + + var expectedCount uint64 + doc := document.NewDocument("1") + doc.AddField(document.NewTextField("name", []uint64{}, []byte("test"))) + err = idx.Update(doc) + if err != nil { + t.Errorf("Error updating index: %v", err) + } + expectedCount++ + + doc = document.NewDocument("2") + doc.AddField(document.NewTextFieldWithAnalyzer("name", []uint64{}, []byte("test test test"), testAnalyzer)) + doc.AddField(document.NewTextFieldCustom("desc", []uint64{}, []byte("eat more rice"), document.IndexField|document.IncludeTermVectors, testAnalyzer)) + err = idx.Update(doc) + if err != nil { + t.Errorf("Error updating index: %v", err) + } + expectedCount++ + + indexReader, err := idx.Reader() + if err != nil { + t.Error(err) + } + defer func() { + err := indexReader.Close() + if err != nil { + t.Fatal(err) + } + }() + + // first look for a term that doesn't exist + reader, err := indexReader.TermFieldReader([]byte("nope"), "name", true, true, true) + if err != nil { + t.Errorf("Error accessing term field reader: %v", err) + } + count := reader.Count() + if count != 0 { + t.Errorf("Expected doc count to be: %d got: %d", 
+			0, count)
+	}
+	err = reader.Close()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	reader, err = indexReader.TermFieldReader([]byte("test"), "name", true, true, true)
+	if err != nil {
+		t.Errorf("Error accessing term field reader: %v", err)
+	}
+
+	expectedCount = 2
+	count = reader.Count()
+	if count != expectedCount {
+		t.Errorf("Expected doc count to be: %d got: %d", expectedCount, count)
+	}
+
+	var match *index.TermFieldDoc
+	var actualCount uint64
+	match, err = reader.Next(nil)
+	for err == nil && match != nil {
+		match, err = reader.Next(nil)
+		if err != nil {
+			t.Errorf("unexpected error reading next")
+		}
+		actualCount++
+	}
+	if actualCount != count {
+		t.Errorf("count was 2, but only saw %d", actualCount)
+	}
+
+	internalID2, err := indexReader.InternalID("2")
+	if err != nil {
+		t.Fatal(err)
+	}
+	expectedMatch := &index.TermFieldDoc{
+		ID:   internalID2,
+		Freq: 1,
+		Norm: 0.5773502588272095,
+		Vectors: []*index.TermFieldVector{
+			{
+				Field: "desc",
+				Pos:   3,
+				Start: 9,
+				End:   13,
+			},
+		},
+	}
+	tfr, err := indexReader.TermFieldReader([]byte("rice"), "desc", true, true, true)
+	if err != nil {
+		t.Errorf("unexpected error: %v", err)
+	}
+	match, err = tfr.Next(nil)
+	if err != nil {
+		t.Errorf("unexpected error: %v", err)
+	}
+	if !reflect.DeepEqual(expectedMatch, match) {
+		t.Errorf("got %#v, expected %#v", match, expectedMatch)
+	}
+	err = reader.Close()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// now test usage of advance
+	reader, err = indexReader.TermFieldReader([]byte("test"), "name", true, true, true)
+	if err != nil {
+		t.Errorf("Error accessing term field reader: %v", err)
+	}
+
+	match, err = reader.Advance(internalID2, nil)
+	if err != nil {
+		t.Errorf("unexpected error: %v", err)
+	}
+	if match == nil {
+		t.Fatalf("Expected match, got nil")
+	}
+	if !match.ID.Equals(internalID2) {
+		t.Errorf("Expected ID '2', got '%s'", match.ID)
+	}
+	// NOTE: no point in changing this to internal id 3, there is no id 3;
+	// the test is looking for something that doesn't exist, and this doesn't
+	match, err = reader.Advance(index.IndexInternalID("3"), nil)
+	if err != nil {
+		t.Errorf("unexpected error: %v", err)
+	}
+	if match != nil {
+		t.Errorf("expected nil, got %v", match)
+	}
+	err = reader.Close()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// now test creating a reader for a field that doesn't exist
+	reader, err = indexReader.TermFieldReader([]byte("water"), "doesnotexist", true, true, true)
+	if err != nil {
+		t.Errorf("Error accessing term field reader: %v", err)
+	}
+	count = reader.Count()
+	if count != 0 {
+		t.Errorf("expected count 0 for reader of non-existent field")
+	}
+	match, err = reader.Next(nil)
+	if err != nil {
+		t.Errorf("unexpected error: %v", err)
+	}
+	if match != nil {
+		t.Errorf("expected nil, got %v", match)
+	}
+	match, err = reader.Advance(index.IndexInternalID("anywhere"), nil)
+	if err != nil {
+		t.Errorf("unexpected error: %v", err)
+	}
+	if match != nil {
+		t.Errorf("expected nil, got %v", match)
+	}
+
+}
+
+func TestIndexDocIdReader(t *testing.T) {
+	analysisQueue := index.NewAnalysisQueue(1)
+	idx, err := NewScorch(Name, nil, analysisQueue)
+	if err != nil {
+		t.Fatal(err)
+	}
+	err = idx.Open()
+	if err != nil {
+		t.Errorf("error opening index: %v", err)
+	}
+	defer func() {
+		err := idx.Close()
+		if err != nil {
+			t.Fatal(err)
+		}
+	}()
+
+	var expectedCount uint64
+	doc := document.NewDocument("1")
+	doc.AddField(document.NewTextField("name", []uint64{}, []byte("test")))
+	err = idx.Update(doc)
index: %v", err) + } + expectedCount++ + + doc = document.NewDocument("2") + doc.AddField(document.NewTextField("name", []uint64{}, []byte("test test test"))) + doc.AddField(document.NewTextFieldWithIndexingOptions("desc", []uint64{}, []byte("eat more rice"), document.IndexField|document.IncludeTermVectors)) + err = idx.Update(doc) + if err != nil { + t.Errorf("Error updating index: %v", err) + } + expectedCount++ + + indexReader, err := idx.Reader() + if err != nil { + t.Error(err) + } + defer func() { + err := indexReader.Close() + if err != nil { + t.Error(err) + } + }() + + // first get all doc ids + reader, err := indexReader.DocIDReaderAll() + if err != nil { + t.Errorf("Error accessing doc id reader: %v", err) + } + defer func() { + err := reader.Close() + if err != nil { + t.Fatal(err) + } + }() + + id, err := reader.Next() + count := uint64(0) + for id != nil { + count++ + id, err = reader.Next() + } + if count != expectedCount { + t.Errorf("expected %d, got %d", expectedCount, count) + } + + // try it again, but jump to the second doc this time + reader2, err := indexReader.DocIDReaderAll() + if err != nil { + t.Errorf("Error accessing doc id reader: %v", err) + } + defer func() { + err := reader2.Close() + if err != nil { + t.Error(err) + } + }() + + internalID2, err := indexReader.InternalID("2") + if err != nil { + t.Fatal(err) + } + + id, err = reader2.Advance(internalID2) + if err != nil { + t.Error(err) + } + if !id.Equals(internalID2) { + t.Errorf("expected to find id '2', got '%s'", id) + } + + // again 3 doesn't exist cannot use internal id for 3 as there is none + // the important aspect is that this id doesn't exist, so its ok + id, err = reader2.Advance(index.IndexInternalID("3")) + if err != nil { + t.Error(err) + } + if id != nil { + t.Errorf("expected to find id '', got '%s'", id) + } +} + +func TestIndexDocIdOnlyReader(t *testing.T) { + analysisQueue := index.NewAnalysisQueue(1) + idx, err := NewScorch(Name, nil, analysisQueue) + if err != nil { + t.Fatal(err) + } + err = idx.Open() + if err != nil { + t.Errorf("error opening index: %v", err) + } + defer func() { + err := idx.Close() + if err != nil { + t.Fatal(err) + } + }() + + doc := document.NewDocument("1") + err = idx.Update(doc) + if err != nil { + t.Errorf("Error updating index: %v", err) + } + + doc = document.NewDocument("3") + err = idx.Update(doc) + if err != nil { + t.Errorf("Error updating index: %v", err) + } + + doc = document.NewDocument("5") + err = idx.Update(doc) + if err != nil { + t.Errorf("Error updating index: %v", err) + } + + doc = document.NewDocument("7") + err = idx.Update(doc) + if err != nil { + t.Errorf("Error updating index: %v", err) + } + + doc = document.NewDocument("9") + err = idx.Update(doc) + if err != nil { + t.Errorf("Error updating index: %v", err) + } + + indexReader, err := idx.Reader() + if err != nil { + t.Error(err) + } + defer func() { + err := indexReader.Close() + if err != nil { + t.Error(err) + } + }() + + onlyIds := []string{"1", "5", "9"} + reader, err := indexReader.DocIDReaderOnly(onlyIds) + if err != nil { + t.Errorf("Error accessing doc id reader: %v", err) + } + defer func() { + err := reader.Close() + if err != nil { + t.Fatal(err) + } + }() + + id, err := reader.Next() + count := uint64(0) + for id != nil { + count++ + id, err = reader.Next() + if err != nil { + t.Fatal(err) + } + } + if count != 3 { + t.Errorf("expected 3, got %d", count) + } + + // commented out because advance works with internal ids + // this test presumes we see items in external 
+	// this test presumes we see items in external doc id order
+	// which is no longer the case, so simply converting external ids
+	// to internal ones is not logically correct
+	// not removing though because we need some way to test Advance()
+
+	// // try it again, but jump
+	// reader2, err := indexReader.DocIDReaderOnly(onlyIds)
+	// if err != nil {
+	// 	t.Errorf("Error accessing doc id reader: %v", err)
+	// }
+	// defer func() {
+	// 	err := reader2.Close()
+	// 	if err != nil {
+	// 		t.Error(err)
+	// 	}
+	// }()
+	//
+	// id, err = reader2.Advance(index.IndexInternalID("5"))
+	// if err != nil {
+	// 	t.Error(err)
+	// }
+	// if !id.Equals(index.IndexInternalID("5")) {
+	// 	t.Errorf("expected to find id '5', got '%s'", id)
+	// }
+	//
+	// id, err = reader2.Advance(index.IndexInternalID("a"))
+	// if err != nil {
+	// 	t.Error(err)
+	// }
+	// if id != nil {
+	// 	t.Errorf("expected to find id '', got '%s'", id)
+	// }
+
+	// some keys aren't actually there
+	onlyIds = []string{"0", "2", "4", "5", "6", "8", "a"}
+	reader3, err := indexReader.DocIDReaderOnly(onlyIds)
+	if err != nil {
+		t.Errorf("Error accessing doc id reader: %v", err)
+	}
+	defer func() {
+		err := reader3.Close()
+		if err != nil {
+			t.Error(err)
+		}
+	}()
+
+	id, err = reader3.Next()
+	count = uint64(0)
+	for id != nil {
+		count++
+		id, err = reader3.Next()
+	}
+	if count != 1 {
+		t.Errorf("expected 1, got %d", count)
+	}
+
+	// commented out because advance works with internal ids
+	// this test presumes we see items in external doc id order
+	// which is no longer the case, so simply converting external ids
+	// to internal ones is not logically correct
+	// not removing though because we need some way to test Advance()
+
+	// // mix advance and next
+	// onlyIds = []string{"0", "1", "3", "5", "6", "9"}
+	// reader4, err := indexReader.DocIDReaderOnly(onlyIds)
+	// if err != nil {
+	// 	t.Errorf("Error accessing doc id reader: %v", err)
+	// }
+	// defer func() {
+	// 	err := reader4.Close()
+	// 	if err != nil {
+	// 		t.Error(err)
+	// 	}
+	// }()
+	//
+	// // first key is "1"
+	// id, err = reader4.Next()
+	// if err != nil {
+	// 	t.Error(err)
+	// }
+	// if !id.Equals(index.IndexInternalID("1")) {
+	// 	t.Errorf("expected to find id '1', got '%s'", id)
+	// }
+	//
+	// // advancing to key we don't have gives next
+	// id, err = reader4.Advance(index.IndexInternalID("2"))
+	// if err != nil {
+	// 	t.Error(err)
+	// }
+	// if !id.Equals(index.IndexInternalID("3")) {
+	// 	t.Errorf("expected to find id '3', got '%s'", id)
+	// }
+	//
+	// // next after advance works
+	// id, err = reader4.Next()
+	// if err != nil {
+	// 	t.Error(err)
+	// }
+	// if !id.Equals(index.IndexInternalID("5")) {
+	// 	t.Errorf("expected to find id '5', got '%s'", id)
+	// }
+	//
+	// // advancing to key we do have works
+	// id, err = reader4.Advance(index.IndexInternalID("9"))
+	// if err != nil {
+	// 	t.Error(err)
+	// }
+	// if !id.Equals(index.IndexInternalID("9")) {
+	// 	t.Errorf("expected to find id '9', got '%s'", id)
+	// }
+	//
+	// // advance backwards at end
+	// id, err = reader4.Advance(index.IndexInternalID("4"))
+	// if err != nil {
+	// 	t.Error(err)
+	// }
+	// if !id.Equals(index.IndexInternalID("5")) {
+	// 	t.Errorf("expected to find id '5', got '%s'", id)
+	// }
+	//
+	// // next after advance works
+	// id, err = reader4.Next()
+	// if err != nil {
+	// 	t.Error(err)
+	// }
+	// if !id.Equals(index.IndexInternalID("9")) {
+	// 	t.Errorf("expected to find id '9', got '%s'", id)
+	// }
+	//
+	// // advance backwards to key that exists, but not in only set
reader4.Advance(index.IndexInternalID("7")) + // if err != nil { + // t.Error(err) + // } + // if !id.Equals(index.IndexInternalID("9")) { + // t.Errorf("expected to find id '9', got '%s'", id) + // } + +} diff --git a/index/scorch/scorch.go b/index/scorch/scorch.go new file mode 100644 index 00000000..d20261f5 --- /dev/null +++ b/index/scorch/scorch.go @@ -0,0 +1,218 @@ +package scorch + +import ( + "encoding/json" + "sync" + "sync/atomic" + "time" + + "github.com/RoaringBitmap/roaring" + "github.com/blevesearch/bleve/analysis" + "github.com/blevesearch/bleve/document" + "github.com/blevesearch/bleve/index" + "github.com/blevesearch/bleve/index/scorch/segment" + "github.com/blevesearch/bleve/index/scorch/segment/mem" + "github.com/blevesearch/bleve/index/store" + "github.com/blevesearch/bleve/registry" +) + +const Name = "scorch" + +const Version uint8 = 1 + +type Scorch struct { + version uint8 + storeConfig map[string]interface{} + analysisQueue *index.AnalysisQueue + stats *Stats + nextSegmentID uint64 + + rootLock sync.RWMutex + root *IndexSnapshot + + closeCh chan struct{} + introductions chan *segmentIntroduction +} + +func NewScorch(storeName string, storeConfig map[string]interface{}, analysisQueue *index.AnalysisQueue) (index.Index, error) { + rv := &Scorch{ + version: Version, + storeConfig: storeConfig, + analysisQueue: analysisQueue, + stats: &Stats{}, + root: &IndexSnapshot{}, + } + return rv, nil +} + +func (s *Scorch) Open() error { + s.closeCh = make(chan struct{}) + s.introductions = make(chan *segmentIntroduction) + go s.mainLoop() + return nil +} + +func (s *Scorch) Close() error { + close(s.closeCh) + return nil +} + +func (s *Scorch) Update(doc *document.Document) error { + b := index.NewBatch() + b.Update(doc) + return s.Batch(b) +} + +func (s *Scorch) Delete(id string) error { + b := index.NewBatch() + b.Delete(id) + return s.Batch(b) +} + +// Batch applices a batch of changes to the index atomically +func (s *Scorch) Batch(batch *index.Batch) error { + + analysisStart := time.Now() + + resultChan := make(chan *index.AnalysisResult, len(batch.IndexOps)) + + var numUpdates uint64 + var numPlainTextBytes uint64 + var ids []string + for docID, doc := range batch.IndexOps { + if doc != nil { + // insert _id field + doc.AddField(document.NewTextFieldCustom("_id", nil, []byte(doc.ID), document.IndexField|document.StoreField, nil)) + numUpdates++ + numPlainTextBytes += doc.NumPlainTextBytes() + } + ids = append(ids, docID) + } + + // FIXME could sort ids list concurrent with analysis? 
+ + go func() { + for _, doc := range batch.IndexOps { + if doc != nil { + aw := index.NewAnalysisWork(s, doc, resultChan) + // put the work on the queue + s.analysisQueue.Queue(aw) + } + } + }() + + // wait for analysis result + analysisResults := make([]*index.AnalysisResult, int(numUpdates)) + // newRowsMap := make(map[string][]index.IndexRow) + var itemsDeQueued uint64 + for itemsDeQueued < numUpdates { + result := <-resultChan + //newRowsMap[result.DocID] = result.Rows + analysisResults[itemsDeQueued] = result + itemsDeQueued++ + } + close(resultChan) + + atomic.AddUint64(&s.stats.analysisTime, uint64(time.Since(analysisStart))) + + var newSegment segment.Segment + if len(analysisResults) > 0 { + newSegment = mem.NewFromAnalyzedDocs(analysisResults) + } else { + newSegment = mem.New() + } + s.prepareSegment(newSegment, ids, batch.InternalOps) + + return nil +} + +func (s *Scorch) prepareSegment(newSegment segment.Segment, ids []string, + internalOps map[string][]byte) error { + + // new introduction + introduction := &segmentIntroduction{ + id: atomic.AddUint64(&s.nextSegmentID, 1), + data: newSegment, + ids: ids, + obsoletes: make(map[uint64]*roaring.Bitmap), + internal: internalOps, + applied: make(chan struct{}), + } + + // get read lock, to optimistically prepare obsoleted info + s.rootLock.RLock() + for i := range s.root.segment { + delta := s.root.segment[i].segment.DocNumbers(ids) + introduction.obsoletes[s.root.segment[i].id] = delta + } + s.rootLock.RUnlock() + + s.introductions <- introduction + + // block until this segment is applied + <-introduction.applied + + return nil +} + +func (s *Scorch) SetInternal(key, val []byte) error { + b := index.NewBatch() + b.SetInternal(key, val) + return s.Batch(b) +} + +func (s *Scorch) DeleteInternal(key []byte) error { + b := index.NewBatch() + b.DeleteInternal(key) + return s.Batch(b) +} + +// Reader returns a low-level accessor on the index data. Close it to +// release associated resources. +func (s *Scorch) Reader() (index.IndexReader, error) { + s.rootLock.RLock() + defer s.rootLock.RUnlock() + return &Reader{ + root: s.root, + }, nil +} + +func (s *Scorch) Stats() json.Marshaler { + return s.stats +} +func (s *Scorch) StatsMap() map[string]interface{} { + return s.stats.statsMap() +} + +func (s *Scorch) Analyze(d *document.Document) *index.AnalysisResult { + rv := &index.AnalysisResult{ + Document: d, + Analyzed: make([]analysis.TokenFrequencies, len(d.Fields)+len(d.CompositeFields)), + Length: make([]int, len(d.Fields)+len(d.CompositeFields)), + } + + for i, field := range d.Fields { + if field.Options().IsIndexed() { + fieldLength, tokenFreqs := field.Analyze() + rv.Analyzed[i] = tokenFreqs + rv.Length[i] = fieldLength + + if len(d.CompositeFields) > 0 { + // see if any of the composite fields need this + for _, compositeField := range d.CompositeFields { + compositeField.Compose(field.Name(), fieldLength, tokenFreqs) + } + } + } + } + + return rv +} + +func (s *Scorch) Advanced() (store.KVStore, error) { + return nil, nil +} + +func init() { + registry.RegisterIndexType(Name, NewScorch) +} diff --git a/index/scorch/scorch_test.go b/index/scorch/scorch_test.go new file mode 100644 index 00000000..48e80c8b --- /dev/null +++ b/index/scorch/scorch_test.go @@ -0,0 +1,1109 @@ +// Copyright (c) 2014 Couchbase, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package scorch + +import ( + "log" + "reflect" + "regexp" + "strconv" + "sync" + "testing" + "time" + + "github.com/blevesearch/bleve/analysis" + regexpTokenizer "github.com/blevesearch/bleve/analysis/tokenizer/regexp" + "github.com/blevesearch/bleve/document" + "github.com/blevesearch/bleve/index" +) + +var testAnalyzer = &analysis.Analyzer{ + Tokenizer: regexpTokenizer.NewRegexpTokenizer(regexp.MustCompile(`\w+`)), +} + +func TestIndexInsert(t *testing.T) { + analysisQueue := index.NewAnalysisQueue(1) + idx, err := NewScorch(Name, nil, analysisQueue) + if err != nil { + t.Fatal(err) + } + err = idx.Open() + if err != nil { + t.Errorf("error opening index: %v", err) + } + defer func() { + err := idx.Close() + if err != nil { + t.Fatal(err) + } + }() + + var expectedCount uint64 + reader, err := idx.Reader() + if err != nil { + t.Fatal(err) + } + docCount, err := reader.DocCount() + if err != nil { + t.Error(err) + } + if docCount != expectedCount { + t.Errorf("Expected document count to be %d got %d", expectedCount, docCount) + } + err = reader.Close() + if err != nil { + t.Fatal(err) + } + + doc := document.NewDocument("1") + doc.AddField(document.NewTextField("name", []uint64{}, []byte("test"))) + err = idx.Update(doc) + if err != nil { + t.Errorf("Error updating index: %v", err) + } + expectedCount++ + + reader, err = idx.Reader() + if err != nil { + t.Fatal(err) + } + docCount, err = reader.DocCount() + if err != nil { + t.Error(err) + } + if docCount != expectedCount { + t.Errorf("Expected document count to be %d got %d", expectedCount, docCount) + } + err = reader.Close() + if err != nil { + t.Fatal(err) + } +} + +func TestIndexInsertThenDelete(t *testing.T) { + analysisQueue := index.NewAnalysisQueue(1) + idx, err := NewScorch(Name, nil, analysisQueue) + if err != nil { + t.Fatal(err) + } + err = idx.Open() + if err != nil { + t.Errorf("error opening index: %v", err) + } + defer func() { + err := idx.Close() + if err != nil { + t.Fatal(err) + } + }() + + var expectedCount uint64 + reader, err := idx.Reader() + if err != nil { + t.Fatal(err) + } + docCount, err := reader.DocCount() + if err != nil { + t.Error(err) + } + if docCount != expectedCount { + t.Errorf("Expected document count to be %d got %d", expectedCount, docCount) + } + err = reader.Close() + if err != nil { + t.Fatal(err) + } + + doc := document.NewDocument("1") + doc.AddField(document.NewTextField("name", []uint64{}, []byte("test"))) + err = idx.Update(doc) + if err != nil { + t.Errorf("Error updating index: %v", err) + } + expectedCount++ + + doc2 := document.NewDocument("2") + doc2.AddField(document.NewTextField("name", []uint64{}, []byte("test"))) + err = idx.Update(doc2) + if err != nil { + t.Errorf("Error updating index: %v", err) + } + expectedCount++ + + reader, err = idx.Reader() + if err != nil { + t.Fatal(err) + } + docCount, err = reader.DocCount() + if err != nil { + t.Error(err) + } + if docCount != expectedCount { + t.Errorf("Expected document count to be %d got %d", expectedCount, docCount) + } + err = reader.Close() + if err != nil { + t.Fatal(err) + } + + err = idx.Delete("1") + 
+	if err != nil {
+		t.Errorf("Error deleting entry from index: %v", err)
+	}
+	expectedCount--
+
+	reader, err = idx.Reader()
+	if err != nil {
+		t.Fatal(err)
+	}
+	docCount, err = reader.DocCount()
+	if err != nil {
+		t.Error(err)
+	}
+	if docCount != expectedCount {
+		t.Errorf("Expected document count to be %d got %d", expectedCount, docCount)
+	}
+	err = reader.Close()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	err = idx.Delete("2")
+	if err != nil {
+		t.Errorf("Error deleting entry from index: %v", err)
+	}
+	expectedCount--
+
+	reader, err = idx.Reader()
+	if err != nil {
+		t.Fatal(err)
+	}
+	docCount, err = reader.DocCount()
+	if err != nil {
+		t.Error(err)
+	}
+	if docCount != expectedCount {
+		t.Errorf("Expected document count to be %d got %d", expectedCount, docCount)
+	}
+	err = reader.Close()
+	if err != nil {
+		t.Fatal(err)
+	}
+}
+
+func TestIndexInsertThenUpdate(t *testing.T) {
+	analysisQueue := index.NewAnalysisQueue(1)
+	idx, err := NewScorch(Name, nil, analysisQueue)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	var expectedCount uint64
+	err = idx.Open()
+	if err != nil {
+		t.Errorf("error opening index: %v", err)
+	}
+	defer func() {
+		err := idx.Close()
+		if err != nil {
+			t.Fatal(err)
+		}
+	}()
+
+	doc := document.NewDocument("1")
+	doc.AddField(document.NewTextField("name", []uint64{}, []byte("test")))
+	err = idx.Update(doc)
+	if err != nil {
+		t.Errorf("Error updating index: %v", err)
+	}
+	expectedCount++
+
+	// this update should overwrite one term, and introduce one new one
+	doc = document.NewDocument("1")
+	doc.AddField(document.NewTextFieldWithAnalyzer("name", []uint64{}, []byte("test fail"), testAnalyzer))
+	err = idx.Update(doc)
+	if err != nil {
+		t.Errorf("Error updating index: %v", err)
+	}
+
+	// now do another update that should remove one of the terms
+	doc = document.NewDocument("1")
+	doc.AddField(document.NewTextField("name", []uint64{}, []byte("fail")))
+	err = idx.Update(doc)
+	if err != nil {
+		t.Errorf("Error updating index: %v", err)
+	}
+
+	reader, err := idx.Reader()
+	if err != nil {
+		t.Fatal(err)
+	}
+	docCount, err := reader.DocCount()
+	if err != nil {
+		t.Error(err)
+	}
+	if docCount != expectedCount {
+		t.Errorf("Expected document count to be %d got %d", expectedCount, docCount)
+	}
+	err = reader.Close()
+	if err != nil {
+		t.Fatal(err)
+	}
+}
+
+func TestIndexInsertMultiple(t *testing.T) {
+	analysisQueue := index.NewAnalysisQueue(1)
+	idx, err := NewScorch(Name, nil, analysisQueue)
+	if err != nil {
+		t.Fatal(err)
+	}
+	err = idx.Open()
+	if err != nil {
+		t.Errorf("error opening index: %v", err)
+	}
+
+	var expectedCount uint64
+
+	doc := document.NewDocument("1")
+	doc.AddField(document.NewTextField("name", []uint64{}, []byte("test")))
+	err = idx.Update(doc)
+	if err != nil {
+		t.Errorf("Error updating index: %v", err)
+	}
+	expectedCount++
+
+	doc = document.NewDocument("2")
+	doc.AddField(document.NewTextField("name", []uint64{}, []byte("test")))
+	err = idx.Update(doc)
+	if err != nil {
+		t.Errorf("Error updating index: %v", err)
+	}
+	expectedCount++
+
+	doc = document.NewDocument("3")
+	doc.AddField(document.NewTextField("name", []uint64{}, []byte("test")))
+	err = idx.Update(doc)
+	if err != nil {
+		t.Errorf("Error updating index: %v", err)
+	}
+	expectedCount++
+
+	reader, err := idx.Reader()
+	if err != nil {
+		t.Fatal(err)
+	}
+	docCount, err := reader.DocCount()
+	if err != nil {
+		t.Error(err)
+	}
+	if docCount != expectedCount {
+		t.Errorf("Expected document count to be %d got %d", expectedCount, docCount)
+	}
+	err = reader.Close()
+	if err != nil {
+		t.Fatal(err)
+	}
+}
+
+func TestIndexInsertWithStore(t *testing.T) {
+	analysisQueue := index.NewAnalysisQueue(1)
+	idx, err := NewScorch(Name, nil, analysisQueue)
+	if err != nil {
+		t.Fatal(err)
+	}
+	err = idx.Open()
+	if err != nil {
+		t.Errorf("error opening index: %v", err)
+	}
+	defer func() {
+		err := idx.Close()
+		if err != nil {
+			t.Fatal(err)
+		}
+	}()
+
+	var expectedCount uint64
+	reader, err := idx.Reader()
+	if err != nil {
+		t.Fatal(err)
+	}
+	docCount, err := reader.DocCount()
+	if err != nil {
+		t.Error(err)
+	}
+	if docCount != expectedCount {
+		t.Errorf("Expected document count to be %d got %d", expectedCount, docCount)
+	}
+	err = reader.Close()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	doc := document.NewDocument("1")
+	doc.AddField(document.NewTextFieldWithIndexingOptions("name", []uint64{}, []byte("test"), document.IndexField|document.StoreField))
+	err = idx.Update(doc)
+	if err != nil {
+		t.Errorf("Error updating index: %v", err)
+	}
+	expectedCount++
+
+	reader, err = idx.Reader()
+	if err != nil {
+		t.Fatal(err)
+	}
+	docCount, err = reader.DocCount()
+	if err != nil {
+		t.Error(err)
+	}
+	if docCount != expectedCount {
+		t.Errorf("Expected document count to be %d got %d", expectedCount, docCount)
+	}
+	err = reader.Close()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	indexReader, err := idx.Reader()
+	if err != nil {
+		t.Error(err)
+	}
+	defer func() {
+		err := indexReader.Close()
+		if err != nil {
+			t.Fatal(err)
+		}
+	}()
+
+	storedDoc, err := indexReader.Document("1")
+	if err != nil {
+		t.Error(err)
+	}
+
+	if len(storedDoc.Fields) != 2 {
+		t.Errorf("expected 2 stored fields, got %d", len(storedDoc.Fields))
+	}
+	for _, field := range storedDoc.Fields {
+		if field.Name() == "name" {
+			textField, ok := field.(*document.TextField)
+			if !ok {
+				t.Errorf("expected text field")
+			}
+			if string(textField.Value()) != "test" {
+				t.Errorf("expected field content 'test', got '%s'", string(textField.Value()))
+			}
+		} else if field.Name() == "_id" {
+			textField, ok := field.(*document.TextField)
+			if !ok {
+				t.Errorf("expected text field")
+			}
+			if string(textField.Value()) != "1" {
+				t.Errorf("expected field content '1', got '%s'", string(textField.Value()))
+			}
+		}
+	}
+}
+
+func TestIndexInternalCRUD(t *testing.T) {
+	analysisQueue := index.NewAnalysisQueue(1)
+	idx, err := NewScorch(Name, nil, analysisQueue)
+	if err != nil {
+		t.Fatal(err)
+	}
+	err = idx.Open()
+	if err != nil {
+		t.Errorf("error opening index: %v", err)
+	}
+	defer func() {
+		err := idx.Close()
+		if err != nil {
+			t.Fatal(err)
+		}
+	}()
+
+	indexReader, err := idx.Reader()
+	if err != nil {
+		t.Error(err)
+	}
+
+	// get something that doesn't exist yet
+	val, err := indexReader.GetInternal([]byte("key"))
+	if err != nil {
+		t.Error(err)
+	}
+	if val != nil {
+		t.Errorf("expected nil, got %s", val)
+	}
+
+	err = indexReader.Close()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// set
+	err = idx.SetInternal([]byte("key"), []byte("abc"))
+	if err != nil {
+		t.Error(err)
+	}
+
+	indexReader2, err := idx.Reader()
+	if err != nil {
+		t.Error(err)
+	}
+
+	// get
+	val, err = indexReader2.GetInternal([]byte("key"))
+	if err != nil {
+		t.Error(err)
+	}
+	if string(val) != "abc" {
+		t.Errorf("expected %s, got '%s'", "abc", val)
+	}
+
+	err = indexReader2.Close()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// delete
+	err = idx.DeleteInternal([]byte("key"))
+	if err != nil {
+		t.Error(err)
+	}
+
+	indexReader3, err := idx.Reader()
+	if err != nil {
+		t.Error(err)
+ } + + // get again + val, err = indexReader3.GetInternal([]byte("key")) + if err != nil { + t.Error(err) + } + if val != nil { + t.Errorf("expected nil, got %s", val) + } + + err = indexReader3.Close() + if err != nil { + t.Fatal(err) + } +} + +func TestIndexBatch(t *testing.T) { + analysisQueue := index.NewAnalysisQueue(1) + idx, err := NewScorch(Name, nil, analysisQueue) + if err != nil { + t.Fatal(err) + } + err = idx.Open() + if err != nil { + t.Errorf("error opening index: %v", err) + } + defer func() { + err := idx.Close() + if err != nil { + t.Fatal(err) + } + }() + + var expectedCount uint64 + + // first create 2 docs the old fashioned way + doc := document.NewDocument("1") + doc.AddField(document.NewTextField("name", []uint64{}, []byte("test"))) + err = idx.Update(doc) + if err != nil { + t.Errorf("Error updating index: %v", err) + } + expectedCount++ + + doc = document.NewDocument("2") + doc.AddField(document.NewTextField("name", []uint64{}, []byte("test2"))) + err = idx.Update(doc) + if err != nil { + t.Errorf("Error updating index: %v", err) + } + expectedCount++ + + // now create a batch which does 3 things + // insert new doc + // update existing doc + // delete existing doc + // net document count change 0 + + batch := index.NewBatch() + doc = document.NewDocument("3") + doc.AddField(document.NewTextField("name", []uint64{}, []byte("test3"))) + batch.Update(doc) + doc = document.NewDocument("2") + doc.AddField(document.NewTextField("name", []uint64{}, []byte("test2updated"))) + batch.Update(doc) + batch.Delete("1") + + err = idx.Batch(batch) + if err != nil { + t.Error(err) + } + + indexReader, err := idx.Reader() + if err != nil { + t.Error(err) + } + defer func() { + err := indexReader.Close() + if err != nil { + t.Fatal(err) + } + }() + + docCount, err := indexReader.DocCount() + if err != nil { + t.Fatal(err) + } + if docCount != expectedCount { + t.Errorf("Expected document count to be %d got %d", expectedCount, docCount) + } + + docIDReader, err := indexReader.DocIDReaderAll() + if err != nil { + t.Error(err) + } + var docIds []index.IndexInternalID + docID, err := docIDReader.Next() + for docID != nil && err == nil { + docIds = append(docIds, docID) + docID, err = docIDReader.Next() + } + if err != nil { + t.Error(err) + } + externalDocIds := map[string]struct{}{} + // convert back to external doc ids + for _, id := range docIds { + externalID, err := indexReader.ExternalID(id) + if err != nil { + t.Fatal(err) + } + externalDocIds[externalID] = struct{}{} + } + expectedDocIds := map[string]struct{}{ + "2": struct{}{}, + "3": struct{}{}, + } + if !reflect.DeepEqual(externalDocIds, expectedDocIds) { + t.Errorf("expected ids: %v, got ids: %v", expectedDocIds, externalDocIds) + } +} + +func TestIndexInsertUpdateDeleteWithMultipleTypesStored(t *testing.T) { + analysisQueue := index.NewAnalysisQueue(1) + idx, err := NewScorch(Name, nil, analysisQueue) + if err != nil { + t.Fatal(err) + } + err = idx.Open() + if err != nil { + t.Errorf("error opening index: %v", err) + } + defer func() { + err := idx.Close() + if err != nil { + t.Fatal(err) + } + }() + + var expectedCount uint64 + reader, err := idx.Reader() + if err != nil { + t.Fatal(err) + } + docCount, err := reader.DocCount() + if err != nil { + t.Error(err) + } + if docCount != expectedCount { + t.Errorf("Expected document count to be %d got %d", expectedCount, docCount) + } + err = reader.Close() + if err != nil { + t.Fatal(err) + } + + doc := document.NewDocument("1") + 
doc.AddField(document.NewTextFieldWithIndexingOptions("name", []uint64{}, []byte("test"), document.IndexField|document.StoreField)) + doc.AddField(document.NewNumericFieldWithIndexingOptions("age", []uint64{}, 35.99, document.IndexField|document.StoreField)) + df, err := document.NewDateTimeFieldWithIndexingOptions("unixEpoch", []uint64{}, time.Unix(0, 0), document.IndexField|document.StoreField) + if err != nil { + t.Error(err) + } + doc.AddField(df) + err = idx.Update(doc) + if err != nil { + t.Errorf("Error updating index: %v", err) + } + expectedCount++ + + reader, err = idx.Reader() + if err != nil { + t.Fatal(err) + } + docCount, err = reader.DocCount() + if err != nil { + t.Error(err) + } + if docCount != expectedCount { + t.Errorf("Expected document count to be %d got %d", expectedCount, docCount) + } + err = reader.Close() + if err != nil { + t.Fatal(err) + } + + indexReader, err := idx.Reader() + if err != nil { + t.Error(err) + } + + storedDoc, err := indexReader.Document("1") + if err != nil { + t.Error(err) + } + + err = indexReader.Close() + if err != nil { + t.Error(err) + } + + if len(storedDoc.Fields) != 4 { + t.Errorf("expected 4 stored fields, got %d", len(storedDoc.Fields)) + } + for _, field := range storedDoc.Fields { + + if field.Name() == "name" { + textField, ok := field.(*document.TextField) + if !ok { + t.Errorf("expected text field") + } + if string(textField.Value()) != "test" { + t.Errorf("expected field content 'test', got '%s'", string(textField.Value())) + } + } else if field.Name() == "age" { + numField, ok := field.(*document.NumericField) + if !ok { + t.Errorf("expected numeric field") + } + numFieldNumer, err := numField.Number() + if err != nil { + t.Error(err) + } else { + if numFieldNumer != 35.99 { + t.Errorf("expected numeric value 35.99, got %f", numFieldNumer) + } + } + } else if field.Name() == "unixEpoch" { + dateField, ok := field.(*document.DateTimeField) + if !ok { + t.Errorf("expected date field") + } + dateFieldDate, err := dateField.DateTime() + if err != nil { + t.Error(err) + } else { + if dateFieldDate != time.Unix(0, 0).UTC() { + t.Errorf("expected date value unix epoch, got %v", dateFieldDate) + } + } + } + + } + + // now update the document, but omit one of the fields + doc = document.NewDocument("1") + doc.AddField(document.NewTextFieldWithIndexingOptions("name", []uint64{}, []byte("testup"), document.IndexField|document.StoreField)) + doc.AddField(document.NewNumericFieldWithIndexingOptions("age", []uint64{}, 36.99, document.IndexField|document.StoreField)) + err = idx.Update(doc) + if err != nil { + t.Errorf("Error updating index: %v", err) + } + + indexReader2, err := idx.Reader() + if err != nil { + t.Error(err) + } + + // expected doc count shouldn't have changed + docCount, err = indexReader2.DocCount() + if err != nil { + t.Fatal(err) + } + if docCount != expectedCount { + t.Errorf("Expected document count to be %d got %d", expectedCount, docCount) + } + + // should only get the 2 updated fields back now (plus the _id field) + storedDoc, err = indexReader2.Document("1") + if err != nil { + t.Error(err) + } + + err = indexReader2.Close() + if err != nil { + t.Error(err) + } + + if len(storedDoc.Fields) != 3 { + t.Errorf("expected 3 stored fields, got %d", len(storedDoc.Fields)) + } + + for _, field := range storedDoc.Fields { + + if field.Name() == "name" { + textField, ok := field.(*document.TextField) + if !ok { + t.Errorf("expected text field") + } + if string(textField.Value()) != "testup" { + t.Errorf("expected field content 'testup', got '%s'", 
string(textField.Value())) + } + } else if field.Name() == "age" { + numField, ok := field.(*document.NumericField) + if !ok { + t.Errorf("expected numeric field") + } + numFieldNumer, err := numField.Number() + if err != nil { + t.Error(err) + } else { + if numFieldNumer != 36.99 { + t.Errorf("expected numeric value 36.99, got %f", numFieldNumer) + } + } + } + } + + // now delete the document + err = idx.Delete("1") + if err != nil { + t.Errorf("Error deleting document from index: %v", err) + } + expectedCount-- + + // expected doc count should now be one less + reader, err = idx.Reader() + if err != nil { + t.Fatal(err) + } + docCount, err = reader.DocCount() + if err != nil { + t.Error(err) + } + if docCount != expectedCount { + t.Errorf("Expected document count to be %d got %d", expectedCount, docCount) + } + err = reader.Close() + if err != nil { + t.Fatal(err) + } +} + +func TestIndexInsertFields(t *testing.T) { + analysisQueue := index.NewAnalysisQueue(1) + idx, err := NewScorch(Name, nil, analysisQueue) + if err != nil { + t.Fatal(err) + } + err = idx.Open() + if err != nil { + t.Errorf("error opening index: %v", err) + } + defer func() { + err := idx.Close() + if err != nil { + t.Fatal(err) + } + }() + + doc := document.NewDocument("1") + doc.AddField(document.NewTextFieldWithIndexingOptions("name", []uint64{}, []byte("test"), document.IndexField|document.StoreField)) + doc.AddField(document.NewNumericFieldWithIndexingOptions("age", []uint64{}, 35.99, document.IndexField|document.StoreField)) + dateField, err := document.NewDateTimeFieldWithIndexingOptions("unixEpoch", []uint64{}, time.Unix(0, 0), document.IndexField|document.StoreField) + if err != nil { + t.Error(err) + } + doc.AddField(dateField) + err = idx.Update(doc) + if err != nil { + t.Errorf("Error updating index: %v", err) + } + + indexReader, err := idx.Reader() + if err != nil { + t.Error(err) + } + defer func() { + err := indexReader.Close() + if err != nil { + t.Fatal(err) + } + }() + + fields, err := indexReader.Fields() + if err != nil { + t.Error(err) + } else { + fieldsMap := map[string]struct{}{} + for _, field := range fields { + fieldsMap[field] = struct{}{} + } + expectedFieldsMap := map[string]struct{}{ + "_id": struct{}{}, + "name": struct{}{}, + "age": struct{}{}, + "unixEpoch": struct{}{}, + } + if !reflect.DeepEqual(fieldsMap, expectedFieldsMap) { + t.Errorf("expected fields: %v, got %v", expectedFieldsMap, fieldsMap) + } + } + +} + +func TestIndexUpdateComposites(t *testing.T) { + analysisQueue := index.NewAnalysisQueue(1) + idx, err := NewScorch(Name, nil, analysisQueue) + if err != nil { + t.Fatal(err) + } + err = idx.Open() + if err != nil { + t.Errorf("error opening index: %v", err) + } + defer func() { + err := idx.Close() + if err != nil { + t.Fatal(err) + } + }() + + doc := document.NewDocument("1") + doc.AddField(document.NewTextFieldWithIndexingOptions("name", []uint64{}, []byte("test"), document.IndexField|document.StoreField)) + doc.AddField(document.NewTextFieldWithIndexingOptions("title", []uint64{}, []byte("mister"), document.IndexField|document.StoreField)) + doc.AddField(document.NewCompositeFieldWithIndexingOptions("_all", true, nil, nil, document.IndexField)) + err = idx.Update(doc) + if err != nil { + t.Errorf("Error updating index: %v", err) + } + + // now let's update it + doc = document.NewDocument("1") + doc.AddField(document.NewTextFieldWithIndexingOptions("name", []uint64{}, []byte("testupdated"), document.IndexField|document.StoreField)) + doc.AddField(document.NewTextFieldWithIndexingOptions("title", []uint64{}, []byte("misterupdated"), 
document.IndexField|document.StoreField)) + doc.AddField(document.NewCompositeFieldWithIndexingOptions("_all", true, nil, nil, document.IndexField)) + err = idx.Update(doc) + if err != nil { + t.Errorf("Error updating index: %v", err) + } + + indexReader, err := idx.Reader() + if err != nil { + t.Error(err) + } + defer func() { + err := indexReader.Close() + if err != nil { + t.Fatal(err) + } + }() + + // make sure new values are in index + storedDoc, err := indexReader.Document("1") + if err != nil { + t.Error(err) + } + if len(storedDoc.Fields) != 3 { + t.Errorf("expected 3 stored fields, got %d", len(storedDoc.Fields)) + } + for _, field := range storedDoc.Fields { + if field.Name() == "name" { + textField, ok := field.(*document.TextField) + if !ok { + t.Errorf("expected text field") + } + if string(textField.Value()) != "testupdated" { + t.Errorf("expected field content 'testupdated', got '%s'", string(textField.Value())) + } + } + } +} + +func TestIndexTermReaderCompositeFields(t *testing.T) { + analysisQueue := index.NewAnalysisQueue(1) + idx, err := NewScorch(Name, nil, analysisQueue) + if err != nil { + t.Fatal(err) + } + err = idx.Open() + if err != nil { + t.Errorf("error opening index: %v", err) + } + defer func() { + err := idx.Close() + if err != nil { + t.Fatal(err) + } + }() + + doc := document.NewDocument("1") + doc.AddField(document.NewTextFieldWithIndexingOptions("name", []uint64{}, []byte("test"), document.IndexField|document.StoreField|document.IncludeTermVectors)) + doc.AddField(document.NewTextFieldWithIndexingOptions("title", []uint64{}, []byte("mister"), document.IndexField|document.StoreField|document.IncludeTermVectors)) + doc.AddField(document.NewCompositeFieldWithIndexingOptions("_all", true, nil, nil, document.IndexField|document.IncludeTermVectors)) + err = idx.Update(doc) + if err != nil { + t.Errorf("Error updating index: %v", err) + } + + indexReader, err := idx.Reader() + if err != nil { + t.Error(err) + } + defer func() { + err := indexReader.Close() + if err != nil { + t.Fatal(err) + } + }() + + termFieldReader, err := indexReader.TermFieldReader([]byte("mister"), "_all", true, true, true) + if err != nil { + t.Error(err) + } + + tfd, err := termFieldReader.Next(nil) + for tfd != nil && err == nil { + externalID, err := indexReader.ExternalID(tfd.ID) + if err != nil { + t.Fatal(err) + } + if externalID != "1" { + t.Errorf("expected to find document id 1") + } + tfd, err = termFieldReader.Next(nil) + } + if err != nil { + t.Error(err) + } +} + +func TestConcurrentUpdate(t *testing.T) { + analysisQueue := index.NewAnalysisQueue(1) + idx, err := NewScorch(Name, nil, analysisQueue) + if err != nil { + t.Fatal(err) + } + err = idx.Open() + if err != nil { + t.Errorf("error opening index: %v", err) + } + defer func() { + err := idx.Close() + if err != nil { + t.Fatal(err) + } + }() + + // do some concurrent updates + var wg sync.WaitGroup + for i := 0; i < 10; i++ { + wg.Add(1) + go func(i int) { + doc := document.NewDocument("1") + doc.AddField(document.NewTextFieldWithIndexingOptions(strconv.Itoa(i), []uint64{}, []byte(strconv.Itoa(i)), document.StoreField)) + err := idx.Update(doc) + if err != nil { + t.Errorf("Error updating index: %v", err) + } + wg.Done() + }(i) + } + wg.Wait() + + // now load the document and see what we get + r, err := idx.Reader() + if err != nil { + log.Fatal(err) + } + + doc, err := r.Document("1") + if err != nil { + log.Fatal(err) + } + + if len(doc.Fields) > 2 { + t.Errorf("expected no more than 2 fields, found %d", len(doc.Fields)) + 
} +} + +func TestLargeField(t *testing.T) { + analysisQueue := index.NewAnalysisQueue(1) + idx, err := NewScorch(Name, nil, analysisQueue) + if err != nil { + t.Fatal(err) + } + err = idx.Open() + if err != nil { + t.Errorf("error opening index: %v", err) + } + defer func() { + err := idx.Close() + if err != nil { + t.Fatal(err) + } + }() + + var largeFieldValue []byte + for len(largeFieldValue) < 4096 { + largeFieldValue = append(largeFieldValue, bleveWikiArticle1K...) + } + + d := document.NewDocument("large") + f := document.NewTextFieldWithIndexingOptions("desc", nil, largeFieldValue, document.IndexField|document.StoreField) + d.AddField(f) + + err = idx.Update(d) + if err != nil { + t.Fatal(err) + } +} + +var bleveWikiArticle1K = []byte(`Boiling liquid expanding vapor explosion +From Wikipedia, the free encyclopedia +See also: Boiler explosion and Steam explosion + +Flames subsequent to a flammable liquid BLEVE from a tanker. BLEVEs do not necessarily involve fire. + +This article's tone or style may not reflect the encyclopedic tone used on Wikipedia. See Wikipedia's guide to writing better articles for suggestions. (July 2013) +A boiling liquid expanding vapor explosion (BLEVE, /ˈblɛviː/ blev-ee) is an explosion caused by the rupture of a vessel containing a pressurized liquid above its boiling point.[1] +Contents [hide] +1 Mechanism +1.1 Water example +1.2 BLEVEs without chemical reactions +2 Fires +3 Incidents +4 Safety measures +5 See also +6 References +7 External links +Mechanism[edit] + +This section needs additional citations for verification. Please help improve this article by adding citations to reliable sources. Unsourced material may be challenged and removed. (July 2013) +There are three characteristics of liquids which are relevant to the discussion of a BLEVE:`) diff --git a/index/scorch/segment/mem/build.go b/index/scorch/segment/mem/build.go new file mode 100644 index 00000000..ff6fb605 --- /dev/null +++ b/index/scorch/segment/mem/build.go @@ -0,0 +1,220 @@ +package mem + +import ( + "math" + "sort" + + "github.com/RoaringBitmap/roaring" + "github.com/blevesearch/bleve/analysis" + "github.com/blevesearch/bleve/document" + "github.com/blevesearch/bleve/index" +) + +// NewFromAnalyzedDocs places the analyzed document mutations into this segment +func NewFromAnalyzedDocs(results []*index.AnalysisResult) *Segment { + s := New() + + // ensure that the _id field gets fieldID 0 + s.getOrDefineField("_id", false) + + // walk each doc + for _, result := range results { + s.processDocument(result) + } + + // go back and sort the dictKeys + for _, dict := range s.dictKeys { + sort.Strings(dict) + } + + // professional debugging + // + // log.Printf("fields: %v\n", s.fields) + // log.Printf("fieldsInv: %v\n", s.fieldsInv) + // log.Printf("fieldsLoc: %v\n", s.fieldsLoc) + // log.Printf("dicts: %v\n", s.dicts) + // log.Printf("dict keys: %v\n", s.dictKeys) + // for i, posting := range s.postings { + // log.Printf("posting %d: %v\n", i, posting) + // } + // for i, freq := range s.freqs { + // log.Printf("freq %d: %v\n", i, freq) + // } + // for i, norm := range s.norms { + // log.Printf("norm %d: %v\n", i, norm) + // } + // for i, field := range s.locfields { + // log.Printf("field %d: %v\n", i, field) + // } + // for i, start := range s.locstarts { + // log.Printf("start %d: %v\n", i, start) + // } + // for i, end := range s.locends { + // log.Printf("end %d: %v\n", i, end) + // } + // for i, pos := range s.locpos { + // log.Printf("pos %d: %v\n", i, pos) + // } + // for i, apos := 
range s.locarraypos { + // log.Printf("apos %d: %v\n", i, apos) + // } + // log.Printf("stored: %v\n", s.stored) + // log.Printf("stored types: %v\n", s.storedTypes) + // log.Printf("stored pos: %v\n", s.storedPos) + + return s +} + +func (s *Segment) processDocument(result *index.AnalysisResult) { + // used to collate information across fields + docMap := map[uint16]analysis.TokenFrequencies{} + fieldLens := map[uint16]int{} + docNum := uint64(s.addDocument()) + + processField := func(field uint16, name string, l int, tf analysis.TokenFrequencies) { + fieldLens[field] += l + if existingFreqs, ok := docMap[field]; ok { + existingFreqs.MergeAll(name, tf) + } else { + docMap[field] = tf + } + } + + storeField := func(docNum uint64, field uint16, typ byte, val []byte, pos []uint64) { + s.stored[docNum][field] = append(s.stored[docNum][field], val) + s.storedTypes[docNum][field] = append(s.storedTypes[docNum][field], typ) + s.storedPos[docNum][field] = append(s.storedPos[docNum][field], pos) + } + + // walk each composite field + for _, field := range result.Document.CompositeFields { + fieldID := uint16(s.getOrDefineField(field.Name(), false)) + l, tf := field.Analyze() + processField(fieldID, field.Name(), l, tf) + } + + // walk each field + for i, field := range result.Document.Fields { + fieldID := uint16(s.getOrDefineField(field.Name(), field.Options().IncludeTermVectors())) + l := result.Length[i] + tf := result.Analyzed[i] + processField(fieldID, field.Name(), l, tf) + if field.Options().IsStored() { + storeField(docNum, fieldID, encodeFieldType(field), field.Value(), field.ArrayPositions()) + } + } + + // now that it's been rolled up into docMap, walk that + for fieldID, tokenFrequencies := range docMap { + for term, tokenFreq := range tokenFrequencies { + fieldTermPostings := s.dicts[fieldID][term] + + // FIXME this if/else block has duplicate code that has resulted in + // bugs fixed/missed more than once, need to refactor + if fieldTermPostings == 0 { + // need to build new posting + bs := roaring.New() + bs.AddInt(int(docNum)) + + newPostingID := uint64(len(s.postings) + 1) + // add this new bitset to the postings slice + s.postings = append(s.postings, bs) + // add this to the details slice + s.freqs = append(s.freqs, []uint64{uint64(tokenFreq.Frequency())}) + s.norms = append(s.norms, []float32{float32(1.0 / math.Sqrt(float64(fieldLens[fieldID])))}) + // add to locations + var locfields []uint16 + var locstarts []uint64 + var locends []uint64 + var locpos []uint64 + var locarraypos [][]uint64 + for _, loc := range tokenFreq.Locations { + var locf = fieldID + if loc.Field != "" { + locf = uint16(s.getOrDefineField(loc.Field, false)) + } + locfields = append(locfields, locf) + locstarts = append(locstarts, uint64(loc.Start)) + locends = append(locends, uint64(loc.End)) + locpos = append(locpos, uint64(loc.Position)) + if len(loc.ArrayPositions) > 0 { + locarraypos = append(locarraypos, loc.ArrayPositions) + } else { + locarraypos = append(locarraypos, nil) + } + } + s.locfields = append(s.locfields, locfields) + s.locstarts = append(s.locstarts, locstarts) + s.locends = append(s.locends, locends) + s.locpos = append(s.locpos, locpos) + s.locarraypos = append(s.locarraypos, locarraypos) + // record it + s.dicts[fieldID][term] = newPostingID + // this term was new for this field, add it to dictKeys + s.dictKeys[fieldID] = append(s.dictKeys[fieldID], term) + } else { + // posting already started for this field/term + // the actual slice offset is the id - 1, because 0 is the map's + // zero value (meaning no posting yet) + bs := 
s.postings[fieldTermPostings-1] + bs.AddInt(int(docNum)) + s.freqs[fieldTermPostings-1] = append(s.freqs[fieldTermPostings-1], uint64(tokenFreq.Frequency())) + s.norms[fieldTermPostings-1] = append(s.norms[fieldTermPostings-1], float32(1.0/math.Sqrt(float64(fieldLens[fieldID])))) + for _, loc := range tokenFreq.Locations { + var locf = fieldID + if loc.Field != "" { + locf = uint16(s.getOrDefineField(loc.Field, false)) + } + s.locfields[fieldTermPostings-1] = append(s.locfields[fieldTermPostings-1], locf) + s.locstarts[fieldTermPostings-1] = append(s.locstarts[fieldTermPostings-1], uint64(loc.Start)) + s.locends[fieldTermPostings-1] = append(s.locends[fieldTermPostings-1], uint64(loc.End)) + s.locpos[fieldTermPostings-1] = append(s.locpos[fieldTermPostings-1], uint64(loc.Position)) + if len(loc.ArrayPositions) > 0 { + s.locarraypos[fieldTermPostings-1] = append(s.locarraypos[fieldTermPostings-1], loc.ArrayPositions) + } else { + s.locarraypos[fieldTermPostings-1] = append(s.locarraypos[fieldTermPostings-1], nil) + } + } + } + } + } +} + +func (s *Segment) getOrDefineField(name string, hasLoc bool) int { + fieldID, ok := s.fields[name] + if !ok { + fieldID = uint16(len(s.fieldsInv) + 1) + s.fields[name] = fieldID + s.fieldsInv = append(s.fieldsInv, name) + s.fieldsLoc = append(s.fieldsLoc, hasLoc) + s.dicts = append(s.dicts, make(map[string]uint64)) + s.dictKeys = append(s.dictKeys, make([]string, 0)) + } + return int(fieldID - 1) +} + +func (s *Segment) addDocument() int { + docNum := len(s.stored) + s.stored = append(s.stored, map[uint16][][]byte{}) + s.storedTypes = append(s.storedTypes, map[uint16][]byte{}) + s.storedPos = append(s.storedPos, map[uint16][][]uint64{}) + return docNum +} + +func encodeFieldType(f document.Field) byte { + fieldType := byte('x') + switch f.(type) { + case *document.TextField: + fieldType = 't' + case *document.NumericField: + fieldType = 'n' + case *document.DateTimeField: + fieldType = 'd' + case *document.BooleanField: + fieldType = 'b' + case *document.GeoPointField: + fieldType = 'g' + case *document.CompositeField: + fieldType = 'c' + } + return fieldType +} diff --git a/index/scorch/segment/mem/dict.go b/index/scorch/segment/mem/dict.go new file mode 100644 index 00000000..a7d609f6 --- /dev/null +++ b/index/scorch/segment/mem/dict.go @@ -0,0 +1,87 @@ +package mem + +import ( + "sort" + "strings" + + "github.com/RoaringBitmap/roaring" + "github.com/blevesearch/bleve/index" + "github.com/blevesearch/bleve/index/scorch/segment" +) + +// Dictionary is the in-memory representation of the term dictionary +type Dictionary struct { + segment *Segment + field string + fieldID uint16 +} + +// PostingsList returns the postings list for the specified term +func (d *Dictionary) PostingsList(term string, except *roaring.Bitmap) segment.PostingsList { + return &PostingsList{ + dictionary: d, + term: term, + postingsID: d.segment.dicts[d.fieldID][term], + except: except, + } +} + +// Iterator returns an iterator for this dictionary +func (d *Dictionary) Iterator() segment.DictionaryIterator { + return &DictionaryIterator{ + d: d, + } +} + +// PrefixIterator returns an iterator which only visits terms having +// the specified prefix +func (d *Dictionary) PrefixIterator(prefix string) segment.DictionaryIterator { + offset := sort.SearchStrings(d.segment.dictKeys[d.fieldID], prefix) + return &DictionaryIterator{ + d: d, + prefix: prefix, + offset: offset, + } +} + +// RangeIterator returns an iterator which only visits terms between the +// start and end terms. 
NOTE: bleve.index API specifies the end is inclusive. +func (d *Dictionary) RangeIterator(start, end string) segment.DictionaryIterator { + offset := sort.SearchStrings(d.segment.dictKeys[d.fieldID], start) + return &DictionaryIterator{ + d: d, + offset: offset, + end: end, + } +} + +// DictionaryIterator is an iterator for term dictionary +type DictionaryIterator struct { + d *Dictionary + prefix string + end string + offset int +} + +// Next returns the next entry in the dictionary +func (d *DictionaryIterator) Next() (*index.DictEntry, error) { + if d.offset > len(d.d.segment.dictKeys[d.d.fieldID])-1 { + return nil, nil + } + next := d.d.segment.dictKeys[d.d.fieldID][d.offset] + // check prefix + if d.prefix != "" && !strings.HasPrefix(next, d.prefix) { + return nil, nil + } + // check end (bleve.index API demands inclusive end) + if d.end != "" && next > d.end { + return nil, nil + } + + d.offset++ + postingID := d.d.segment.dicts[d.d.fieldID][next] + return &index.DictEntry{ + Term: next, + Count: d.d.segment.postings[postingID-1].GetCardinality(), + }, nil +} diff --git a/index/scorch/segment/mem/posting.go b/index/scorch/segment/mem/posting.go new file mode 100644 index 00000000..ac8b2fb7 --- /dev/null +++ b/index/scorch/segment/mem/posting.go @@ -0,0 +1,160 @@ +package mem + +import ( + "github.com/RoaringBitmap/roaring" + "github.com/blevesearch/bleve/index/scorch/segment" +) + +// PostingsList is an in-memory representation of a postings list +type PostingsList struct { + dictionary *Dictionary + term string + postingsID uint64 + except *roaring.Bitmap +} + +// Count returns the number of items on this postings list +func (p *PostingsList) Count() uint64 { + var rv uint64 + if p.postingsID > 0 { + rv = p.dictionary.segment.postings[p.postingsID-1].GetCardinality() + if p.except != nil { + except := p.except.GetCardinality() + if except > rv { + // avoid underflow + except = rv + } + rv -= except + } + } + return rv +} + +// Iterator returns an iterator for this postings list +func (p *PostingsList) Iterator() segment.PostingsIterator { + rv := &PostingsIterator{ + postings: p, + } + if p.postingsID > 0 { + allbits := p.dictionary.segment.postings[p.postingsID-1] + rv.all = allbits.Iterator() + if p.except != nil { + allExcept := allbits.Clone() + allExcept.AndNot(p.except) + rv.actual = allExcept.Iterator() + } else { + rv.actual = allbits.Iterator() + } + } + + return rv +} + +// PostingsIterator provides a way to iterate through the postings list +type PostingsIterator struct { + postings *PostingsList + all roaring.IntIterable + offset int + locoffset int + actual roaring.IntIterable +} + +// Next returns the next posting on the postings list, or nil at the end +func (i *PostingsIterator) Next() segment.Posting { + if i.actual == nil || !i.actual.HasNext() { + return nil + } + n := i.actual.Next() + allN := i.all.Next() + + // n is the next actual hit (with exclusions applied) + // allN is the next hit in the full postings list + // if they don't match, adjust the offsets to account for the hit we're + // skipping over (each hit consumes one freq entry, whose value is how + // many location entries it used), advance the all iterator, and check again + for allN != n { + i.locoffset += int(i.postings.dictionary.segment.freqs[i.postings.postingsID-1][i.offset]) + i.offset++ + allN = i.all.Next() + } + rv := &Posting{ + iterator: i, + docNum: uint64(n), + offset: i.offset, + locoffset: i.locoffset, + } + + i.locoffset += int(i.postings.dictionary.segment.freqs[i.postings.postingsID-1][i.offset]) + i.offset++ + return rv +} + +// Posting is a single entry in a postings list +type Posting 
struct { + iterator *PostingsIterator + docNum uint64 + offset int + locoffset int +} + +// Number returns the document number of this posting in this segment +func (p *Posting) Number() uint64 { + return p.docNum +} + +// Frequency returns the frequency of occurrence of this term in this doc/field +func (p *Posting) Frequency() uint64 { + return p.iterator.postings.dictionary.segment.freqs[p.iterator.postings.postingsID-1][p.offset] +} + +// Norm returns the normalization factor for this posting +func (p *Posting) Norm() float64 { + return float64(p.iterator.postings.dictionary.segment.norms[p.iterator.postings.postingsID-1][p.offset]) +} + +// Locations returns the location information for each occurrence +func (p *Posting) Locations() []segment.Location { + if !p.iterator.postings.dictionary.segment.fieldsLoc[p.iterator.postings.dictionary.fieldID] { + return nil + } + freq := int(p.Frequency()) + rv := make([]segment.Location, freq) + for i := 0; i < freq; i++ { + rv[i] = &Location{ + p: p, + offset: p.locoffset + i, + } + } + return rv +} + +// Location represents the location of a single occurrence +type Location struct { + p *Posting + offset int +} + +// Field returns the name of the field (useful in composite fields to know +// which original field the value came from) +func (l *Location) Field() string { + return l.p.iterator.postings.dictionary.segment.fieldsInv[l.p.iterator.postings.dictionary.segment.locfields[l.p.iterator.postings.postingsID-1][l.offset]] +} + +// Start returns the start byte offset of this occurrence +func (l *Location) Start() uint64 { + return l.p.iterator.postings.dictionary.segment.locstarts[l.p.iterator.postings.postingsID-1][l.offset] +} + +// End returns the end byte offset of this occurrence +func (l *Location) End() uint64 { + return l.p.iterator.postings.dictionary.segment.locends[l.p.iterator.postings.postingsID-1][l.offset] +} + +// Pos returns the 1-based phrase position of this occurrence +func (l *Location) Pos() uint64 { + return l.p.iterator.postings.dictionary.segment.locpos[l.p.iterator.postings.postingsID-1][l.offset] +} + +// ArrayPositions returns the array position vector associated with this occurrence +func (l *Location) ArrayPositions() []uint64 { + return l.p.iterator.postings.dictionary.segment.locarraypos[l.p.iterator.postings.postingsID-1][l.offset] +} diff --git a/index/scorch/segment/mem/segment.go b/index/scorch/segment/mem/segment.go new file mode 100644 index 00000000..fdec1b2e --- /dev/null +++ b/index/scorch/segment/mem/segment.go @@ -0,0 +1,132 @@ +package mem + +import ( + "github.com/RoaringBitmap/roaring" + "github.com/blevesearch/bleve/index/scorch/segment" +) + +// KNOWN ISSUES +// - LIMITATION - we decide whether or not to store term vectors for a field +// at the segment level, based on the first definition of a +// field we see. in normal bleve usage this is fine, all +// instances of a field definition will be the same. however, +// advanced users may violate this and provide unique field +// definitions with each document. this segment does not +// support this usage. 
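+// +// NOTE: the ids recorded in the fields and dicts maps below are 1-based; +// 0 is reserved to mean "not present". A (hypothetical) lookup of the +// postings bitmap for a term therefore subtracts 1 before indexing: +// +// if postingID := s.dicts[fieldID][term]; postingID > 0 { +// bm := s.postings[postingID-1] // 1-based id -> slice index +// }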
+ +// TODO +// - need better testing of multiple docs, iterating freqs, locations, and +// verifying the correct results are returned +// - need tests for term dictionary iteration + +// Segment is an in-memory implementation of scorch.Segment +type Segment struct { + + // fields name -> id+1 + fields map[string]uint16 + // fields id -> name + fieldsInv []string + // field id -> has location info + fieldsLoc []bool + + // term dictionary + // field id -> term -> posting id + 1 + dicts []map[string]uint64 + + // term dictionary keys + // field id -> []dictionary keys + dictKeys [][]string + + // postings list + // postings list id -> postings bitmap + postings []*roaring.Bitmap + + // term frequencies + // postings list id -> freqs (one for each hit in bitmap) + freqs [][]uint64 + + // field norms + // postings list id -> norms (one for each hit in bitmap) + norms [][]float32 + + // field/start/end/pos/locarraypos + // postings list id -> start/end/pos/locarraypos (one for each freq) + locfields [][]uint16 + locstarts [][]uint64 + locends [][]uint64 + locpos [][]uint64 + locarraypos [][][]uint64 + + // stored field values + // docNum -> field id -> slice of values (each value []byte) + stored []map[uint16][][]byte + + // stored field types + // docNum -> field id -> slice of types (each type byte) + storedTypes []map[uint16][]byte + + // stored field array positions + // docNum -> field id -> slice of array positions (each is []uint64) + storedPos []map[uint16][][]uint64 +} + +// New builds a new empty Segment +func New() *Segment { + return &Segment{ + fields: map[string]uint16{}, + } +} + +// Fields returns the field names used in this segment +func (s *Segment) Fields() []string { + return s.fieldsInv +} + +// VisitDocument invokes the DocumentFieldValueVisitor for each stored field +// for the specified doc number +func (s *Segment) VisitDocument(num uint64, visitor segment.DocumentFieldValueVisitor) error { + // ensure document number exists + if int(num) > len(s.stored)-1 { + return nil + } + docFields := s.stored[int(num)] + for field, values := range docFields { + for i, value := range values { + keepGoing := visitor(s.fieldsInv[field], s.storedTypes[int(num)][field][i], value, s.storedPos[int(num)][field][i]) + if !keepGoing { + return nil + } + } + } + return nil +} + +// Dictionary returns the term dictionary for the specified field +func (s *Segment) Dictionary(field string) segment.TermDictionary { + return &Dictionary{ + segment: s, + field: field, + fieldID: uint16(s.getOrDefineField(field, false)), + } +} + +// Count returns the number of documents in this segment +// (this has no notion of deleted docs) +func (s *Segment) Count() uint64 { + return uint64(len(s.stored)) +} + +// DocNumbers returns a bitset corresponding to the doc numbers of all the +// provided _id strings +func (s *Segment) DocNumbers(ids []string) *roaring.Bitmap { + + idDictionary := s.dicts[s.getOrDefineField("_id", false)] + rv := roaring.New() + for _, id := range ids { + postingID := idDictionary[id] + if postingID > 0 { + rv.Or(s.postings[postingID-1]) + } + } + return rv +} diff --git a/index/scorch/segment/mem/segment_test.go b/index/scorch/segment/mem/segment_test.go new file mode 100644 index 00000000..bc840e14 --- /dev/null +++ b/index/scorch/segment/mem/segment_test.go @@ -0,0 +1,521 @@ +package mem + +import ( + "math" + "testing" + + "github.com/blevesearch/bleve/analysis" + "github.com/blevesearch/bleve/document" + "github.com/blevesearch/bleve/index" +) + +func TestEmpty(t *testing.T) { + + 
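// an empty segment should report zero documents and still hand back usable (empty) dictionaries, postings lists, and iterators +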
emptySegment := New() + + if emptySegment.Count() != 0 { + t.Errorf("expected count 0, got %d", emptySegment.Count()) + } + + dict := emptySegment.Dictionary("name") + if dict == nil { + t.Fatal("got nil dict, expected non-nil") + } + + postingsList := dict.PostingsList("marty", nil) + if postingsList == nil { + t.Fatal("got nil postings list, expected non-nil") + } + + postingsItr := postingsList.Iterator() + if postingsItr == nil { + t.Fatal("got nil iterator, expected non-nil") + } + + count := 0 + nextPosting := postingsItr.Next() + for nextPosting != nil { + count++ + nextPosting = postingsItr.Next() + } + + if count != 0 { + t.Errorf("expected count to be 0, got %d", count) + } + + // now try and visit a document + emptySegment.VisitDocument(0, func(field string, typ byte, value []byte, pos []uint64) bool { + t.Errorf("document visitor called, not expected") + return true + }) +} + +func TestSingle(t *testing.T) { + + doc := &document.Document{ + ID: "a", + Fields: []document.Field{ + document.NewTextFieldCustom("_id", nil, []byte("a"), document.IndexField|document.StoreField, nil), + document.NewTextFieldCustom("name", nil, []byte("wow"), document.IndexField|document.StoreField|document.IncludeTermVectors, nil), + document.NewTextFieldCustom("desc", nil, []byte("some thing"), document.IndexField|document.StoreField|document.IncludeTermVectors, nil), + document.NewTextFieldCustom("tag", []uint64{0}, []byte("cold"), document.IndexField|document.StoreField|document.IncludeTermVectors, nil), + document.NewTextFieldCustom("tag", []uint64{1}, []byte("dark"), document.IndexField|document.StoreField|document.IncludeTermVectors, nil), + }, + CompositeFields: []*document.CompositeField{ + document.NewCompositeField("_all", true, nil, nil), + }, + } + + // forge analyzed docs + results := []*index.AnalysisResult{ + &index.AnalysisResult{ + Document: doc, + Analyzed: []analysis.TokenFrequencies{ + analysis.TokenFrequency(analysis.TokenStream{ + &analysis.Token{ + Start: 0, + End: 1, + Position: 1, + Term: []byte("a"), + }, + }, nil, false), + analysis.TokenFrequency(analysis.TokenStream{ + &analysis.Token{ + Start: 0, + End: 3, + Position: 1, + Term: []byte("wow"), + }, + }, nil, true), + analysis.TokenFrequency(analysis.TokenStream{ + &analysis.Token{ + Start: 0, + End: 4, + Position: 1, + Term: []byte("some"), + }, + &analysis.Token{ + Start: 5, + End: 10, + Position: 2, + Term: []byte("thing"), + }, + }, nil, true), + analysis.TokenFrequency(analysis.TokenStream{ + &analysis.Token{ + Start: 0, + End: 4, + Position: 1, + Term: []byte("cold"), + }, + }, nil, true), + analysis.TokenFrequency(analysis.TokenStream{ + &analysis.Token{ + Start: 0, + End: 4, + Position: 1, + Term: []byte("dark"), + }, + }, nil, true), + }, + Length: []int{ + 1, + 1, + 2, + 1, + 1, + }, + }, + } + + // fix up composite fields + for _, ar := range results { + for i, f := range ar.Document.Fields { + for _, cf := range ar.Document.CompositeFields { + cf.Compose(f.Name(), ar.Length[i], ar.Analyzed[i]) + } + } + } + + segment := NewFromAnalyzedDocs(results) + if segment == nil { + t.Fatalf("segment nil, not expected") + } + + if segment.Count() != 1 { + t.Errorf("expected count 1, got %d", segment.Count()) + } + + // check the _id field + dict := segment.Dictionary("_id") + if dict == nil { + t.Fatal("got nil dict, expected non-nil") + } + + postingsList := dict.PostingsList("a", nil) + if postingsList == nil { + t.Fatal("got nil postings list, expected non-nil") + } + + postingsItr := postingsList.Iterator() + if 
postingsItr == nil { + t.Fatal("got nil iterator, expected non-nil") + } + + count := 0 + nextPosting := postingsItr.Next() + for nextPosting != nil { + count++ + if nextPosting.Frequency() != 1 { + t.Errorf("expected frequency 1, got %d", nextPosting.Frequency()) + } + if nextPosting.Number() != 0 { + t.Errorf("expected doc number 0, got %d", nextPosting.Number()) + } + if nextPosting.Norm() != 1.0 { + t.Errorf("expected norm 1.0, got %f", nextPosting.Norm()) + } + + nextPosting = postingsItr.Next() + } + + if count != 1 { + t.Errorf("expected count to be 1, got %d", count) + } + + // check the name field + dict = segment.Dictionary("name") + if dict == nil { + t.Fatal("got nil dict, expected non-nil") + } + + postingsList = dict.PostingsList("wow", nil) + if postingsList == nil { + t.Fatal("got nil postings list, expected non-nil") + } + + postingsItr = postingsList.Iterator() + if postingsItr == nil { + t.Fatal("got nil iterator, expected non-nil") + } + + count = 0 + nextPosting = postingsItr.Next() + for nextPosting != nil { + count++ + if nextPosting.Frequency() != 1 { + t.Errorf("expected frequency 1, got %d", nextPosting.Frequency()) + } + if nextPosting.Number() != 0 { + t.Errorf("expected doc number 0, got %d", nextPosting.Number()) + } + if nextPosting.Norm() != 1.0 { + t.Errorf("expected norm 1.0, got %f", nextPosting.Norm()) + } + for _, loc := range nextPosting.Locations() { + if loc.Start() != 0 { + t.Errorf("expected loc start to be 0, got %d", loc.Start()) + } + if loc.End() != 3 { + t.Errorf("expected loc end to be 3, got %d", loc.End()) + } + if loc.Pos() != 1 { + t.Errorf("expected loc pos to be 1, got %d", loc.Pos()) + } + if loc.ArrayPositions() != nil { + t.Errorf("expect loc array pos to be nil, got %v", loc.ArrayPositions()) + } + } + + nextPosting = postingsItr.Next() + } + + if count != 1 { + t.Errorf("expected count to be 1, got %d", count) + } + + // check the _all field (composite) + dict = segment.Dictionary("_all") + if dict == nil { + t.Fatal("got nil dict, expected non-nil") + } + + postingsList = dict.PostingsList("wow", nil) + if postingsList == nil { + t.Fatal("got nil postings list, expected non-nil") + } + + postingsItr = postingsList.Iterator() + if postingsItr == nil { + t.Fatal("got nil iterator, expected non-nil") + } + + count = 0 + nextPosting = postingsItr.Next() + for nextPosting != nil { + count++ + if nextPosting.Frequency() != 1 { + t.Errorf("expected frequency 1, got %d", nextPosting.Frequency()) + } + if nextPosting.Number() != 0 { + t.Errorf("expected doc number 0, got %d", nextPosting.Number()) + } + expectedNorm := float32(1.0 / math.Sqrt(float64(6))) + if nextPosting.Norm() != float64(expectedNorm) { + t.Errorf("expected norm %f, got %f", expectedNorm, nextPosting.Norm()) + } + for _, loc := range nextPosting.Locations() { + if loc.Start() != 0 { + t.Errorf("expected loc start to be 0, got %d", loc.Start()) + } + if loc.End() != 3 { + t.Errorf("expected loc end to be 3, got %d", loc.End()) + } + if loc.Pos() != 1 { + t.Errorf("expected loc pos to be 1, got %d", loc.Pos()) + } + if loc.ArrayPositions() != nil { + t.Errorf("expect loc array pos to be nil, got %v", loc.ArrayPositions()) + } + } + + nextPosting = postingsItr.Next() + } + + if count != 1 { + t.Errorf("expected count to be 1, got %d", count) + } + + // now try and visit a document + var fieldValuesSeen int + segment.VisitDocument(0, func(field string, typ byte, value []byte, pos []uint64) bool { + fieldValuesSeen++ + return true + }) + if fieldValuesSeen != 5 { + 
t.Errorf("expected 5 field values, got %d", fieldValuesSeen) + } + +} + +func TestMultiple(t *testing.T) { + + doc := &document.Document{ + ID: "a", + Fields: []document.Field{ + document.NewTextFieldCustom("_id", nil, []byte("a"), document.IndexField|document.StoreField, nil), + document.NewTextFieldCustom("name", nil, []byte("wow"), document.IndexField|document.StoreField|document.IncludeTermVectors, nil), + document.NewTextFieldCustom("desc", nil, []byte("some thing"), document.IndexField|document.StoreField|document.IncludeTermVectors, nil), + document.NewTextFieldCustom("tag", []uint64{0}, []byte("cold"), document.IndexField|document.StoreField|document.IncludeTermVectors, nil), + document.NewTextFieldCustom("tag", []uint64{1}, []byte("dark"), document.IndexField|document.StoreField|document.IncludeTermVectors, nil), + }, + CompositeFields: []*document.CompositeField{ + document.NewCompositeField("_all", true, nil, nil), + }, + } + + doc2 := &document.Document{ + ID: "b", + Fields: []document.Field{ + document.NewTextFieldCustom("_id", nil, []byte("b"), document.IndexField|document.StoreField, nil), + document.NewTextFieldCustom("name", nil, []byte("who"), document.IndexField|document.StoreField|document.IncludeTermVectors, nil), + document.NewTextFieldCustom("desc", nil, []byte("some thing"), document.IndexField|document.StoreField|document.IncludeTermVectors, nil), + document.NewTextFieldCustom("tag", []uint64{0}, []byte("cold"), document.IndexField|document.StoreField|document.IncludeTermVectors, nil), + document.NewTextFieldCustom("tag", []uint64{1}, []byte("dark"), document.IndexField|document.StoreField|document.IncludeTermVectors, nil), + }, + CompositeFields: []*document.CompositeField{ + document.NewCompositeField("_all", true, nil, nil), + }, + } + + // forge analyzed docs + results := []*index.AnalysisResult{ + &index.AnalysisResult{ + Document: doc, + Analyzed: []analysis.TokenFrequencies{ + analysis.TokenFrequency(analysis.TokenStream{ + &analysis.Token{ + Start: 0, + End: 1, + Position: 1, + Term: []byte("a"), + }, + }, nil, false), + analysis.TokenFrequency(analysis.TokenStream{ + &analysis.Token{ + Start: 0, + End: 3, + Position: 1, + Term: []byte("wow"), + }, + }, nil, true), + analysis.TokenFrequency(analysis.TokenStream{ + &analysis.Token{ + Start: 0, + End: 4, + Position: 1, + Term: []byte("some"), + }, + &analysis.Token{ + Start: 5, + End: 10, + Position: 2, + Term: []byte("thing"), + }, + }, nil, true), + analysis.TokenFrequency(analysis.TokenStream{ + &analysis.Token{ + Start: 0, + End: 4, + Position: 1, + Term: []byte("cold"), + }, + }, nil, true), + analysis.TokenFrequency(analysis.TokenStream{ + &analysis.Token{ + Start: 0, + End: 4, + Position: 1, + Term: []byte("dark"), + }, + }, nil, true), + }, + Length: []int{ + 1, + 1, + 2, + 1, + 1, + }, + }, + &index.AnalysisResult{ + Document: doc2, + Analyzed: []analysis.TokenFrequencies{ + analysis.TokenFrequency(analysis.TokenStream{ + &analysis.Token{ + Start: 0, + End: 1, + Position: 1, + Term: []byte("b"), + }, + }, nil, false), + analysis.TokenFrequency(analysis.TokenStream{ + &analysis.Token{ + Start: 0, + End: 3, + Position: 1, + Term: []byte("who"), + }, + }, nil, true), + analysis.TokenFrequency(analysis.TokenStream{ + &analysis.Token{ + Start: 0, + End: 4, + Position: 1, + Term: []byte("some"), + }, + &analysis.Token{ + Start: 5, + End: 10, + Position: 2, + Term: []byte("thing"), + }, + }, nil, true), + analysis.TokenFrequency(analysis.TokenStream{ + &analysis.Token{ + Start: 0, + End: 4, + Position: 1, + 
Term: []byte("cold"), + }, + }, nil, true), + analysis.TokenFrequency(analysis.TokenStream{ + &analysis.Token{ + Start: 0, + End: 4, + Position: 1, + Term: []byte("dark"), + }, + }, nil, true), + }, + Length: []int{ + 1, + 1, + 2, + 1, + 1, + }, + }, + } + + // fix up composite fields + for _, ar := range results { + for i, f := range ar.Document.Fields { + for _, cf := range ar.Document.CompositeFields { + cf.Compose(f.Name(), ar.Length[i], ar.Analyzed[i]) + } + } + } + + segment := NewFromAnalyzedDocs(results) + if segment == nil { + t.Fatalf("segment nil, not expected") + } + + if segment.Count() != 2 { + t.Errorf("expected count 2, got %d", segment.Count()) + } + + // check the desc field + dict := segment.Dictionary("desc") + if dict == nil { + t.Fatal("got nil dict, expected non-nil") + } + + postingsList := dict.PostingsList("thing", nil) + if postingsList == nil { + t.Fatal("got nil postings list, expected non-nil") + } + + postingsItr := postingsList.Iterator() + if postingsItr == nil { + t.Fatal("got nil iterator, expected non-nil") + } + + count := 0 + nextPosting := postingsItr.Next() + for nextPosting != nil { + count++ + nextPosting = postingsItr.Next() + } + + if count != 2 { + t.Errorf("expected count to be 2, got %d", count) + } + + // get docnum of a + exclude := segment.DocNumbers([]string{"a"}) + + // look for term 'thing' excluding doc 'a' + postingsListExcluding := dict.PostingsList("thing", exclude) + if postingsListExcluding == nil { + t.Fatal("got nil postings list, expected non-nil") + } + + postingsItrExcluding := postingsListExcluding.Iterator() + if postingsItrExcluding == nil { + t.Fatal("got nil iterator, expected non-nil") + } + + count = 0 + nextPosting = postingsItrExcluding.Next() + for nextPosting != nil { + count++ + nextPosting = postingsItrExcluding.Next() + } + + if count != 1 { + t.Errorf("expected count to be 1, got %d", count) + } + +} diff --git a/index/scorch/segment/segment.go b/index/scorch/segment/segment.go new file mode 100644 index 00000000..5cd3d5d7 --- /dev/null +++ b/index/scorch/segment/segment.go @@ -0,0 +1,66 @@ +package segment + +import ( + "github.com/RoaringBitmap/roaring" + "github.com/blevesearch/bleve/index" +) + +// DocumentFieldValueVisitor defines a callback to be invoked for each +// stored field value. The return value determines if the visitor +// should keep going. Returning true continues visiting, false stops. 
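+// +// For example, a sketch of a visitor that collects the raw values of a +// single stored field (the names here are illustrative, not part of the API): +// +// var vals [][]byte +// _ = seg.VisitDocument(0, func(field string, typ byte, value []byte, pos []uint64) bool { +// if field == "name" { +// vals = append(vals, value) +// } +// return true // keep visiting the remaining fields +// })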
+type DocumentFieldValueVisitor func(field string, typ byte, value []byte, pos []uint64) bool + +type Segment interface { + Dictionary(field string) TermDictionary + + VisitDocument(num uint64, visitor DocumentFieldValueVisitor) error + Count() uint64 + + DocNumbers([]string) *roaring.Bitmap + + Fields() []string +} + +type TermDictionary interface { + PostingsList(term string, except *roaring.Bitmap) PostingsList + + Iterator() DictionaryIterator + PrefixIterator(prefix string) DictionaryIterator + RangeIterator(start, end string) DictionaryIterator +} + +type DictionaryIterator interface { + Next() (*index.DictEntry, error) +} + +type PostingsList interface { + Iterator() PostingsIterator + + Count() uint64 + + // NOTE deferred for future work + + // And(other PostingsList) PostingsList + // Or(other PostingsList) PostingsList +} + +type PostingsIterator interface { + Next() Posting +} + +type Posting interface { + Number() uint64 + + Frequency() uint64 + Norm() float64 + + Locations() []Location +} + +type Location interface { + Field() string + Start() uint64 + End() uint64 + Pos() uint64 + ArrayPositions() []uint64 +} diff --git a/index/scorch/snapshot_index.go b/index/scorch/snapshot_index.go new file mode 100644 index 00000000..a33cc58e --- /dev/null +++ b/index/scorch/snapshot_index.go @@ -0,0 +1,300 @@ +package scorch + +import ( + "bytes" + "container/heap" + "encoding/binary" + "fmt" + + "github.com/RoaringBitmap/roaring" + "github.com/blevesearch/bleve/document" + "github.com/blevesearch/bleve/index" + "github.com/blevesearch/bleve/index/scorch/segment" +) + +type IndexSnapshot struct { + segment []*SegmentSnapshot + offsets []uint64 + internal map[string][]byte +} + +func (i *IndexSnapshot) newIndexSnapshotFieldDict(field string, makeItr func(i segment.TermDictionary) segment.DictionaryIterator) (*IndexSnapshotFieldDict, error) { + + results := make(chan segment.DictionaryIterator) + for index, segment := range i.segment { + go func(index int, segment *SegmentSnapshot) { + dict := segment.Dictionary(field) + results <- makeItr(dict) + }(index, segment) + } + + rv := &IndexSnapshotFieldDict{ + snapshot: i, + cursors: make([]*segmentDictCursor, 0, len(i.segment)), + } + for count := 0; count < len(i.segment); count++ { + di := <-results + next, err := di.Next() + if err != nil { + return nil, err + } + if next != nil { + rv.cursors = append(rv.cursors, &segmentDictCursor{ + itr: di, + curr: next, + }) + } + } + // prepare heap + heap.Init(rv) + + return rv, nil +} + +func (i *IndexSnapshot) FieldDict(field string) (index.FieldDict, error) { + return i.newIndexSnapshotFieldDict(field, func(i segment.TermDictionary) segment.DictionaryIterator { + return i.Iterator() + }) +} + +func (i *IndexSnapshot) FieldDictRange(field string, startTerm []byte, + endTerm []byte) (index.FieldDict, error) { + return i.newIndexSnapshotFieldDict(field, func(i segment.TermDictionary) segment.DictionaryIterator { + return i.RangeIterator(string(startTerm), string(endTerm)) + }) +} + +func (i *IndexSnapshot) FieldDictPrefix(field string, + termPrefix []byte) (index.FieldDict, error) { + return i.newIndexSnapshotFieldDict(field, func(i segment.TermDictionary) segment.DictionaryIterator { + return i.PrefixIterator(string(termPrefix)) + }) +} + +func (i *IndexSnapshot) DocIDReaderAll() (index.DocIDReader, error) { + + type segmentDocNumsResult struct { + index int + docs *roaring.Bitmap + } + + results := make(chan *segmentDocNumsResult) + for index, segment := range i.segment { + go func(index int, 
segment *SegmentSnapshot) { + docnums := roaring.NewBitmap() + docnums.AddRange(0, segment.Count()) + results <- &segmentDocNumsResult{ + index: index, + docs: docnums, + } + }(index, segment) + } + + rv := &IndexSnapshotDocIDReader{ + snapshot: i, + iterators: make([]roaring.IntIterable, len(i.segment)), + } + for count := 0; count < len(i.segment); count++ { + sdnr := <-results + rv.iterators[sdnr.index] = sdnr.docs.Iterator() + } + + return rv, nil +} + +func (i *IndexSnapshot) DocIDReaderOnly(ids []string) (index.DocIDReader, error) { + + type segmentDocNumsResult struct { + index int + docs *roaring.Bitmap + } + + results := make(chan *segmentDocNumsResult) + for index, segment := range i.segment { + go func(index int, segment *SegmentSnapshot) { + docnums := segment.DocNumbers(ids) + results <- &segmentDocNumsResult{ + index: index, + docs: docnums, + } + }(index, segment) + } + + rv := &IndexSnapshotDocIDReader{ + snapshot: i, + iterators: make([]roaring.IntIterable, len(i.segment)), + } + for count := 0; count < len(i.segment); count++ { + sdnr := <-results + rv.iterators[sdnr.index] = sdnr.docs.Iterator() // keep segment order so global offsets line up + } + + return rv, nil +} + +func (i *IndexSnapshot) Fields() ([]string, error) { + // FIXME not making this concurrent for now as it's not used in hot path + // of any searches at the moment (just a debug aid) + fieldsMap := map[string]struct{}{} + for _, segment := range i.segment { + fields := segment.Fields() + for _, field := range fields { + fieldsMap[field] = struct{}{} + } + } + rv := make([]string, 0, len(fieldsMap)) + for k := range fieldsMap { + rv = append(rv, k) + } + return rv, nil +} + +func (i *IndexSnapshot) GetInternal(key []byte) ([]byte, error) { + return i.internal[string(key)], nil +} + +func (i *IndexSnapshot) DocCount() (uint64, error) { + var rv uint64 + for _, segment := range i.segment { + rv += segment.Count() + } + return rv, nil +} + +func (i *IndexSnapshot) Document(id string) (*document.Document, error) { + // FIXME could be done more efficiently directly, but reusing for simplicity + tfr, err := i.TermFieldReader([]byte(id), "_id", false, false, false) + if err != nil { + return nil, err + } + defer tfr.Close() + + next, err := tfr.Next(nil) + if err != nil { + return nil, err + } + + docNum := docInternalToNumber(next.ID) + segmentIndex, localDocNum := i.segmentIndexAndLocalDocNumFromGlobal(docNum) + + rv := document.NewDocument(id) + i.segment[segmentIndex].VisitDocument(localDocNum, func(name string, typ byte, value []byte, pos []uint64) bool { + switch typ { + case 't': + rv.AddField(document.NewTextField(name, pos, value)) + case 'n': + rv.AddField(document.NewNumericFieldFromBytes(name, pos, value)) + case 'd': + rv.AddField(document.NewDateTimeFieldFromBytes(name, pos, value)) + case 'b': + rv.AddField(document.NewBooleanFieldFromBytes(name, pos, value)) + case 'g': + rv.AddField(document.NewGeoPointFieldFromBytes(name, pos, value)) + } + + return true + }) + + return rv, nil +} + +func (i *IndexSnapshot) segmentIndexAndLocalDocNumFromGlobal(docNum uint64) (int, uint64) { + var segmentIndex uint64 + for j := 1; j < len(i.offsets); j++ { + if docNum >= i.offsets[j] { + segmentIndex = uint64(j) + } else { + break + } + } + + localDocNum := docNum - i.offsets[segmentIndex] + return int(segmentIndex), localDocNum +} + +func (i *IndexSnapshot) ExternalID(id index.IndexInternalID) (string, error) { + docNum := docInternalToNumber(id) + segmentIndex, localDocNum := i.segmentIndexAndLocalDocNumFromGlobal(docNum) + + var found bool + var rv string + 
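// walk this doc's stored fields; the _id field holds the external identifier, so we can stop at the first match +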
i.segment[segmentIndex].VisitDocument(localDocNum, func(field string, typ byte, value []byte, pos []uint64) bool { + if field == "_id" { + found = true + rv = string(value) + return false + } + return true + }) + + if found { + return rv, nil + } + return "", fmt.Errorf("document number %d not found", docNum) +} + +func (i *IndexSnapshot) InternalID(id string) (index.IndexInternalID, error) { + // FIXME could be done more efficiently directly, but reusing for simplicity + tfr, err := i.TermFieldReader([]byte(id), "_id", false, false, false) + if err != nil { + return nil, err + } + defer tfr.Close() + + next, err := tfr.Next(nil) + if err != nil { + return nil, err + } + + return next.ID, nil +} + +func (i *IndexSnapshot) TermFieldReader(term []byte, field string, includeFreq, + includeNorm, includeTermVectors bool) (index.TermFieldReader, error) { + + type segmentPostingResult struct { + index int + postings segment.PostingsList + } + + results := make(chan *segmentPostingResult) + for index, segment := range i.segment { + go func(index int, segment *SegmentSnapshot) { + dict := segment.Dictionary(field) + pl := dict.PostingsList(string(term), nil) + results <- &segmentPostingResult{ + index: index, + postings: pl, + } + }(index, segment) + } + + rv := &IndexSnapshotTermFieldReader{ + snapshot: i, + postings: make([]segment.PostingsList, len(i.segment)), + iterators: make([]segment.PostingsIterator, len(i.segment)), + includeFreq: includeFreq, + includeNorm: includeNorm, + includeTermVectors: includeTermVectors, + } + for count := 0; count < len(i.segment); count++ { + spr := <-results + rv.postings[spr.index] = spr.postings + rv.iterators[spr.index] = spr.postings.Iterator() + } + + return rv, nil +} + +func docNumberToBytes(in uint64) []byte { + + buf := new(bytes.Buffer) + _ = binary.Write(buf, binary.BigEndian, in) + return buf.Bytes() +} + +func docInternalToNumber(in index.IndexInternalID) uint64 { + var res uint64 + _ = binary.Read(bytes.NewReader(in), binary.BigEndian, &res) + return res +} diff --git a/index/scorch/snapshot_index_dict.go b/index/scorch/snapshot_index_dict.go new file mode 100644 index 00000000..443e401e --- /dev/null +++ b/index/scorch/snapshot_index_dict.go @@ -0,0 +1,78 @@ +package scorch + +import ( + "container/heap" + + "github.com/blevesearch/bleve/index" + "github.com/blevesearch/bleve/index/scorch/segment" +) + +type segmentDictCursor struct { + itr segment.DictionaryIterator + curr *index.DictEntry +} + +type IndexSnapshotFieldDict struct { + snapshot *IndexSnapshot + cursors []*segmentDictCursor +} + +func (i *IndexSnapshotFieldDict) Len() int { return len(i.cursors) } +func (i *IndexSnapshotFieldDict) Less(a, b int) bool { + return i.cursors[a].curr.Term < i.cursors[b].curr.Term +} +func (i *IndexSnapshotFieldDict) Swap(a, b int) { + i.cursors[a], i.cursors[b] = i.cursors[b], i.cursors[a] +} + +func (i *IndexSnapshotFieldDict) Push(x interface{}) { + i.cursors = append(i.cursors, x.(*segmentDictCursor)) +} + +func (i *IndexSnapshotFieldDict) Pop() interface{} { + n := len(i.cursors) + x := i.cursors[n-1] + i.cursors = i.cursors[0 : n-1] + return x +} + +func (i *IndexSnapshotFieldDict) Next() (*index.DictEntry, error) { + if len(i.cursors) <= 0 { + return nil, nil + } + rv := i.cursors[0].curr + next, err := i.cursors[0].itr.Next() + if err != nil { + return nil, err + } + if next == nil { + // at end of this cursor, remove it + heap.Pop(i) + } else { + // modified heap, fix it + i.cursors[0].curr = next + heap.Fix(i, 0) + } + // look for any other entries 
with the exact same term + for len(i.cursors) > 0 && i.cursors[0].curr.Term == rv.Term { + rv.Count += i.cursors[0].curr.Count + next, err := i.cursors[0].itr.Next() + if err != nil { + return nil, err + } + if next == nil { + // at end of this cursor, remove it + heap.Pop(i) + } else { + // modified heap, fix it + i.cursors[0].curr = next + heap.Fix(i, 0) + } + } + + return rv, nil +} + +func (i *IndexSnapshotFieldDict) Close() error { + return nil +} diff --git a/index/scorch/snapshot_index_doc.go b/index/scorch/snapshot_index_doc.go new file mode 100644 index 00000000..2b114487 --- /dev/null +++ b/index/scorch/snapshot_index_doc.go @@ -0,0 +1,53 @@ +package scorch + +import ( + "bytes" + + "github.com/RoaringBitmap/roaring" + "github.com/blevesearch/bleve/index" +) + +type IndexSnapshotDocIDReader struct { + snapshot *IndexSnapshot + iterators []roaring.IntIterable + segmentOffset int +} + +func (i *IndexSnapshotDocIDReader) Next() (index.IndexInternalID, error) { + for i.segmentOffset < len(i.iterators) { + if !i.iterators[i.segmentOffset].HasNext() { + i.segmentOffset++ + continue + } + next := i.iterators[i.segmentOffset].Next() + // make segment number into global number by adding offset + globalOffset := i.snapshot.offsets[i.segmentOffset] + return docNumberToBytes(uint64(next) + globalOffset), nil + } + return nil, nil +} + +func (i *IndexSnapshotDocIDReader) Advance(ID index.IndexInternalID) (index.IndexInternalID, error) { + // FIXME do something better + next, err := i.Next() + if err != nil { + return nil, err + } + if next == nil { + return nil, nil + } + for bytes.Compare(next, ID) < 0 { + next, err = i.Next() + if err != nil { + return nil, err + } + if next == nil { + break + } + } + return next, nil +} + +func (i *IndexSnapshotDocIDReader) Close() error { + return nil +} diff --git a/index/scorch/snapshot_index_tfr.go b/index/scorch/snapshot_index_tfr.go new file mode 100644 index 00000000..44172f3d --- /dev/null +++ b/index/scorch/snapshot_index_tfr.go @@ -0,0 +1,91 @@ +package scorch + +import ( + "bytes" + + "github.com/blevesearch/bleve/index" + "github.com/blevesearch/bleve/index/scorch/segment" +) + +type IndexSnapshotTermFieldReader struct { + snapshot *IndexSnapshot + postings []segment.PostingsList + iterators []segment.PostingsIterator + segmentOffset int + includeFreq bool + includeNorm bool + includeTermVectors bool +} + +func (i *IndexSnapshotTermFieldReader) Next(preAlloced *index.TermFieldDoc) (*index.TermFieldDoc, error) { + rv := preAlloced + if rv == nil { + rv = &index.TermFieldDoc{} + } + // find the next hit + for i.segmentOffset < len(i.postings) { + next := i.iterators[i.segmentOffset].Next() + if next != nil { + // make segment number into global number by adding offset + globalOffset := i.snapshot.offsets[i.segmentOffset] + nnum := next.Number() + rv.ID = docNumberToBytes(nnum + globalOffset) + if i.includeFreq { + rv.Freq = next.Frequency() + } + if i.includeNorm { + rv.Norm = next.Norm() + } + if i.includeTermVectors { + locs := next.Locations() + rv.Vectors = make([]*index.TermFieldVector, len(locs)) + for i, loc := range locs { + rv.Vectors[i] = &index.TermFieldVector{ + Start: loc.Start(), + End: loc.End(), + Pos: loc.Pos(), + ArrayPositions: loc.ArrayPositions(), + Field: loc.Field(), + } + } + } + + return rv, nil + } + i.segmentOffset++ + } + return nil, nil +} + +func (i *IndexSnapshotTermFieldReader) Advance(ID index.IndexInternalID, preAlloced *index.TermFieldDoc) (*index.TermFieldDoc, error) { + // FIXME do something better + next, err 
diff --git a/index/scorch/snapshot_index_tfr.go b/index/scorch/snapshot_index_tfr.go
new file mode 100644
index 00000000..44172f3d
--- /dev/null
+++ b/index/scorch/snapshot_index_tfr.go
@@ -0,0 +1,91 @@
+package scorch
+
+import (
+	"bytes"
+
+	"github.com/blevesearch/bleve/index"
+	"github.com/blevesearch/bleve/index/scorch/segment"
+)
+
+type IndexSnapshotTermFieldReader struct {
+	snapshot           *IndexSnapshot
+	postings           []segment.PostingsList
+	iterators          []segment.PostingsIterator
+	segmentOffset      int
+	includeFreq        bool
+	includeNorm        bool
+	includeTermVectors bool
+}
+
+func (i *IndexSnapshotTermFieldReader) Next(preAlloced *index.TermFieldDoc) (*index.TermFieldDoc, error) {
+	rv := preAlloced
+	if rv == nil {
+		rv = &index.TermFieldDoc{}
+	}
+	// find the next hit
+	for i.segmentOffset < len(i.postings) {
+		next := i.iterators[i.segmentOffset].Next()
+		if next != nil {
+			// make segment number into global number by adding offset
+			globalOffset := i.snapshot.offsets[i.segmentOffset]
+			nnum := next.Number()
+			rv.ID = docNumberToBytes(nnum + globalOffset)
+			if i.includeFreq {
+				rv.Freq = next.Frequency()
+			}
+			if i.includeNorm {
+				rv.Norm = next.Norm()
+			}
+			if i.includeTermVectors {
+				locs := next.Locations()
+				rv.Vectors = make([]*index.TermFieldVector, len(locs))
+				for i, loc := range locs {
+					rv.Vectors[i] = &index.TermFieldVector{
+						Start:          loc.Start(),
+						End:            loc.End(),
+						Pos:            loc.Pos(),
+						ArrayPositions: loc.ArrayPositions(),
+						Field:          loc.Field(),
+					}
+				}
+			}
+
+			return rv, nil
+		}
+		i.segmentOffset++
+	}
+	return nil, nil
+}
+
+func (i *IndexSnapshotTermFieldReader) Advance(ID index.IndexInternalID, preAlloced *index.TermFieldDoc) (*index.TermFieldDoc, error) {
+	// FIXME do something better
+	next, err := i.Next(preAlloced)
+	if err != nil {
+		return nil, err
+	}
+	if next == nil {
+		return nil, nil
+	}
+	for bytes.Compare(next.ID, ID) < 0 {
+		next, err = i.Next(preAlloced)
+		if err != nil {
+			return nil, err
+		}
+		if next == nil {
+			break
+		}
+	}
+	return next, nil
+}
+
+func (i *IndexSnapshotTermFieldReader) Count() uint64 {
+	var rv uint64
+	for _, posting := range i.postings {
+		rv += posting.Count()
+	}
+	return rv
+}
+
+func (i *IndexSnapshotTermFieldReader) Close() error {
+	return nil
+}
diff --git a/index/scorch/snapshot_segment.go b/index/scorch/snapshot_segment.go
new file mode 100644
index 00000000..b32725e9
--- /dev/null
+++ b/index/scorch/snapshot_segment.go
@@ -0,0 +1,64 @@
+package scorch
+
+import (
+	"github.com/RoaringBitmap/roaring"
+	"github.com/blevesearch/bleve/index/scorch/segment"
+)
+
+type SegmentDictionarySnapshot struct {
+	s *SegmentSnapshot
+	d segment.TermDictionary
+}
+
+func (s *SegmentDictionarySnapshot) PostingsList(term string, except *roaring.Bitmap) segment.PostingsList {
+	// NOTE: the caller-supplied except bitmap is ignored here; the
+	// snapshot always excludes its own deleted docs instead
+	return s.d.PostingsList(term, s.s.deleted)
+}
+
+func (s *SegmentDictionarySnapshot) Iterator() segment.DictionaryIterator {
+	return s.d.Iterator()
+}
+
+func (s *SegmentDictionarySnapshot) PrefixIterator(prefix string) segment.DictionaryIterator {
+	return s.d.PrefixIterator(prefix)
+}
+
+func (s *SegmentDictionarySnapshot) RangeIterator(start, end string) segment.DictionaryIterator {
+	return s.d.RangeIterator(start, end)
+}
+
+type SegmentSnapshot struct {
+	id      uint64
+	segment segment.Segment
+	deleted *roaring.Bitmap
+}
+
+func (s *SegmentSnapshot) VisitDocument(num uint64, visitor segment.DocumentFieldValueVisitor) error {
+	return s.segment.VisitDocument(num, visitor)
+}
+
+func (s *SegmentSnapshot) Count() uint64 {
+	rv := s.segment.Count()
+	if s.deleted != nil {
+		rv -= s.deleted.GetCardinality()
+	}
+	return rv
+}
+
+func (s *SegmentSnapshot) Dictionary(field string) segment.TermDictionary {
+	return &SegmentDictionarySnapshot{
+		s: s,
+		d: s.segment.Dictionary(field),
+	}
+}
+
+func (s *SegmentSnapshot) DocNumbers(docIDs []string) *roaring.Bitmap {
+	rv := s.segment.DocNumbers(docIDs)
+	if s.deleted != nil {
+		rv.AndNot(s.deleted)
+	}
+	return rv
+}
+
+func (s *SegmentSnapshot) Fields() []string {
+	return s.segment.Fields()
+}
diff --git a/index/scorch/stats.go b/index/scorch/stats.go
new file mode 100644
index 00000000..f49c8178
--- /dev/null
+++ b/index/scorch/stats.go
@@ -0,0 +1,33 @@
+package scorch
+
+import (
+	"encoding/json"
+	"sync/atomic"
+)
+
+// Stats tracks statistics about the index
+type Stats struct {
+	analysisTime, indexTime uint64
+}
+
+// FIXME wire up these other stats again
+func (s *Stats) statsMap() map[string]interface{} {
+	m := map[string]interface{}{}
+	// m["updates"] = atomic.LoadUint64(&i.updates)
+	// m["deletes"] = atomic.LoadUint64(&i.deletes)
+	// m["batches"] = atomic.LoadUint64(&i.batches)
+	// m["errors"] = atomic.LoadUint64(&i.errors)
+	m["analysis_time"] = atomic.LoadUint64(&s.analysisTime)
+	m["index_time"] = atomic.LoadUint64(&s.indexTime)
+	// m["term_searchers_started"] = atomic.LoadUint64(&i.termSearchersStarted)
+	// m["term_searchers_finished"] = atomic.LoadUint64(&i.termSearchersFinished)
+	// m["num_plain_text_bytes_indexed"] = atomic.LoadUint64(&i.numPlainTextBytesIndexed)
+
+	return m
+}
+
+// MarshalJSON implements json.Marshaler
+func (s *Stats) MarshalJSON() ([]byte, error) {
+	m := s.statsMap()
+	return json.Marshal(m)
+}
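Stats exposes only the two timing counters so far; the commented-out entries mark where the remaining counters will be rewired. A hypothetical sketch of how the index and analysis loops would feed these counters, assuming durations are accumulated as nanoseconds (the helper name is illustrative, not part of this change):

```
package scorch

import (
	"sync/atomic"
	"time"
)

// recordAnalysisTime is a hypothetical helper: it atomically adds
// the elapsed nanoseconds since start to the analysisTime counter,
// so statsMap and MarshalJSON can read it without a lock.
func (s *Stats) recordAnalysisTime(start time.Time) {
	atomic.AddUint64(&s.analysisTime, uint64(time.Since(start)))
}
```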
diff --git a/mapping/index.go b/mapping/index.go
index 737f26ff..cefa5980 100644
--- a/mapping/index.go
+++ b/mapping/index.go
@@ -339,7 +339,7 @@ func (im *IndexMappingImpl) newWalkContext(doc *document.Document, dm *DocumentM
 	return &walkContext{
 		doc:             doc,
 		im:              im,
 		dm:              dm,
-		excludedFromAll: []string{},
+		excludedFromAll: []string{"_id"},
 	}
 }
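This mapping change is the counterpart to scorch injecting an identifier field into every document: seeding excludedFromAll with "_id" keeps that injected field out of the \_all composite field. For comparison, a short sketch of how callers keep any other field out of \_all through the public bleve API, which this change does not alter:

```
package main

import "github.com/blevesearch/bleve"

func main() {
	// "_id" is now excluded from _all automatically; other fields
	// still opt out through their field mapping, as before.
	indexMapping := bleve.NewIndexMapping()
	bodyField := bleve.NewTextFieldMapping()
	bodyField.IncludeInAll = false // same effect excludedFromAll has for _id
	indexMapping.DefaultMapping.AddFieldMappingsAt("body", bodyField)
	_ = indexMapping
}
```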