// Copyright (c) 2015 Couchbase, Inc.
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
// except in compliance with the License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed under the
// License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
// either express or implied. See the License for the specific language governing permissions
// and limitations under the License.

package firestorm

import (
	"encoding/json"
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	"github.com/blevesearch/bleve/document"
	"github.com/blevesearch/bleve/index"
	"github.com/blevesearch/bleve/index/store"
	"github.com/blevesearch/bleve/registry"
)

// Name is the name under which this index type is registered.
const Name = "firestorm"

// UnsafeBatchUseDetected is returned when a bleve.Batch is found to have been
// modified while it was still executing; batches are not thread-safe.
var UnsafeBatchUseDetected = fmt.Errorf("bleve.Batch is NOT thread-safe, modification after execution detected")

// Firestorm is an index.Index implementation over a store.KVStore. Background
// components (garbage collector, lookuper, dict updater) maintain the index
// asynchronously after each mutation.
type Firestorm struct {
	storeName        string
	storeConfig      map[string]interface{}
	store            store.KVStore
	compensator      *Compensator
	analysisQueue    *index.AnalysisQueue
	fieldCache       *index.FieldCache
	highDocNumber    uint64
	docCount         *uint64
	garbageCollector *GarbageCollector
	lookuper         *Lookuper
	dictUpdater      *DictUpdater
	stats            *indexStat
}

// NewFirestorm returns a new Firestorm index using the named KV store and the
// shared analysis queue. The store itself is not opened until Open is called.
func NewFirestorm(storeName string, storeConfig map[string]interface{}, analysisQueue *index.AnalysisQueue) (index.Index, error) {
	initialCount := uint64(0)
	rv := Firestorm{
		storeName:     storeName,
		storeConfig:   storeConfig,
		compensator:   NewCompensator(),
		analysisQueue: analysisQueue,
		fieldCache:    index.NewFieldCache(),
		docCount:      &initialCount,
		highDocNumber: 0,
		stats:         &indexStat{},
	}
	rv.stats.f = &rv
	rv.garbageCollector = NewGarbageCollector(&rv)
	rv.lookuper = NewLookuper(&rv)
	rv.dictUpdater = NewDictUpdater(&rv)
	return &rv, nil
}
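
// A minimal construction sketch, assuming index.NewAnalysisQueue exists with
// this signature in the surrounding bleve tree and that a KV store
// implementation has been registered under the hypothetical name "mystore":
//
//	queue := index.NewAnalysisQueue(4)
//	idx, err := NewFirestorm("mystore", nil, queue)
//	if err != nil {
//		// handle error
//	}
//	if err = idx.Open(); err != nil {
//		// handle error
//	}
//	defer func() { _ = idx.Close() }()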

// Open opens the underlying KV store, verifies the index version, warms up an
// existing index (or bootstraps a new one), and starts the background workers.
func (f *Firestorm) Open() (err error) {
	// open the kv store
	storeConstructor := registry.KVStoreConstructorByName(f.storeName)
	if storeConstructor == nil {
		err = index.ErrorUnknownStorageType
		return
	}

	// now open the store
	f.store, err = storeConstructor(&mergeOperator, f.storeConfig)
	if err != nil {
		return
	}

	// start a reader
	var kvreader store.KVReader
	kvreader, err = f.store.Reader()
	if err != nil {
		return
	}

	// assert correct version, and find out if this is a new index
	var newIndex bool
	newIndex, err = f.checkVersion(kvreader)
	if err != nil {
		return
	}

	if !newIndex {
		// process existing index before opening
		err = f.warmup(kvreader)
		if err != nil {
			return
		}
	}

	err = kvreader.Close()
	if err != nil {
		return
	}

	if newIndex {
		// prepare a new index
		err = f.bootstrap()
		if err != nil {
			return
		}
	}

	// start the garbage collector
	f.garbageCollector.Start()

	// start the lookuper
	f.lookuper.Start()

	// start the dict updater
	f.dictUpdater.Start()

	return
}

// Close stops the background workers and closes the underlying KV store.
func (f *Firestorm) Close() error {
	f.garbageCollector.Stop()
	f.lookuper.Stop()
	f.dictUpdater.Stop()
	return f.store.Close()
}

// DocCount returns the current document count maintained by the index.
func (f *Firestorm) DocCount() (uint64, error) {
	count := atomic.LoadUint64(f.docCount)
	return count, nil
}

// Update assigns the document the next document number, analyzes it, and
// writes the resulting rows in one KV batch; the compensator and background
// workers are then notified.
func (f *Firestorm) Update(doc *document.Document) (err error) {
	// assign this document a number
	doc.Number = atomic.AddUint64(&f.highDocNumber, 1)

	// do analysis before acquiring write lock
	analysisStart := time.Now()
	resultChan := make(chan *index.AnalysisResult)
	aw := index.NewAnalysisWork(f, doc, resultChan)

	// put the work on the queue
	f.analysisQueue.Queue(aw)

	// wait for the result
	result := <-resultChan
	close(resultChan)
	atomic.AddUint64(&f.stats.analysisTime, uint64(time.Since(analysisStart)))

	// start a writer for this update
	indexStart := time.Now()
	var kvwriter store.KVWriter
	kvwriter, err = f.store.Writer()
	if err != nil {
		return
	}
	defer func() {
		if cerr := kvwriter.Close(); err == nil && cerr != nil {
			err = cerr
		}
	}()

	var dictionaryDeltas map[string]int64
	dictionaryDeltas, err = f.batchRows(kvwriter, [][]index.IndexRow{result.Rows}, nil)
	if err != nil {
		// the deferred Close above releases the writer; closing it here as
		// well would close it twice
		atomic.AddUint64(&f.stats.errors, 1)
		return
	}

	f.compensator.Mutate([]byte(doc.ID), doc.Number)
	f.lookuper.NotifyBatch([]*InFlightItem{{[]byte(doc.ID), doc.Number}})
	f.dictUpdater.NotifyBatch(dictionaryDeltas)

	atomic.AddUint64(&f.stats.indexTime, uint64(time.Since(indexStart)))
	return
}
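
// A minimal update sketch, assuming document.NewDocument,
// document.NewTextField and Document.AddField exist with these signatures in
// the surrounding bleve tree:
//
//	doc := document.NewDocument("doc1")
//	doc.AddField(document.NewTextField("body", []uint64{}, []byte("hello")))
//	if err := idx.Update(doc); err != nil {
//		// handle error
//	}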

// Delete records the deletion in the compensator and notifies the lookuper;
// no KV rows are written here, and cleanup of the document's existing rows is
// left to the background workers.
func (f *Firestorm) Delete(id string) error {
	indexStart := time.Now()
	f.compensator.Mutate([]byte(id), 0)
	f.lookuper.NotifyBatch([]*InFlightItem{{[]byte(id), 0}})
	atomic.AddUint64(&f.stats.indexTime, uint64(time.Since(indexStart)))
	return nil
}

// batchRows writes all of the given rows and deletions to the KV store in a
// single batch. Key and value sizes are counted up front so the whole batch
// can be encoded into the one buffer returned by NewBatchEx. It returns the
// dictionary deltas implied by the TermFreqRows so the dict updater can apply
// them asynchronously.
func (f *Firestorm) batchRows(writer store.KVWriter, rowsOfRows [][]index.IndexRow, deleteKeys [][]byte) (map[string]int64, error) {
	dictionaryDeltas := make(map[string]int64)

	// count up bytes needed for buffering.
	addNum := 0
	addKeyBytes := 0
	addValBytes := 0

	deleteNum := 0
	deleteKeyBytes := 0

	var kbuf []byte

	prepareBuf := func(buf []byte, sizeNeeded int) []byte {
		if cap(buf) < sizeNeeded {
			return make([]byte, sizeNeeded, sizeNeeded+128)
		}
		return buf[0:sizeNeeded]
	}

	for _, rows := range rowsOfRows {
		for _, row := range rows {
			tfr, ok := row.(*TermFreqRow)
			if ok {
				if tfr.Field() != 0 {
					kbuf = prepareBuf(kbuf, tfr.DictionaryRowKeySize())
					klen, err := tfr.DictionaryRowKeyTo(kbuf)
					if err != nil {
						return nil, err
					}

					dictionaryDeltas[string(kbuf[0:klen])]++
				}
			}

			addKeyBytes += row.KeySize()
			addValBytes += row.ValueSize()
		}
		addNum += len(rows)
	}

	for _, dk := range deleteKeys {
		deleteKeyBytes += len(dk)
	}
	deleteNum += len(deleteKeys)

	// prepare batch
	totBytes := addKeyBytes + addValBytes + deleteKeyBytes

	buf, wb, err := writer.NewBatchEx(store.KVBatchOptions{
		TotalBytes: totBytes,
		NumSets:    addNum,
		NumDeletes: deleteNum,
		NumMerges:  0,
	})
	if err != nil {
		return nil, err
	}
	defer func() {
		_ = wb.Close()
	}()

	for _, rows := range rowsOfRows {
		for _, row := range rows {
			klen, err := row.KeyTo(buf)
			if err != nil {
				return nil, err
			}

			vlen, err := row.ValueTo(buf[klen:])
			if err != nil {
				return nil, err
			}

			wb.Set(buf[0:klen], buf[klen:klen+vlen])

			buf = buf[klen+vlen:]
		}
	}

	for _, dk := range deleteKeys {
		dklen := copy(buf, dk)
		wb.Delete(buf[0:dklen])
		buf = buf[dklen:]
	}

	// write out the batch
	err = writer.ExecuteBatch(wb)
	if err != nil {
		return nil, err
	}
	return dictionaryDeltas, nil
}
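
// batchRows sizes one buffer for every key and value up front and carves the
// row slices out of it, so an entire batch costs a single allocation
// regardless of row count. A minimal sketch of the same NewBatchEx pattern
// for one hand-built pair, where k and v are hypothetical byte slices:
//
//	buf, wb, err := writer.NewBatchEx(store.KVBatchOptions{
//		TotalBytes: len(k) + len(v),
//		NumSets:    1,
//	})
//	if err == nil {
//		n := copy(buf, k)
//		m := copy(buf[n:], v)
//		wb.Set(buf[0:n], buf[n:n+m])
//		err = writer.ExecuteBatch(wb)
//		_ = wb.Close()
//	}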

// Batch analyzes all updated documents concurrently and applies the updates,
// deletes and internal ops in a single KV batch. If the batch is found to
// have been modified while executing, UnsafeBatchUseDetected is returned.
func (f *Firestorm) Batch(batch *index.Batch) (err error) {
	// acquire enough doc numbers for all updates in the batch
	// FIXME we actually waste doc numbers because deletes are in the
	// same map and we don't need numbers for them
	lastDocNumber := atomic.AddUint64(&f.highDocNumber, uint64(len(batch.IndexOps)))
	firstDocNumber := lastDocNumber - uint64(len(batch.IndexOps)) + 1

	analysisStart := time.Now()
	resultChan := make(chan *index.AnalysisResult)

	var docsUpdated uint64
	var docsDeleted uint64
	for _, doc := range batch.IndexOps {
		if doc != nil {
			doc.Number = firstDocNumber // actually assign doc numbers here
			firstDocNumber++
			docsUpdated++
		} else {
			docsDeleted++
		}
	}

	var detectedUnsafeMutex sync.RWMutex
	detectedUnsafe := false

	go func() {
		sofar := uint64(0)
		for _, doc := range batch.IndexOps {
			if doc != nil {
				sofar++
				if sofar > docsUpdated {
					detectedUnsafeMutex.Lock()
					detectedUnsafe = true
					detectedUnsafeMutex.Unlock()
					return
				}
				aw := index.NewAnalysisWork(f, doc, resultChan)
				// put the work on the queue
				f.analysisQueue.Queue(aw)
			}
		}
	}()

	// extra 1 capacity for internal updates.
	collectRows := make([][]index.IndexRow, 0, docsUpdated+1)

	// wait for the results
	var itemsDeQueued uint64
	for itemsDeQueued < docsUpdated {
		result := <-resultChan
		collectRows = append(collectRows, result.Rows)
		itemsDeQueued++
	}
	close(resultChan)

	detectedUnsafeMutex.RLock()
	defer detectedUnsafeMutex.RUnlock()
	if detectedUnsafe {
		return UnsafeBatchUseDetected
	}

	atomic.AddUint64(&f.stats.analysisTime, uint64(time.Since(analysisStart)))

	var deleteKeys [][]byte
	if len(batch.InternalOps) > 0 {
		// add the internal ops
		updateInternalRows := make([]index.IndexRow, 0, len(batch.InternalOps))
		for internalKey, internalValue := range batch.InternalOps {
			if internalValue == nil {
				// delete
				deleteInternalRow := NewInternalRow([]byte(internalKey), nil)
				deleteKeys = append(deleteKeys, deleteInternalRow.Key())
			} else {
				updateInternalRow := NewInternalRow([]byte(internalKey), internalValue)
				updateInternalRows = append(updateInternalRows, updateInternalRow)
			}
		}
		collectRows = append(collectRows, updateInternalRows)
	}

	inflightItems := make([]*InFlightItem, 0, len(batch.IndexOps))
	for docID, doc := range batch.IndexOps {
		if doc != nil {
			inflightItems = append(inflightItems,
				&InFlightItem{[]byte(docID), doc.Number})
		} else {
			inflightItems = append(inflightItems,
				&InFlightItem{[]byte(docID), 0})
		}
	}

	indexStart := time.Now()

	// start a writer for this batch
	var kvwriter store.KVWriter
	kvwriter, err = f.store.Writer()
	if err != nil {
		return
	}

	var dictionaryDeltas map[string]int64
	dictionaryDeltas, err = f.batchRows(kvwriter, collectRows, deleteKeys)
	if err != nil {
		_ = kvwriter.Close()
		atomic.AddUint64(&f.stats.errors, 1)
		return
	}

	f.compensator.MutateBatch(inflightItems, lastDocNumber)

	err = kvwriter.Close()

	f.lookuper.NotifyBatch(inflightItems)
	f.dictUpdater.NotifyBatch(dictionaryDeltas)

	atomic.AddUint64(&f.stats.indexTime, uint64(time.Since(indexStart)))

	if err == nil {
		atomic.AddUint64(&f.stats.updates, docsUpdated)
		atomic.AddUint64(&f.stats.deletes, docsDeleted)
		atomic.AddUint64(&f.stats.batches, 1)
	} else {
		atomic.AddUint64(&f.stats.errors, 1)
	}

	return
}
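
// A minimal batch sketch, assuming index.NewBatch, Batch.Update and
// Batch.Delete exist with these signatures in the surrounding bleve tree:
//
//	b := index.NewBatch()
//	b.Update(doc) // doc is a previously built *document.Document
//	b.Delete("stale-doc")
//	if err := idx.Batch(b); err != nil {
//		// handle error
//	}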

// SetInternal stores an internal (non-indexed) key/value pair.
func (f *Firestorm) SetInternal(key, val []byte) (err error) {
	internalRow := NewInternalRow(key, val)
	var writer store.KVWriter
	writer, err = f.store.Writer()
	if err != nil {
		return
	}
	defer func() {
		if cerr := writer.Close(); err == nil && cerr != nil {
			err = cerr
		}
	}()

	wb := writer.NewBatch()
	wb.Set(internalRow.Key(), internalRow.Value())

	return writer.ExecuteBatch(wb)
}

// DeleteInternal removes an internal key/value pair.
func (f *Firestorm) DeleteInternal(key []byte) (err error) {
	internalRow := NewInternalRow(key, nil)
	var writer store.KVWriter
	writer, err = f.store.Writer()
	if err != nil {
		return
	}
	defer func() {
		if cerr := writer.Close(); err == nil && cerr != nil {
			err = cerr
		}
	}()

	wb := writer.NewBatch()
	wb.Delete(internalRow.Key())

	return writer.ExecuteBatch(wb)
}
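
// A minimal sketch of the internal key/value API above:
//
//	if err := idx.SetInternal([]byte("checkpoint"), []byte("42")); err != nil {
//		// handle error
//	}
//	if err := idx.DeleteInternal([]byte("checkpoint")); err != nil {
//		// handle error
//	}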

// DumpAll streams every row in the index over the returned channel; any error
// encountered during the dump is sent on the same channel.
func (f *Firestorm) DumpAll() chan interface{} {
	rv := make(chan interface{})
	go func() {
		defer close(rv)

		// start an isolated reader for use during the dump
		kvreader, err := f.store.Reader()
		if err != nil {
			rv <- err
			return
		}
		defer func() {
			cerr := kvreader.Close()
			if cerr != nil {
				rv <- cerr
			}
		}()

		err = f.dumpPrefix(kvreader, rv, nil)
		if err != nil {
			rv <- err
			return
		}
	}()
	return rv
}

// DumpDoc streams all rows associated with the given document ID.
func (f *Firestorm) DumpDoc(docID string) chan interface{} {
	rv := make(chan interface{})
	go func() {
		defer close(rv)

		// start an isolated reader for use during the dump
		kvreader, err := f.store.Reader()
		if err != nil {
			rv <- err
			return
		}
		defer func() {
			cerr := kvreader.Close()
			if cerr != nil {
				rv <- cerr
			}
		}()

		err = f.dumpDoc(kvreader, rv, []byte(docID))
		if err != nil {
			rv <- err
			return
		}
	}()
	return rv
}

// DumpFields streams the field rows of the index.
func (f *Firestorm) DumpFields() chan interface{} {
	rv := make(chan interface{})
	go func() {
		defer close(rv)

		// start an isolated reader for use during the dump
		kvreader, err := f.store.Reader()
		if err != nil {
			rv <- err
			return
		}
		defer func() {
			cerr := kvreader.Close()
			if cerr != nil {
				rv <- cerr
			}
		}()

		err = f.dumpPrefix(kvreader, rv, FieldKeyPrefix)
		if err != nil {
			rv <- err
			return
		}
	}()
	return rv
}
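
// A minimal consumption sketch for the dump channels above; each value is
// either a row or an error, so callers should type-switch:
//
//	for item := range idx.DumpAll() {
//		switch v := item.(type) {
//		case error:
//			// handle error
//		default:
//			fmt.Printf("row: %v\n", v)
//		}
//	}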

// Reader returns a new reader over the current state of the index.
func (f *Firestorm) Reader() (index.IndexReader, error) {
	return newFirestormReader(f)
}

// Stats returns the index statistics in a JSON-marshalable form.
func (f *Firestorm) Stats() json.Marshaler {
	return f.stats
}

// Wait blocks until the dict updater has drained its pending tasks, or the
// timeout elapses.
func (f *Firestorm) Wait(timeout time.Duration) error {
	return f.dictUpdater.waitTasksDone(timeout)
}

func init() {
	registry.RegisterIndexType(Name, NewFirestorm)
}