Skip to content

Commit

Permalink
fix: Data races detected from Batch function #448 (#449)
Browse files Browse the repository at this point in the history
After analyzing those data races as listed in #448, there're two
variables shall be in atomic operation to avoid data races:

1. batchData
2. timerActive

This commit from the batchData [][]byte to atomicBatchData to avoid data
race and update timerActive from bool to atomicBool to avoid data race

Signed-off-by: Jude Hung <[email protected]>
  • Loading branch information
judehung authored Sep 3, 2020
1 parent a75dd35 commit 337bfa7
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 19 deletions.
21 changes: 21 additions & 0 deletions internal/common/atomicbool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package common

import "sync"

type AtomicBool struct {
mutex sync.Mutex
value bool
}

func (b *AtomicBool) Value() bool {
b.mutex.Lock()
defer b.mutex.Unlock()
v := b.value
return v
}

func (b *AtomicBool) Set(v bool) {
b.mutex.Lock()
defer b.mutex.Unlock()
b.value = v
}
57 changes: 46 additions & 11 deletions pkg/transforms/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package transforms

import (
"errors"
"sync"
"time"

"github.com/edgexfoundry/app-functions-sdk-go/appcontext"
"github.com/edgexfoundry/app-functions-sdk-go/internal/common"
"github.com/edgexfoundry/app-functions-sdk-go/pkg/util"
)

Expand All @@ -17,15 +19,48 @@ const (
BatchByTimeAndCount
)

type atomicBatchData struct {
mutex sync.Mutex
data [][]byte
}

func (d *atomicBatchData) append(toBeAdded []byte) [][]byte {
d.mutex.Lock()
defer d.mutex.Unlock()
d.data = append(d.data, toBeAdded)
result := d.data
return result
}

func (d *atomicBatchData) all() [][]byte {
d.mutex.Lock()
defer d.mutex.Unlock()
result := d.data
return result
}

func (d *atomicBatchData) removeAll() {
d.mutex.Lock()
defer d.mutex.Unlock()
d.data = nil
}

func (d *atomicBatchData) length() int {
d.mutex.Lock()
defer d.mutex.Unlock()
result := len(d.data)
return result
}

// BatchConfig ...
type BatchConfig struct {
timeInterval string
parsedDuration time.Duration
batchThreshold int
batchMode BatchMode
batchData [][]byte
batchData atomicBatchData
continuedPipelineTransforms []appcontext.AppFunction
timerActive bool
timerActive common.AtomicBool
done chan bool
}

Expand Down Expand Up @@ -85,12 +120,12 @@ func (batch *BatchConfig) Batch(edgexcontext *appcontext.Context, params ...inte
return false, err
}
// always append data
batch.batchData = append(batch.batchData, data)
batch.batchData.append(data)

// If its time only or time and count
if batch.batchMode != BatchByCountOnly {
if batch.timerActive == false {
batch.timerActive = true
if !batch.timerActive.Value() {
batch.timerActive.Set(true)
for {
select {
case <-batch.done:
Expand All @@ -100,7 +135,7 @@ func (batch *BatchConfig) Batch(edgexcontext *appcontext.Context, params ...inte
}
break
}
batch.timerActive = false
batch.timerActive.Set(false)
} else {
if batch.batchMode == BatchByTimeOnly {
return false, nil
Expand All @@ -111,9 +146,9 @@ func (batch *BatchConfig) Batch(edgexcontext *appcontext.Context, params ...inte
if batch.batchMode != BatchByTimeOnly {
//Only want to check the threshold if the timer is running and in TimeAndCount mode OR if we are
// in CountOnly mode
if batch.batchMode == BatchByCountOnly || (batch.timerActive == true && batch.batchMode == BatchByTimeAndCount) {
if batch.batchMode == BatchByCountOnly || (batch.timerActive.Value() && batch.batchMode == BatchByTimeAndCount) {
// if we have not reached the threshold, then stop pipeline and continue batching
if len(batch.batchData) < batch.batchThreshold {
if batch.batchData.length() < batch.batchThreshold {
return false, nil
}
// if in BatchByCountOnly mode, there are no listeners so this would hang indefinitely
Expand All @@ -125,9 +160,9 @@ func (batch *BatchConfig) Batch(edgexcontext *appcontext.Context, params ...inte

edgexcontext.LoggingClient.Debug("Forwarding Batched Data...")
// we've met the threshold, lets clear out the buffer and send it forward in the pipeline
if len(batch.batchData) > 0 {
copy := batch.batchData
batch.batchData = nil
if batch.batchData.length() > 0 {
copy := batch.batchData.all()
batch.batchData.removeAll()
return true, copy
}
return false, nil
Expand Down
16 changes: 8 additions & 8 deletions pkg/transforms/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,29 +24,29 @@ func TestBatchInCountMode(t *testing.T) {

continuePipeline1, _ := bs.Batch(context, []byte(dataToBatch[0]))
assert.False(t, continuePipeline1)
assert.Len(t, bs.batchData, 1, "Should have 1 record")
assert.Len(t, bs.batchData.all(), 1, "Should have 1 record")

continuePipeline2, _ := bs.Batch(context, []byte(dataToBatch[0]))
assert.False(t, continuePipeline2)
assert.Len(t, bs.batchData, 2, "Should have 2 records")
assert.Len(t, bs.batchData.all(), 2, "Should have 2 records")

continuePipeline3, result3 := bs.Batch(context, []byte(dataToBatch[0]))
assert.True(t, continuePipeline3)
assert.Len(t, result3, 3, "Should have 3 records")
assert.Len(t, bs.batchData, 0, "Records should have been cleared")
assert.Len(t, bs.batchData.all(), 0, "Records should have been cleared")

continuePipeline4, _ := bs.Batch(context, []byte(dataToBatch[0]))
assert.False(t, continuePipeline4)
assert.Len(t, bs.batchData, 1, "Should have 1 record")
assert.Len(t, bs.batchData.all(), 1, "Should have 1 record")

continuePipeline5, _ := bs.Batch(context, []byte(dataToBatch[0]))
assert.False(t, continuePipeline5)
assert.Len(t, bs.batchData, 2, "Should have 2 records")
assert.Len(t, bs.batchData.all(), 2, "Should have 2 records")

continuePipeline6, result4 := bs.Batch(context, []byte(dataToBatch[0]))
assert.True(t, continuePipeline6)
assert.Len(t, result4, 3, "Should have 3 records")
assert.Len(t, bs.batchData, 0, "Records should have been cleared")
assert.Len(t, bs.batchData.all(), 0, "Records should have been cleared")
}
func TestBatchInTimeAndCountMode_TimeElapsed(t *testing.T) {

Expand All @@ -64,7 +64,7 @@ func TestBatchInTimeAndCountMode_TimeElapsed(t *testing.T) {
continuePipeline1, result := bs.Batch(context, []byte(dataToBatch[0]))
assert.True(t, continuePipeline1)
assert.Equal(t, 3, len(result.([][]byte)))
assert.Len(t, bs.batchData, 0, "Should have 0 records")
assert.Len(t, bs.batchData.all(), 0, "Should have 0 records")
wgAll.Done()
}()
go func() {
Expand Down Expand Up @@ -117,7 +117,7 @@ func TestBatchInTimeAndCountMode_CountMet(t *testing.T) {
continuePipeline3, result := bs.Batch(context, []byte(dataToBatch[0]))
assert.True(t, continuePipeline3)
assert.Equal(t, 3, len(result.([][]byte)))
assert.Nil(t, bs.batchData, "Should have 0 records")
assert.Nil(t, bs.batchData.all(), "Should have 0 records")
wgAll.Done()
}()
wgAll.Wait()
Expand Down

0 comments on commit 337bfa7

Please sign in to comment.