0
0
Fork 0

clarify Batch is not threadsafe in docs

in some limited cases we can detect unsafe usage
in these cases, do not trip over ourselves and panic
instead return a strongly typed error upside_down.UnsafeBatchUseDetected
also, introduced Batch.Reset() to allow batch reuse
this is currently still experimental
closes #195
This commit is contained in:
Marty Schoch 2015-05-15 15:04:52 -04:00
parent 580d9013b2
commit 328bc73ed0
5 changed files with 153 additions and 1 deletions

View File

@ -17,7 +17,10 @@ import (
// A Batch groups together multiple Index and Delete
// operations you would like performed at the same
// time.
// time. The Batch structure is NOT thread-safe.
// You should only perform operations on a batch
// from a single thread at a time. Once batch
// execution has started, you may not modify it.
type Batch struct {
index Index
internal *index.Batch
@ -69,6 +72,12 @@ func (b *Batch) String() string {
return b.internal.String()
}
// Reset returns a Batch to the empty state so that it can
// be re-used in the future.
func (b *Batch) Reset() {
b.internal.Reset()
}
// An Index implements all the indexing and searching
// capabilities of bleve. An Index can be created
// using the New() and Open() methods.

View File

@ -144,3 +144,12 @@ func (b *Batch) String() string {
}
return rv
}
func (b *Batch) Reset() {
for k, _ := range b.IndexOps {
delete(b.IndexOps, k)
}
for k, _ := range b.InternalOps {
delete(b.InternalOps, k)
}
}

View File

@ -28,6 +28,8 @@ import (
var VersionKey = []byte{'v'}
var UnsafeBatchUseDetected = fmt.Errorf("bleve.Batch is NOT thread-safe, modification after execution detected")
const Version uint8 = 4
var IncompatibleVersion = fmt.Errorf("incompatible version, %d is supported", Version)
@ -593,9 +595,20 @@ func (udc *UpsideDownCouch) Batch(batch *index.Batch) (err error) {
}
}
var detectedUnsafeMutex sync.RWMutex
detectedUnsafe := false
go func() {
sofar := uint64(0)
for _, doc := range batch.IndexOps {
if doc != nil {
sofar++
if sofar > numUpdates {
detectedUnsafeMutex.Lock()
detectedUnsafe = true
detectedUnsafeMutex.Unlock()
return
}
aw := AnalysisWork{
udc: udc,
d: doc,
@ -617,6 +630,12 @@ func (udc *UpsideDownCouch) Batch(batch *index.Batch) (err error) {
}
close(resultChan)
detectedUnsafeMutex.RLock()
defer detectedUnsafeMutex.RUnlock()
if detectedUnsafe {
return UnsafeBatchUseDetected
}
atomic.AddUint64(&udc.stats.analysisTime, uint64(time.Since(analysisStart)))
indexStart := time.Now()

72
index_race_test.go Normal file
View File

@ -0,0 +1,72 @@
// Copyright (c) 2015 Couchbase, Inc.
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
// except in compliance with the License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed under the
// License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
// either express or implied. See the License for the specific language governing permissions
// and limitations under the License.
// this file includes tests which intentionally create race conditions
// +build !race
package bleve
import (
"fmt"
"os"
"sync"
"testing"
"github.com/blevesearch/bleve/index/upside_down"
)
func TestBatchCrashBug195(t *testing.T) {
defer func() {
err := os.RemoveAll("testidx")
if err != nil {
t.Fatal(err)
}
}()
index, err := New("testidx", NewIndexMapping())
if err != nil {
t.Fatal(err)
}
b := index.NewBatch()
for i := 0; i < 200; i++ {
b.Index(fmt.Sprintf("%d", i), struct {
Value string
}{
Value: fmt.Sprintf("%d", i),
})
}
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
err := index.Batch(b)
if err != nil && err != upside_down.UnsafeBatchUseDetected {
t.Fatal(err)
}
}()
// now keep adding to the batch after we've started to execute it
for i := 200; i < 400; i++ {
b.Index(fmt.Sprintf("%d", i), struct {
Value string
}{
Value: fmt.Sprintf("%d", i),
})
}
wg.Wait()
err = index.Close()
if err != nil {
t.Fatal(err)
}
}

View File

@ -712,3 +712,46 @@ func TestIndexCountMatchSearch(t *testing.T) {
t.Fatal(err)
}
}
func TestBatchReset(t *testing.T) {
defer func() {
err := os.RemoveAll("testidx")
if err != nil {
t.Fatal(err)
}
}()
index, err := New("testidx", NewIndexMapping())
if err != nil {
t.Fatal(err)
}
batch := index.NewBatch()
err = batch.Index("k1", struct {
Body string
}{
Body: "v1",
})
if err != nil {
t.Error(err)
}
batch.Delete("k2")
batch.SetInternal([]byte("k3"), []byte("v3"))
batch.DeleteInternal([]byte("k4"))
if batch.Size() != 4 {
t.Logf("%v", batch)
t.Errorf("expected batch size 4, got %d", batch.Size())
}
batch.Reset()
if batch.Size() != 0 {
t.Errorf("expected batch size 0 after reset, got %d", batch.Size())
}
err = index.Close()
if err != nil {
t.Fatal(err)
}
}