diff --git a/index/store/gtreap/gtreap.go b/index/store/gtreap/gtreap.go index c1272dd1..5992ca66 100644 --- a/index/store/gtreap/gtreap.go +++ b/index/store/gtreap/gtreap.go @@ -53,13 +53,6 @@ type Reader struct { t *gtreap.Treap } -type Writer struct { - s *Store - - m sync.Mutex - t *gtreap.Treap -} - type Iterator struct { t *gtreap.Treap @@ -75,19 +68,12 @@ func newIterator(t *gtreap.Treap) *Iterator { } type Batch struct { - w *Writer - - m sync.Mutex - - ks [][]byte - vs [][]byte - ms map[string]store.AssociativeMergeChain + s *Store + items []*Item + ms map[string]store.AssociativeMergeChain } func (s *Store) Close() error { - s.m.Lock() - s.t = nil - s.m.Unlock() return nil } @@ -99,10 +85,7 @@ func (s *Store) Reader() (store.KVReader, error) { } func (s *Store) Writer() (store.KVWriter, error) { - s.m.Lock() - t := s.t - s.m.Unlock() - return &Writer{s: s, t: t}, nil + return s, nil } func (w *Reader) Get(k []byte) (v []byte, err error) { @@ -121,7 +104,7 @@ func (w *Reader) Close() error { return nil } -func (w *Writer) Get(k []byte) (v []byte, err error) { +func (w *Store) Get(k []byte) (v []byte, err error) { w.m.Lock() t := w.t w.m.Unlock() @@ -133,41 +116,33 @@ func (w *Writer) Get(k []byte) (v []byte, err error) { return nil, nil } -func (w *Writer) Iterator(k []byte) store.KVIterator { +func (w *Store) Iterator(k []byte) store.KVIterator { w.m.Lock() t := w.t w.m.Unlock() return newIterator(t).restart(&Item{k: k}) } -func (w *Writer) Close() error { +func (w *Store) Set(k, v []byte) (err error) { w.m.Lock() - w.t = nil + w.t = w.t.Upsert(&Item{k: k, v: v}, rand.Int()) w.m.Unlock() return nil } -func (w *Writer) Set(k, v []byte) (err error) { - w.s.m.Lock() - w.s.t = w.s.t.Upsert(&Item{k: k, v: v}, rand.Int()) - t := w.s.t - w.s.m.Unlock() - - w.m.Lock() - w.t = t - w.m.Unlock() - return nil -} - -func (w *Writer) Delete(k []byte) (err error) { +func (w *Store) Delete(k []byte) (err error) { w.m.Lock() w.t = w.t.Delete(&Item{k: k}) w.m.Unlock() return nil } -func (w *Writer) NewBatch() store.KVBatch { - return &Batch{w: w, ms: map[string]store.AssociativeMergeChain{}} +func (w *Store) NewBatch() store.KVBatch { + return &Batch{ + s: w, + items: make([]*Item, 0, 100), + ms: map[string]store.AssociativeMergeChain{}, + } } func (w *Iterator) SeekFirst() { @@ -268,44 +243,26 @@ func (w *Iterator) Close() error { } func (w *Batch) Set(k, v []byte) { - w.m.Lock() - w.ks = append(w.ks, k) - w.vs = append(w.vs, v) - w.m.Unlock() + w.items = append(w.items, &Item{k, v}) } func (w *Batch) Delete(k []byte) { - w.m.Lock() - w.ks = append(w.ks, k) - w.vs = append(w.vs, nil) - w.m.Unlock() + w.items = append(w.items, &Item{k, nil}) } func (w *Batch) Merge(k []byte, oper store.AssociativeMerge) { - key := string(k) - w.m.Lock() - w.ms[key] = append(w.ms[key], oper) - w.m.Unlock() + w.ms[string(k)] = append(w.ms[string(k)], oper) } func (w *Batch) Execute() (err error) { - w.m.Lock() - ks := w.ks - w.ks = nil - vs := w.vs - w.vs = nil - ms := w.ms - w.ms = map[string]store.AssociativeMergeChain{} - w.m.Unlock() - done := false for !done { - w.w.s.m.Lock() - torig := w.w.s.t - w.w.s.m.Unlock() + w.s.m.Lock() + torig := w.s.t + w.s.m.Unlock() t := torig - for key, mc := range ms { + for key, mc := range w.ms { k := []byte(key) itm := t.Get(&Item{k: k}) v := []byte(nil) @@ -323,31 +280,26 @@ func (w *Batch) Execute() (err error) { } } - for i, k := range ks { - v := vs[i] + for _, item := range w.items { + v := item.v if v != nil { - t = t.Upsert(&Item{k: k, v: v}, rand.Int()) + t = t.Upsert(item, rand.Int()) } else { - t = t.Delete(&Item{k: k}) + t = t.Delete(item) } } - w.w.s.m.Lock() - if w.w.s.t == torig { - w.w.s.t = t + w.s.m.Lock() + if w.s.t == torig { + w.s.t = t done = true } - w.w.s.m.Unlock() + w.s.m.Unlock() } return nil } func (w *Batch) Close() error { - w.m.Lock() - w.ks = nil - w.vs = nil - w.ms = nil - w.m.Unlock() return nil }