0
0

Merge pull request #354 from steveyen/master

moss lowerLevelUpdate didn't handle batches of size 1
This commit is contained in:
Marty Schoch 2016-03-13 11:43:46 -04:00
commit b31f15002b
2 changed files with 15 additions and 8 deletions

View File

@ -55,9 +55,11 @@ func initLowerLevelStore(
} }
llStore := &llStore{ llStore := &llStore{
refs: 0, refs: 0,
kvStore: kvStore, config: config,
logf: logf, llConfig: lowerLevelStoreConfig,
kvStore: kvStore,
logf: logf,
} }
llUpdate := func(ssHigher moss.Snapshot) (ssLower moss.Snapshot, err error) { llUpdate := func(ssHigher moss.Snapshot) (ssLower moss.Snapshot, err error) {
@ -80,6 +82,9 @@ func initLowerLevelStore(
type llStore struct { type llStore struct {
kvStore store.KVStore kvStore store.KVStore
config map[string]interface{}
llConfig map[string]interface{}
logf func(format string, a ...interface{}) logf func(format string, a ...interface{})
m sync.Mutex // Protects fields that follow. m sync.Mutex // Protects fields that follow.
@ -219,6 +224,8 @@ func (s *llStore) update(ssHigher moss.Snapshot, maxBatchSize uint64) (
" unexpected operation, ex: %v", ex) " unexpected operation, ex: %v", ex)
} }
i++
err = iter.Next() err = iter.Next()
if err == moss.ErrIteratorDone { if err == moss.ErrIteratorDone {
break break
@ -240,19 +247,19 @@ func (s *llStore) update(ssHigher moss.Snapshot, maxBatchSize uint64) (
batch = kvWriter.NewBatch() batch = kvWriter.NewBatch()
} }
i++
} }
if i > 0 { if i > 0 {
s.logf("llStore.update, total: %d, ExecuteBatch: ...", i) s.logf("llStore.update, ExecuteBatch,"+
" path: %s, total: %d, start", s.llConfig["path"], i)
err = kvWriter.ExecuteBatch(batch) err = kvWriter.ExecuteBatch(batch)
if err != nil { if err != nil {
return nil, err return nil, err
} }
s.logf("llStore.update, total: %d, ExecuteBatch: done", i) s.logf("llStore.update, ExecuteBatch,"+
" path: %s: total: %d, done", s.llConfig["path"], i)
} }
} }

View File

@ -82,7 +82,7 @@ func New(mo store.MergeOperator, config map[string]interface{}) (
// -------------------------------------------------- // --------------------------------------------------
if options.Log == nil { if options.Log == nil || options.Debug <= 0 {
options.Log = func(format string, a ...interface{}) {} options.Log = func(format string, a ...interface{}) {}
} }