From dbf50b7f2907b4a08ebbbe75deb6fa4a7bcbd97b Mon Sep 17 00:00:00 2001 From: Steve Yen Date: Thu, 26 Mar 2015 16:40:18 -0700 Subject: [PATCH] KVStore gtreap allows only 1 writer at a time --- index/store/gtreap/gtreap.go | 73 ++++++++++++++++++++++++++---------- 1 file changed, 53 insertions(+), 20 deletions(-) diff --git a/index/store/gtreap/gtreap.go b/index/store/gtreap/gtreap.go index 514455b3..22c4a2fc 100644 --- a/index/store/gtreap/gtreap.go +++ b/index/store/gtreap/gtreap.go @@ -16,6 +16,7 @@ package gtreap import ( "bytes" + "fmt" "math/rand" "sync" @@ -31,8 +32,17 @@ func init() { registry.RegisterKVStore(Name, StoreConstructor) } +const MAX_CONCURRENT_WRITERS = 1 + func StoreConstructor(config map[string]interface{}) (store.KVStore, error) { - return &Store{t: gtreap.NewTreap(itemCompare)}, nil + s := &Store{ + availableWriters: make(chan bool, MAX_CONCURRENT_WRITERS), + t: gtreap.NewTreap(itemCompare), + } + for i := 0; i < MAX_CONCURRENT_WRITERS; i++ { + s.availableWriters <- true + } + return s, nil } type Item struct { @@ -45,6 +55,8 @@ func itemCompare(a, b interface{}) int { } type Store struct { + availableWriters chan bool + m sync.Mutex t *gtreap.Treap } @@ -53,6 +65,10 @@ type Reader struct { t *gtreap.Treap } +type Writer struct { + s *Store +} + type Iterator struct { t *gtreap.Treap @@ -74,6 +90,7 @@ type Batch struct { } func (s *Store) Close() error { + close(s.availableWriters) return nil } @@ -85,7 +102,12 @@ func (s *Store) Reader() (store.KVReader, error) { } func (s *Store) Writer() (store.KVWriter, error) { - return s, nil + available, ok := <-s.availableWriters + if !ok || !available { + return nil, fmt.Errorf("no available writers") + } + + return &Writer{s: s}, nil } func (w *Reader) Get(k []byte) (v []byte, err error) { @@ -104,10 +126,10 @@ func (w *Reader) Close() error { return nil } -func (w *Store) Get(k []byte) (v []byte, err error) { - w.m.Lock() - t := w.t - w.m.Unlock() +func (w *Writer) Get(k []byte) (v []byte, err error) { + w.s.m.Lock() + t := w.s.t + w.s.m.Unlock() itm := t.Get(&Item{k: k}) if itm != nil { @@ -116,30 +138,40 @@ func (w *Store) Get(k []byte) (v []byte, err error) { return nil, nil } -func (w *Store) Iterator(k []byte) store.KVIterator { - w.m.Lock() - t := w.t - w.m.Unlock() +func (w *Writer) Iterator(k []byte) store.KVIterator { + w.s.m.Lock() + t := w.s.t + w.s.m.Unlock() + return newIterator(t).restart(&Item{k: k}) } -func (w *Store) Set(k, v []byte) (err error) { - w.m.Lock() - w.t = w.t.Upsert(&Item{k: k, v: append([]byte(nil), v...)}, rand.Int()) - w.m.Unlock() +func (w *Writer) Close() error { + w.s.availableWriters <- true + w.s = nil + return nil } -func (w *Store) Delete(k []byte) (err error) { - w.m.Lock() - w.t = w.t.Delete(&Item{k: k}) - w.m.Unlock() +func (w *Writer) Set(k, v []byte) (err error) { + w.s.m.Lock() + w.s.t = w.s.t.Upsert(&Item{k: k, v: append([]byte(nil), v...)}, rand.Int()) + w.s.m.Unlock() + return nil } -func (w *Store) NewBatch() store.KVBatch { +func (w *Writer) Delete(k []byte) (err error) { + w.s.m.Lock() + w.s.t = w.s.t.Delete(&Item{k: k}) + w.s.m.Unlock() + + return nil +} + +func (w *Writer) NewBatch() store.KVBatch { return &Batch{ - s: w, + s: w.s, items: make([]*Item, 0, 100), ms: map[string]store.AssociativeMergeChain{}, } @@ -306,3 +338,4 @@ func (w *Batch) Execute() (err error) { func (w *Batch) Close() error { return nil } +