Skip to content

Commit

Permalink
[testbed] Detect and fix data race at testbed integration test (#30549)
Browse files Browse the repository at this point in the history
**Description:**
Fixing a bug - The Testbed module has many data race warnings when
tested with `go test -race`, especially for traces. I've listed the data
race issues and fixes below:
1. Add mutex to MockBackend `startedAt` variable
Because `tc := testbed.NewTestCase()` starts a `logStats()` goroutine
which calls `MockBackend.GetStats()` that uses the `startedAt` variable,
a data race occurs when we later execute `tc.StartBackend()`. This is
because `tc.StartBackend()` writes to the `startedAt` variable.
2. Add mutex to LoadGenerator `startedAt` variable
The reason is similar to MockBackend.
3. Move MockBackend `numSpansReceived` addition after `ConsumeTraces()`
Because `tc.numSpansReceived.Add(uint64(td.SpanCount()))` will make
`tc.LoadGenerator.DataItemsSent()` equals to
`tc.MockBackend.DataItemsReceived()` which the test case will assume
MockBackend already received all the spans and lead to
`tc.ValidateData()` while MockBackend is actually consuming the traces.
4. Get `td.SpanCount()` before `batchprocessor` consumes the spans at
`opencensusreceiver` and `zipkinreceiver`
With the `batchprocessor` pipeline, there's a step that it moves the
source `td.ResourceSpans()` to the batched one and that is why the
`td.SpanCount()` line while `batchprocessor` consuming the traces will
cause data race.

**Testing:**
Add `-race` args to `testbed/runtests.sh` at line 22 where `go test`
takes place.
Run `TESTS_DIR=correctnesstests/traces make e2e-test`

---------

Signed-off-by: James Ryans <[email protected]>
  • Loading branch information
james-ryans authored Jan 16, 2024
1 parent 2107819 commit 95e673e
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 7 deletions.
3 changes: 2 additions & 1 deletion receiver/opencensusreceiver/internal/octrace/opencensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,9 @@ func (ocr *Receiver) processReceivedMsg(
func (ocr *Receiver) sendToNextConsumer(longLivedRPCCtx context.Context, td ptrace.Traces) error {
ctx := ocr.obsrecv.StartTracesOp(longLivedRPCCtx)

numReceivedSpans := td.SpanCount()
err := ocr.nextConsumer.ConsumeTraces(ctx, td)
ocr.obsrecv.EndTracesOp(ctx, receiverDataFormat, td.SpanCount(), err)
ocr.obsrecv.EndTracesOp(ctx, receiverDataFormat, numReceivedSpans, err)

return err
}
3 changes: 2 additions & 1 deletion receiver/zipkinreceiver/trace_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,13 +234,14 @@ func (zr *zipkinReceiver) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}

numReceivedSpans := td.SpanCount()
consumerErr := zr.nextConsumer.ConsumeTraces(ctx, td)

receiverTagValue := zipkinV2TagValue
if asZipkinv1 {
receiverTagValue = zipkinV1TagValue
}
obsrecv.EndTracesOp(ctx, receiverTagValue, td.SpanCount(), consumerErr)
obsrecv.EndTracesOp(ctx, receiverTagValue, numReceivedSpans, consumerErr)
if consumerErr == nil {
// Send back the response "Accepted" as
// required at https://zipkin.io/zipkin-api/#/default/post_spans
Expand Down
5 changes: 5 additions & 0 deletions testbed/testbed/load_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type ProviderSender struct {
// Number of data items (spans or metric data points) sent.
dataItemsSent atomic.Uint64
startedAt time.Time
startMutex sync.Mutex

// Number of permanent errors received
permanentErrors atomic.Uint64
Expand Down Expand Up @@ -116,6 +117,8 @@ func (ps *ProviderSender) Start(options LoadOptions) {

// Begin generation
go ps.generate()
ps.startMutex.Lock()
defer ps.startMutex.Unlock()
ps.startedAt = time.Now()
}

Expand Down Expand Up @@ -148,6 +151,8 @@ func (ps *ProviderSender) IsReady() bool {

// GetStats returns the stats as a printable string.
func (ps *ProviderSender) GetStats() string {
ps.startMutex.Lock()
defer ps.startMutex.Unlock()
sent := ps.DataItemsSent()
return printer.Sprintf("Sent:%10d %s (%d/sec)", sent, ps.sendType, int(float64(sent)/time.Since(ps.startedAt).Seconds()))
}
Expand Down
14 changes: 9 additions & 5 deletions testbed/testbed/mock_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,10 @@ type MockBackend struct {
logFile *os.File

// Start/stop flags
isStarted bool
stopOnce sync.Once
startedAt time.Time
isStarted bool
stopOnce sync.Once
startedAt time.Time
startMutex sync.Mutex

// Recording fields.
isRecording bool
Expand Down Expand Up @@ -100,6 +101,8 @@ func (mb *MockBackend) Start() error {
}

mb.isStarted = true
mb.startMutex.Lock()
defer mb.startMutex.Unlock()
mb.startedAt = time.Now()
return nil
}
Expand Down Expand Up @@ -130,6 +133,8 @@ func (mb *MockBackend) EnableRecording() {
}

func (mb *MockBackend) GetStats() string {
mb.startMutex.Lock()
defer mb.startMutex.Unlock()
received := mb.DataItemsReceived()
return printer.Sprintf("Received:%10d items (%d/sec)", received, int(float64(received)/time.Since(mb.startedAt).Seconds()))
}
Expand Down Expand Up @@ -190,8 +195,6 @@ func (tc *MockTraceConsumer) ConsumeTraces(_ context.Context, td ptrace.Traces)
return err
}

tc.numSpansReceived.Add(uint64(td.SpanCount()))

rs := td.ResourceSpans()
for i := 0; i < rs.Len(); i++ {
ils := rs.At(i).ScopeSpans()
Expand Down Expand Up @@ -221,6 +224,7 @@ func (tc *MockTraceConsumer) ConsumeTraces(_ context.Context, td ptrace.Traces)
}

tc.backend.ConsumeTrace(td)
tc.numSpansReceived.Add(uint64(td.SpanCount()))

return nil
}
Expand Down

0 comments on commit 95e673e

Please sign in to comment.