0
0
Fork 0

fix data race in bleve batch reuse

Currently bleve batch is build by user goroutine
Then read by bleve gourinte
This is still safe when used correctly
However, Reset() will modify the map, which is now a data race

This fix is to simply make batch.Reset() alloc new maps.
This provides a data-access pattern that can be used safely.
Also, this thread argues that creating a new map may be faster
than trying to reuse an existing one:

https://groups.google.com/d/msg/golang-nuts/UvUm3LA1u8g/jGv_FobNpN0J

Separate but related, I have opted to remove the "unsafe batch"
checking that we did.  This was always limited anyway, and now
users of Go 1.6 are just as likely to get a panic from the
runtime for concurrent map access anyway.  So, the price paid
by us (additional mutex) is not worth it.

fixes #360 and #260
This commit is contained in:
Marty Schoch 2016-04-08 15:32:13 -04:00
parent 617fcf693e
commit b8a2fbb887
5 changed files with 64 additions and 125 deletions

View File

@ -11,8 +11,6 @@ package firestorm
import (
"encoding/json"
"fmt"
"sync"
"sync/atomic"
"time"
@ -24,8 +22,6 @@ import (
const Name = "firestorm"
var UnsafeBatchUseDetected = fmt.Errorf("bleve.Batch is NOT thread-safe, modification after execution detected")
type Firestorm struct {
highDocNumber uint64
docCount uint64
@ -312,20 +308,9 @@ func (f *Firestorm) Batch(batch *index.Batch) (err error) {
}
}
var detectedUnsafeMutex sync.RWMutex
detectedUnsafe := false
go func() {
sofar := uint64(0)
for _, doc := range batch.IndexOps {
if doc != nil {
sofar++
if sofar > docsUpdated {
detectedUnsafeMutex.Lock()
detectedUnsafe = true
detectedUnsafeMutex.Unlock()
return
}
aw := index.NewAnalysisWork(f, doc, resultChan)
// put the work on the queue
f.analysisQueue.Queue(aw)
@ -345,12 +330,6 @@ func (f *Firestorm) Batch(batch *index.Batch) (err error) {
}
close(resultChan)
detectedUnsafeMutex.RLock()
defer detectedUnsafeMutex.RUnlock()
if detectedUnsafe {
return UnsafeBatchUseDetected
}
atomic.AddUint64(&f.stats.analysisTime, uint64(time.Since(analysisStart)))
var deleteKeys [][]byte

View File

@ -195,10 +195,6 @@ func (b *Batch) String() string {
}
func (b *Batch) Reset() {
for k := range b.IndexOps {
delete(b.IndexOps, k)
}
for k := range b.InternalOps {
delete(b.InternalOps, k)
}
b.IndexOps = make(map[string]*document.Document)
b.InternalOps = make(map[string][]byte)
}

View File

@ -38,8 +38,6 @@ const RowBufferSize = 4 * 1024
var VersionKey = []byte{'v'}
var UnsafeBatchUseDetected = fmt.Errorf("bleve.Batch is NOT thread-safe, modification after execution detected")
const Version uint8 = 5
var IncompatibleVersion = fmt.Errorf("incompatible version, %d is supported", Version)
@ -802,20 +800,9 @@ func (udc *UpsideDownCouch) Batch(batch *index.Batch) (err error) {
}
}
var detectedUnsafeMutex sync.RWMutex
detectedUnsafe := false
go func() {
sofar := uint64(0)
for _, doc := range batch.IndexOps {
if doc != nil {
sofar++
if sofar > numUpdates {
detectedUnsafeMutex.Lock()
detectedUnsafe = true
detectedUnsafeMutex.Unlock()
return
}
aw := index.NewAnalysisWork(udc, doc, resultChan)
// put the work on the queue
udc.analysisQueue.Queue(aw)
@ -932,12 +919,6 @@ func (udc *UpsideDownCouch) Batch(batch *index.Batch) (err error) {
return docBackIndexRowErr
}
detectedUnsafeMutex.RLock()
defer detectedUnsafeMutex.RUnlock()
if detectedUnsafe {
return UnsafeBatchUseDetected
}
// start a writer for this batch
var kvwriter store.KVWriter
kvwriter, err = udc.store.Writer()

View File

@ -1,79 +0,0 @@
// Copyright (c) 2015 Couchbase, Inc.
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
// except in compliance with the License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed under the
// License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
// either express or implied. See the License for the specific language governing permissions
// and limitations under the License.
// this file includes tests which intentionally create race conditions,
// so exclude them from running with the race detector
// +build !race
package bleve
import (
"fmt"
"os"
"sync"
"testing"
"github.com/blevesearch/bleve/index/upside_down"
)
func TestBatchCrashBug195(t *testing.T) {
defer func() {
err := os.RemoveAll("testidx")
if err != nil {
t.Fatal(err)
}
}()
index, err := New("testidx", NewIndexMapping())
if err != nil {
t.Fatal(err)
}
b := index.NewBatch()
for i := 0; i < 200; i++ {
err := b.Index(fmt.Sprintf("%d", i), struct {
Value string
}{
Value: fmt.Sprintf("%d", i),
})
if err != nil {
t.Fatal(err)
}
}
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
err := index.Batch(b)
if err != nil && err != upside_down.UnsafeBatchUseDetected {
t.Fatal(err)
}
}()
// now keep adding to the batch after we've started to execute it
for i := 200; i < 400; i++ {
err := b.Index(fmt.Sprintf("%d", i), struct {
Value string
}{
Value: fmt.Sprintf("%d", i),
})
if err != nil {
t.Fatal(err)
}
}
wg.Wait()
err = index.Close()
if err != nil {
t.Fatal(err)
}
}

View File

@ -28,6 +28,7 @@ import (
"github.com/blevesearch/bleve/analysis/analyzers/keyword_analyzer"
"github.com/blevesearch/bleve/index"
"github.com/blevesearch/bleve/index/store/null"
"github.com/blevesearch/bleve/search"
)
@ -1508,3 +1509,64 @@ func TestConfigCache(t *testing.T) {
}()
}
}
func TestBatchRaceBug260(t *testing.T) {
defer func() {
err := os.RemoveAll("testidx")
if err != nil {
t.Fatal(err)
}
}()
i, err := New("testidx", NewIndexMapping())
if err != nil {
t.Fatal(err)
}
b := i.NewBatch()
err = b.Index("1", 1)
if err != nil {
t.Fatal(err)
}
err = i.Batch(b)
if err != nil {
t.Fatal(err)
}
b.Reset()
err = b.Index("2", 2)
if err != nil {
t.Fatal(err)
}
err = i.Batch(b)
if err != nil {
t.Fatal(err)
}
b.Reset()
}
func BenchmarkBatchOverhead(b *testing.B) {
defer func() {
err := os.RemoveAll("testidx")
if err != nil {
b.Fatal(err)
}
}()
m := NewIndexMapping()
i, err := NewUsing("testidx", m, Config.DefaultIndexType, null.Name, nil)
if err != nil {
b.Fatal(err)
}
for n := 0; n < b.N; n++ {
// put 1000 items in a batch
batch := i.NewBatch()
for i := 0; i < 1000; i++ {
err = batch.Index(fmt.Sprintf("%d", i), map[string]interface{}{"name": "bleve"})
if err != nil {
b.Fatal(err)
}
}
err = i.Batch(batch)
if err != nil {
b.Fatal(err)
}
batch.Reset()
}
}