more index/store/moss err handling
This commit is contained in:
parent
eb315fa500
commit
ea1a52464d
@ -25,18 +25,28 @@ type Batch struct {
|
||||
}
|
||||
|
||||
func (b *Batch) Set(key, val []byte) {
|
||||
var err error
|
||||
if b.alloced {
|
||||
b.batch.AllocSet(key, val)
|
||||
err = b.batch.AllocSet(key, val)
|
||||
} else {
|
||||
b.batch.Set(key, val)
|
||||
err = b.batch.Set(key, val)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
b.store.Logf("bleve moss batch.Set err: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (b *Batch) Delete(key []byte) {
|
||||
var err error
|
||||
if b.alloced {
|
||||
b.batch.AllocDel(key)
|
||||
err = b.batch.AllocDel(key)
|
||||
} else {
|
||||
b.batch.Del(key)
|
||||
err = b.batch.Del(key)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
b.store.Logf("bleve moss batch.Delete err: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
@ -45,7 +55,11 @@ func (b *Batch) Merge(key, val []byte) {
|
||||
}
|
||||
|
||||
func (b *Batch) Reset() {
|
||||
b.Close()
|
||||
err := b.Close()
|
||||
if err != nil {
|
||||
b.store.Logf("bleve moss batch.Close err: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
batch, err := b.store.ms.NewBatch(0, 0)
|
||||
if err == nil {
|
||||
@ -56,7 +70,7 @@ func (b *Batch) Reset() {
|
||||
|
||||
func (b *Batch) Close() error {
|
||||
b.merge = nil
|
||||
b.batch.Close()
|
||||
err := b.batch.Close()
|
||||
b.batch = nil
|
||||
return nil
|
||||
return err
|
||||
}
|
||||
|
@ -18,6 +18,7 @@ import (
|
||||
)
|
||||
|
||||
type Iterator struct {
|
||||
store *Store
|
||||
ss moss.Snapshot
|
||||
iter moss.Iterator
|
||||
prefix []byte
|
||||
@ -39,10 +40,16 @@ func (x *Iterator) Seek(seekToKey []byte) {
|
||||
|
||||
iter, err := x.ss.StartIterator(seekToKey, x.end, moss.IteratorOptions{})
|
||||
if err != nil {
|
||||
x.store.Logf("bleve moss StartIterator err: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
err = x.iter.Close()
|
||||
if err != nil {
|
||||
x.store.Logf("bleve moss iterator.Seek err: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
x.iter.Close()
|
||||
x.iter = iter
|
||||
|
||||
x.checkDone()
|
||||
@ -90,10 +97,12 @@ func (x *Iterator) Valid() bool {
|
||||
}
|
||||
|
||||
func (x *Iterator) Close() error {
|
||||
var err error
|
||||
|
||||
x.ss = nil
|
||||
|
||||
if x.iter != nil {
|
||||
x.iter.Close()
|
||||
err = x.iter.Close()
|
||||
x.iter = nil
|
||||
}
|
||||
|
||||
@ -102,7 +111,7 @@ func (x *Iterator) Close() error {
|
||||
x.k = nil
|
||||
x.v = nil
|
||||
|
||||
return nil
|
||||
return err
|
||||
}
|
||||
|
||||
func (x *Iterator) checkDone() {
|
||||
|
@ -66,7 +66,7 @@ func initLowerLevelStore(
|
||||
|
||||
llSnapshot, err := llUpdate(nil)
|
||||
if err != nil {
|
||||
kvStore.Close()
|
||||
_ = kvStore.Close()
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
@ -126,7 +126,10 @@ func (s *llStore) decRef() {
|
||||
s.m.Lock()
|
||||
s.refs -= 1
|
||||
if s.refs <= 0 {
|
||||
s.kvStore.Close()
|
||||
err := s.kvStore.Close()
|
||||
if err != nil {
|
||||
s.logf("llStore kvStore.Close err: %v", err)
|
||||
}
|
||||
}
|
||||
s.m.Unlock()
|
||||
}
|
||||
@ -146,20 +149,33 @@ func (s *llStore) update(ssHigher moss.Snapshot, maxBatchSize uint64) (
|
||||
return nil, err
|
||||
}
|
||||
|
||||
defer iter.Close()
|
||||
defer func() {
|
||||
err = iter.Close()
|
||||
if err != nil {
|
||||
s.logf("llStore iter.Close err: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
kvWriter, err := s.kvStore.Writer()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
defer kvWriter.Close()
|
||||
defer func() {
|
||||
err = kvWriter.Close()
|
||||
if err != nil {
|
||||
s.logf("llStore kvWriter.Close err: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
batch := kvWriter.NewBatch()
|
||||
|
||||
defer func() {
|
||||
if batch != nil {
|
||||
batch.Close()
|
||||
err = batch.Close()
|
||||
if err != nil {
|
||||
s.logf("llStore batch.Close err: %v", err)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
@ -217,8 +233,10 @@ func (s *llStore) update(ssHigher moss.Snapshot, maxBatchSize uint64) (
|
||||
return nil, err
|
||||
}
|
||||
|
||||
batch.Close()
|
||||
batch = nil
|
||||
err = batch.Close()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
batch = kvWriter.NewBatch()
|
||||
}
|
||||
@ -267,7 +285,11 @@ func (llss *llSnapshot) decRef() {
|
||||
llss.refs -= 1
|
||||
if llss.refs <= 0 {
|
||||
if llss.kvReader != nil {
|
||||
llss.kvReader.Close()
|
||||
err := llss.kvReader.Close()
|
||||
if err != nil {
|
||||
llss.llStore.logf("llSnapshot kvReader.Close err: %v", err)
|
||||
}
|
||||
|
||||
llss.kvReader = nil
|
||||
}
|
||||
|
||||
@ -296,7 +318,7 @@ func (llss *llSnapshot) Get(key []byte,
|
||||
|
||||
val, err := r2.Get(key)
|
||||
|
||||
r2.Close()
|
||||
_ = r2.Close()
|
||||
|
||||
return val, err
|
||||
}
|
||||
|
@ -18,7 +18,8 @@ import (
|
||||
)
|
||||
|
||||
type Reader struct {
|
||||
ss moss.Snapshot
|
||||
store *Store
|
||||
ss moss.Snapshot
|
||||
}
|
||||
|
||||
func (r *Reader) Get(k []byte) (v []byte, err error) {
|
||||
@ -43,6 +44,7 @@ func (r *Reader) PrefixIterator(k []byte) store.KVIterator {
|
||||
}
|
||||
|
||||
rv := &Iterator{
|
||||
store: r.store,
|
||||
ss: r.ss,
|
||||
iter: iter,
|
||||
prefix: k,
|
||||
@ -62,6 +64,7 @@ func (r *Reader) RangeIterator(start, end []byte) store.KVIterator {
|
||||
}
|
||||
|
||||
rv := &Iterator{
|
||||
store: r.store,
|
||||
ss: r.ss,
|
||||
iter: iter,
|
||||
prefix: nil,
|
||||
|
@ -178,6 +178,13 @@ func (s *Store) Writer() (store.KVWriter, error) {
|
||||
return &Writer{s: s}, nil
|
||||
}
|
||||
|
||||
func (s *Store) Logf(fmt string, args ...interface{}) {
|
||||
options := s.ms.Options()
|
||||
if options.Log != nil {
|
||||
options.Log(fmt, args...)
|
||||
}
|
||||
}
|
||||
|
||||
func init() {
|
||||
registry.RegisterKVStore(Name, New)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user