0
0
Fork 0
This commit is contained in:
Bertram Truong 2018-03-28 15:40:17 +00:00 committed by GitHub
commit 8f9a92682a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 59 additions and 9 deletions

View File

@ -48,7 +48,16 @@ var createCmd = &cobra.Command{
if err != nil {
return fmt.Errorf("error building mapping: %v", err)
}
idx, err = bleve.NewUsing(args[0], mapping, indexType, storeType, nil)
// If we are using moss, use mossStore and turn on persistSync
var kvconfig map[string]interface{}
if storeType == "moss" {
kvconfig = make(map[string]interface{})
kvconfig["mossLowerLevelStoreName"] = "mossStore"
kvconfig["mossPersistSync"] = true
}
idx, err = bleve.NewUsing(args[0], mapping, indexType, storeType, kvconfig)
if err != nil {
return fmt.Errorf("error creating index: %v", err)
}

View File

@ -520,6 +520,11 @@ type mossStoreWrapper struct {
m sync.Mutex
refs int
s *moss.Store
pendingWrites int
enablePersistSync bool
persistCh chan struct{}
}
func (w *mossStoreWrapper) AddRef() {

View File

@ -22,10 +22,9 @@ import (
"fmt"
"sync"
"github.com/couchbase/moss"
"github.com/blevesearch/bleve/index/store"
"github.com/blevesearch/bleve/registry"
"github.com/couchbase/moss"
)
// RegistryCollectionOptions should be treated as read-only after
@ -155,6 +154,31 @@ func New(mo store.MergeOperator, config map[string]interface{}) (
// --------------------------------------------------
v, ok = config["mossPersistSync"]
if ok {
if val, ok := v.(bool); ok && val {
options.OnEvent = func(event moss.Event) {
if msw, ok := llStore.(*mossStoreWrapper); ok {
msw.enablePersistSync = true
if event.Kind == moss.EventKindPersisterProgress {
stats, err := event.Collection.Stats()
if err == nil && stats.CurDirtyOps <= 0 &&
stats.CurDirtyBytes <= 0 && stats.CurDirtySegments <= 0 {
msw.m.Lock()
if msw.persistCh != nil {
msw.persistCh <- struct{}{}
msw.persistCh = nil
}
msw.m.Unlock()
}
}
}
}
}
}
// --------------------------------------------------
ms, err := moss.NewCollection(options)
if err != nil {
return nil, err
@ -175,12 +199,14 @@ func New(mo store.MergeOperator, config map[string]interface{}) (
}
func (s *Store) Close() error {
if val, ok := s.config["mossAbortCloseEnabled"]; ok {
if v, ok := val.(bool); ok && v {
if msw, ok := s.llstore.(*mossStoreWrapper); ok {
if s := msw.Actual(); s != nil {
_ = s.CloseEx(moss.StoreCloseExOptions{Abort: true})
if msw, ok := s.llstore.(*mossStoreWrapper); ok {
if ss := msw.Actual(); ss != nil {
if val, ok := s.config["mossAbortCloseEnabled"]; ok {
if v, ok := val.(bool); ok && v {
_ = ss.CloseEx(moss.StoreCloseExOptions{Abort: true})
}
} else {
_ = ss.CloseEx(moss.StoreCloseExOptions{})
}
}
}

View File

@ -88,7 +88,17 @@ func (w *Writer) ExecuteBatch(b store.KVBatch) (err error) {
}
}
return w.s.ms.ExecuteBatch(batch.batch, moss.WriteOptions{})
err = w.s.ms.ExecuteBatch(batch.batch, moss.WriteOptions{})
if msw, ok := batch.store.llstore.(*mossStoreWrapper); ok && msw.enablePersistSync {
persistCh := make(chan struct{}, 1)
batch.store.m.Lock()
msw.persistCh = persistCh
batch.store.m.Unlock()
<-persistCh
}
return err
}
func (w *Writer) Close() error {