parent
57358088ec
commit
57cd67fa88
|
@ -14,6 +14,7 @@ import (
|
|||
"encoding/json"
|
||||
"fmt"
|
||||
"math"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
|
@ -36,9 +37,12 @@ type UpsideDownCouch struct {
|
|||
path string
|
||||
store store.KVStore
|
||||
fieldIndexCache *FieldIndexCache
|
||||
docCount uint64
|
||||
analysisQueue *AnalysisQueue
|
||||
stats *indexStat
|
||||
|
||||
m sync.RWMutex
|
||||
// fields protected by m
|
||||
docCount uint64
|
||||
}
|
||||
|
||||
func NewUpsideDownCouch(s store.KVStore, analysisQueue *AnalysisQueue) *UpsideDownCouch {
|
||||
|
@ -149,6 +153,8 @@ func (udc *UpsideDownCouch) batchRows(writer store.KVWriter, addRows []UpsideDow
|
|||
}
|
||||
|
||||
func (udc *UpsideDownCouch) DocCount() (uint64, error) {
|
||||
udc.m.RLock()
|
||||
defer udc.m.RUnlock()
|
||||
return udc.docCount, nil
|
||||
}
|
||||
|
||||
|
@ -193,7 +199,9 @@ func (udc *UpsideDownCouch) Open() (err error) {
|
|||
}
|
||||
}
|
||||
// set doc count
|
||||
udc.m.Lock()
|
||||
udc.docCount, err = udc.countDocs(kvwriter)
|
||||
udc.m.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -300,7 +308,9 @@ func (udc *UpsideDownCouch) Update(doc *document.Document) (err error) {
|
|||
|
||||
err = udc.batchRows(kvwriter, addRows, updateRows, deleteRows)
|
||||
if err == nil && backIndexRow == nil {
|
||||
udc.m.Lock()
|
||||
udc.docCount++
|
||||
udc.m.Unlock()
|
||||
}
|
||||
atomic.AddUint64(&udc.stats.indexTime, uint64(time.Since(indexStart)))
|
||||
if err == nil {
|
||||
|
@ -451,7 +461,9 @@ func (udc *UpsideDownCouch) Delete(id string) (err error) {
|
|||
|
||||
err = udc.batchRows(kvwriter, nil, nil, deleteRows)
|
||||
if err == nil {
|
||||
udc.m.Lock()
|
||||
udc.docCount--
|
||||
udc.m.Unlock()
|
||||
}
|
||||
atomic.AddUint64(&udc.stats.indexTime, uint64(time.Since(indexStart)))
|
||||
if err == nil {
|
||||
|
@ -663,8 +675,10 @@ func (udc *UpsideDownCouch) Batch(batch *index.Batch) (err error) {
|
|||
err = udc.batchRows(kvwriter, addRows, updateRows, deleteRows)
|
||||
atomic.AddUint64(&udc.stats.indexTime, uint64(time.Since(indexStart)))
|
||||
if err == nil {
|
||||
udc.m.Lock()
|
||||
udc.docCount += docsAdded
|
||||
udc.docCount -= docsDeleted
|
||||
udc.m.Unlock()
|
||||
atomic.AddUint64(&udc.stats.updates, numUpdates)
|
||||
atomic.AddUint64(&udc.stats.deletes, docsDeleted)
|
||||
atomic.AddUint64(&udc.stats.batches, 1)
|
||||
|
@ -709,6 +723,8 @@ func (udc *UpsideDownCouch) Reader() (index.IndexReader, error) {
|
|||
if err != nil {
|
||||
return nil, fmt.Errorf("error opening store reader: %v", err)
|
||||
}
|
||||
udc.m.RLock()
|
||||
defer udc.m.RUnlock()
|
||||
return &IndexReader{
|
||||
index: udc,
|
||||
kvreader: kvr,
|
||||
|
|
|
@ -607,3 +607,36 @@ func TestBatchString(t *testing.T) {
|
|||
}
|
||||
|
||||
}
|
||||
|
||||
func TestIndexMetadataRaceBug198(t *testing.T) {
|
||||
defer func() {
|
||||
err := os.RemoveAll("testidx")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}()
|
||||
|
||||
index, err := New("testidx", NewIndexMapping())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
go func() {
|
||||
for {
|
||||
index.DocCount()
|
||||
}
|
||||
}()
|
||||
|
||||
for i := 0; i < 100; i++ {
|
||||
batch := index.NewBatch()
|
||||
err = batch.Index("a", []byte("{}"))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
err = index.Batch(batch)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue