0
0
Fork 0
bleve/index/scorch/scorch.go

328 lines
7.6 KiB
Go
Raw Normal View History

// Copyright (c) 2017 Couchbase, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
2017-09-29 18:42:37 +02:00
package scorch
import (
"encoding/json"
"fmt"
"os"
2017-09-29 18:42:37 +02:00
"sync"
"sync/atomic"
"time"
"github.com/RoaringBitmap/roaring"
"github.com/blevesearch/bleve/analysis"
"github.com/blevesearch/bleve/document"
"github.com/blevesearch/bleve/index"
"github.com/blevesearch/bleve/index/scorch/segment"
"github.com/blevesearch/bleve/index/scorch/segment/mem"
"github.com/blevesearch/bleve/index/store"
"github.com/blevesearch/bleve/registry"
"github.com/boltdb/bolt"
2017-09-29 18:42:37 +02:00
)
const (
	// Name is the identifier under which this index type is registered
	// with the registry (see init).
	Name = "scorch"

	// Version is the format version stamped onto every Scorch instance
	// created by NewScorch.
	Version uint8 = 1
)
type Scorch struct {
2017-12-11 15:07:01 +01:00
readOnly bool
version uint8
config map[string]interface{}
analysisQueue *index.AnalysisQueue
stats *Stats
nextSegmentID uint64
nextSnapshotEpoch uint64
path string
unsafeBatch bool
2017-09-29 18:42:37 +02:00
rootLock sync.RWMutex
root *IndexSnapshot
closeCh chan struct{}
introductions chan *segmentIntroduction
merges chan *segmentMerge
introducerNotifier chan notificationChan
persisterNotifier chan notificationChan
rootBolt *bolt.DB
asyncTasks sync.WaitGroup
2017-09-29 18:42:37 +02:00
}
func NewScorch(storeName string, config map[string]interface{}, analysisQueue *index.AnalysisQueue) (index.Index, error) {
2017-09-29 18:42:37 +02:00
rv := &Scorch{
version: Version,
config: config,
analysisQueue: analysisQueue,
stats: &Stats{},
root: &IndexSnapshot{},
nextSnapshotEpoch: 1,
2017-09-29 18:42:37 +02:00
}
2017-12-11 15:07:01 +01:00
ro, ok := config["read_only"].(bool)
if ok {
rv.readOnly = ro
}
2017-09-29 18:42:37 +02:00
return rv, nil
}
func (s *Scorch) Open() error {
var ok bool
s.path, ok = s.config["path"].(string)
if !ok {
return fmt.Errorf("must specify path")
}
if s.path == "" {
2017-12-11 15:07:01 +01:00
s.unsafeBatch = true
}
2017-12-11 15:07:01 +01:00
var rootBoltOpt *bolt.Options
if s.readOnly {
rootBoltOpt = &bolt.Options{
ReadOnly: true,
}
} else {
if s.path != "" {
err := os.MkdirAll(s.path, 0700)
if err != nil {
return err
}
}
}
rootBoltPath := s.path + string(os.PathSeparator) + "root.bolt"
2017-12-11 15:07:01 +01:00
var err error
if s.path != "" {
s.rootBolt, err = bolt.Open(rootBoltPath, 0600, rootBoltOpt)
if err != nil {
return err
}
2017-12-11 15:07:01 +01:00
// now see if there is any existing state to load
err = s.loadFromBolt()
if err != nil {
return err
}
}
2017-09-29 18:42:37 +02:00
s.closeCh = make(chan struct{})
s.introductions = make(chan *segmentIntroduction)
s.merges = make(chan *segmentMerge)
s.introducerNotifier = make(chan notificationChan)
s.persisterNotifier = make(chan notificationChan)
s.asyncTasks.Add(1)
2017-09-29 18:42:37 +02:00
go s.mainLoop()
2017-12-11 15:07:01 +01:00
if !s.readOnly && s.path != "" {
s.asyncTasks.Add(1)
go s.persisterLoop()
s.asyncTasks.Add(1)
go s.mergerLoop()
2017-12-11 15:07:01 +01:00
}
2017-09-29 18:42:37 +02:00
return nil
}
func (s *Scorch) Close() (err error) {
// signal to async tasks we want to close
2017-09-29 18:42:37 +02:00
close(s.closeCh)
// wait for them to close
s.asyncTasks.Wait()
// now close the root bolt
2017-12-11 15:07:01 +01:00
if s.rootBolt != nil {
err = s.rootBolt.Close()
s.rootLock.Lock()
for _, segment := range s.root.segment {
cerr := segment.Close()
if err == nil {
err = cerr
}
}
}
return
2017-09-29 18:42:37 +02:00
}
// Update indexes a single document by wrapping it in a one-operation batch
// and applying that batch through Batch. It returns Batch's error.
func (s *Scorch) Update(doc *document.Document) error {
	single := index.NewBatch()
	single.Update(doc)
	return s.Batch(single)
}
// Delete removes the document with the given id by wrapping the deletion in
// a one-operation batch and applying it through Batch. It returns Batch's
// error.
func (s *Scorch) Delete(id string) error {
	single := index.NewBatch()
	single.Delete(id)
	return s.Batch(single)
}
// Batch applices a batch of changes to the index atomically
func (s *Scorch) Batch(batch *index.Batch) error {
analysisStart := time.Now()
resultChan := make(chan *index.AnalysisResult, len(batch.IndexOps))
var numUpdates uint64
var numPlainTextBytes uint64
var ids []string
for docID, doc := range batch.IndexOps {
if doc != nil {
// insert _id field
doc.AddField(document.NewTextFieldCustom("_id", nil, []byte(doc.ID), document.IndexField|document.StoreField, nil))
numUpdates++
numPlainTextBytes += doc.NumPlainTextBytes()
}
ids = append(ids, docID)
}
// FIXME could sort ids list concurrent with analysis?
go func() {
for _, doc := range batch.IndexOps {
if doc != nil {
aw := index.NewAnalysisWork(s, doc, resultChan)
// put the work on the queue
s.analysisQueue.Queue(aw)
}
}
}()
// wait for analysis result
analysisResults := make([]*index.AnalysisResult, int(numUpdates))
// newRowsMap := make(map[string][]index.IndexRow)
var itemsDeQueued uint64
for itemsDeQueued < numUpdates {
result := <-resultChan
//newRowsMap[result.DocID] = result.Rows
analysisResults[itemsDeQueued] = result
itemsDeQueued++
}
close(resultChan)
atomic.AddUint64(&s.stats.analysisTime, uint64(time.Since(analysisStart)))
var newSegment segment.Segment
if len(analysisResults) > 0 {
newSegment = mem.NewFromAnalyzedDocs(analysisResults)
} else {
newSegment = mem.New()
}
2017-11-29 19:34:15 +01:00
return s.prepareSegment(newSegment, ids, batch.InternalOps)
2017-09-29 18:42:37 +02:00
}
func (s *Scorch) prepareSegment(newSegment segment.Segment, ids []string,
internalOps map[string][]byte) error {
// new introduction
introduction := &segmentIntroduction{
id: atomic.AddUint64(&s.nextSegmentID, 1),
data: newSegment,
ids: ids,
obsoletes: make(map[uint64]*roaring.Bitmap),
internal: internalOps,
applied: make(chan error),
}
if !s.unsafeBatch {
introduction.persisted = make(chan error)
2017-09-29 18:42:37 +02:00
}
// get read lock, to optimistically prepare obsoleted info
s.rootLock.RLock()
for i := range s.root.segment {
delta, err := s.root.segment[i].segment.DocNumbers(ids)
if err != nil {
return err
}
2017-09-29 18:42:37 +02:00
introduction.obsoletes[s.root.segment[i].id] = delta
}
s.rootLock.RUnlock()
s.introductions <- introduction
// block until this segment is applied
err := <-introduction.applied
if err != nil {
return err
}
2017-09-29 18:42:37 +02:00
if !s.unsafeBatch {
err = <-introduction.persisted
}
return err
2017-09-29 18:42:37 +02:00
}
// SetInternal stores val under key as an internal entry by wrapping the
// operation in a one-operation batch and applying it through Batch. It
// returns Batch's error.
func (s *Scorch) SetInternal(key, val []byte) error {
	single := index.NewBatch()
	single.SetInternal(key, val)
	return s.Batch(single)
}
// DeleteInternal removes the internal entry stored under key by wrapping the
// operation in a one-operation batch and applying it through Batch. It
// returns Batch's error.
func (s *Scorch) DeleteInternal(key []byte) error {
	single := index.NewBatch()
	single.DeleteInternal(key)
	return s.Batch(single)
}
// Reader returns a low-level accessor on the index data. Close it to
// release associated resources.
//
// The reader is bound to the root snapshot that is current at the time of
// the call; the snapshot pointer is read under the root read lock. The
// returned error is always nil.
func (s *Scorch) Reader() (index.IndexReader, error) {
	s.rootLock.RLock()
	snapshot := s.root
	s.rootLock.RUnlock()

	return &Reader{root: snapshot}, nil
}
// Stats exposes the index's statistics object as a json.Marshaler.
func (s *Scorch) Stats() json.Marshaler {
	var m json.Marshaler = s.stats
	return m
}
// StatsMap returns the index's statistics as a generic map, as produced by
// the stats object's statsMap method.
func (s *Scorch) StatsMap() map[string]interface{} {
	m := s.stats.statsMap()
	return m
}
// Analyze runs analysis over the indexed fields of d and returns the result.
//
// The Analyzed and Length slices of the result have one slot per regular
// field followed by one per composite field. Only the slots of indexed
// regular fields are filled in here; every indexed field's output is also
// offered to each composite field through Compose.
func (s *Scorch) Analyze(d *document.Document) *index.AnalysisResult {
	slots := len(d.Fields) + len(d.CompositeFields)
	result := &index.AnalysisResult{
		Document: d,
		Analyzed: make([]analysis.TokenFrequencies, slots),
		Length:   make([]int, slots),
	}

	for idx, f := range d.Fields {
		if !f.Options().IsIndexed() {
			continue
		}

		length, freqs := f.Analyze()
		result.Analyzed[idx] = freqs
		result.Length[idx] = length

		// see if any of the composite fields need this; the loop is a
		// no-op when the document has none
		for _, composite := range d.CompositeFields {
			composite.Compose(f.Name(), length, freqs)
		}
	}

	return result
}
// Advanced always returns a nil store and a nil error; this index does not
// hand out a KVStore.
func (s *Scorch) Advanced() (store.KVStore, error) {
	var kv store.KVStore
	return kv, nil
}
// init registers NewScorch with the registry under Name.
func init() {
	registry.RegisterIndexType(Name, NewScorch)
}