Skip to content

Commit

Permalink
[exporter/datadog] Refactor WaitGroup into separate fields for traces…
Browse files Browse the repository at this point in the history
… and metrics exporter (#33291)

`TraceAgent` is only called in the traces exporter and
`consumeStatsPayload` is only called in the metrics exporter, so I think
sharing the same `wg` field on the factory made each exporter's shutdown
wait on the other exporter's goroutines when both were in use at the same
time. This PR replaces the shared field with a separate `sync.WaitGroup`
owned by each exporter and passed explicitly to these helpers.

This PR also removes `wg.Wait` on the logs exporter since that wasn't
applicable in the first place.
  • Loading branch information
liustanley authored May 30, 2024
1 parent 28ca05c commit 9a0c38c
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 17 deletions.
27 changes: 27 additions & 0 deletions .chloggen/stanley.liu_refactor-wg.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: exporter/datadog

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fixes a potential race condition when the traces exporter and metrics exporter are both shutting down.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [33291]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
36 changes: 19 additions & 17 deletions exporter/datadogexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,6 @@ type factory struct {
attributesTranslator *attributes.Translator
attributesErr error

wg sync.WaitGroup // waits for agent to exit

registry *featuregate.Registry
}

Expand Down Expand Up @@ -150,14 +148,14 @@ func (f *factory) StopReporter() {
})
}

func (f *factory) TraceAgent(ctx context.Context, params exporter.CreateSettings, cfg *Config, sourceProvider source.Provider, attrsTranslator *attributes.Translator) (*agent.Agent, error) {
func (f *factory) TraceAgent(ctx context.Context, wg *sync.WaitGroup, params exporter.CreateSettings, cfg *Config, sourceProvider source.Provider, attrsTranslator *attributes.Translator) (*agent.Agent, error) {
agnt, err := newTraceAgent(ctx, params, cfg, sourceProvider, metricsclient.InitializeMetricClient(params.MeterProvider, metricsclient.ExporterSourceTag), attrsTranslator)
if err != nil {
return nil, err
}
f.wg.Add(1)
wg.Add(1)
go func() {
defer f.wg.Done()
defer wg.Done()
agnt.Run()
}()
return agnt, nil
Expand Down Expand Up @@ -253,11 +251,11 @@ func checkAndCastConfig(c component.Config, logger *zap.Logger) *Config {
return cfg
}

func (f *factory) consumeStatsPayload(ctx context.Context, statsIn <-chan []byte, statsToAgent chan<- *pb.StatsPayload, tracerVersion string, agentVersion string, logger *zap.Logger) {
func (f *factory) consumeStatsPayload(ctx context.Context, wg *sync.WaitGroup, statsIn <-chan []byte, statsToAgent chan<- *pb.StatsPayload, tracerVersion string, agentVersion string, logger *zap.Logger) {
for i := 0; i < runtime.NumCPU(); i++ {
f.wg.Add(1)
wg.Add(1)
go func() {
defer f.wg.Done()
defer wg.Done()
for {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -305,7 +303,11 @@ func (f *factory) createMetricsExporter(
return nil, fmt.Errorf("failed to build attributes translator: %w", err)
}

var pushMetricsFn consumer.ConsumeMetricsFunc
var (
pushMetricsFn consumer.ConsumeMetricsFunc
wg sync.WaitGroup // waits for consumeStatsPayload to exit
)

acfg, err := newTraceAgentConfig(ctx, set, cfg, hostProvider, attrsTranslator)
if err != nil {
cancel()
Expand All @@ -321,7 +323,7 @@ func (f *factory) createMetricsExporter(

statsIn := make(chan []byte, 1000)
statsv := set.BuildInfo.Command + set.BuildInfo.Version
f.consumeStatsPayload(ctx, statsIn, statsToAgent, statsv, acfg.AgentVersion, set.Logger)
f.consumeStatsPayload(ctx, &wg, statsIn, statsToAgent, statsv, acfg.AgentVersion, set.Logger)
pcfg := newMetadataConfigfromConfig(cfg)
metadataReporter, err := f.Reporter(set, pcfg)
if err != nil {
Expand Down Expand Up @@ -350,8 +352,8 @@ func (f *factory) createMetricsExporter(
} else {
exp, metricsErr := newMetricsExporter(ctx, set, cfg, acfg, &f.onceMetadata, attrsTranslator, hostProvider, metadataReporter, statsIn)
if metricsErr != nil {
cancel() // first cancel context
f.wg.Wait() // then wait for shutdown
cancel() // first cancel context
wg.Wait() // then wait for shutdown
return nil, metricsErr
}
pushMetricsFn = exp.PushMetricsDataScrubbed
Expand All @@ -370,8 +372,8 @@ func (f *factory) createMetricsExporter(
exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: true}),
exporterhelper.WithQueue(cfg.QueueSettings),
exporterhelper.WithShutdown(func(context.Context) error {
cancel() // first cancel context
f.wg.Wait() // then wait for shutdown
cancel() // first cancel context
wg.Wait() // then wait for shutdown
f.StopReporter()
statsWriter.Stop()
if statsIn != nil {
Expand Down Expand Up @@ -408,6 +410,7 @@ func (f *factory) createTracesExporter(
var (
pusher consumer.ConsumeTracesFunc
stop component.ShutdownFunc
wg sync.WaitGroup // waits for agent to exit
)

hostProvider, err := f.SourceProvider(set.TelemetrySettings, cfg.Hostname)
Expand All @@ -423,7 +426,7 @@ func (f *factory) createTracesExporter(
return nil, fmt.Errorf("failed to build attributes translator: %w", err)
}

traceagent, err := f.TraceAgent(ctx, set, cfg, hostProvider, attrsTranslator)
traceagent, err := f.TraceAgent(ctx, &wg, set, cfg, hostProvider, attrsTranslator)
if err != nil {
cancel()
return nil, fmt.Errorf("failed to start trace-agent: %w", err)
Expand Down Expand Up @@ -462,7 +465,7 @@ func (f *factory) createTracesExporter(
tracex, err2 := newTracesExporter(ctx, set, cfg, &f.onceMetadata, hostProvider, traceagent, metadataReporter)
if err2 != nil {
cancel()
f.wg.Wait() // then wait for shutdown
wg.Wait() // then wait for shutdown
return nil, err2
}
pusher = tracex.consumeTraces
Expand Down Expand Up @@ -543,7 +546,6 @@ func (f *factory) createLogsExporter(
exp, err := newLogsExporter(ctx, set, cfg, &f.onceMetadata, attributesTranslator, hostProvider, metadataReporter)
if err != nil {
cancel()
f.wg.Wait() // then wait for shutdown
return nil, err
}
pusher = exp.consumeLogs
Expand Down
56 changes: 56 additions & 0 deletions exporter/datadogexporter/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"path/filepath"
"sync"
"testing"
"time"

"github.com/DataDog/opentelemetry-mapping-go/pkg/inframetadata"
"github.com/DataDog/opentelemetry-mapping-go/pkg/inframetadata/payload"
Expand Down Expand Up @@ -738,3 +739,58 @@ func TestOnlyMetadata(t *testing.T) {
recvMetadata := <-server.MetadataChan
assert.Equal(t, recvMetadata.InternalHostname, "custom-hostname")
}

// TestStopExporters checks that a traces exporter and a metrics exporter
// created from the same factory can both be shut down within 10 seconds.
// It guards against one exporter's Shutdown blocking on goroutines that
// belong to the other exporter.
func TestStopExporters(t *testing.T) {
	server := testutil.DatadogServerMock()
	defer server.Close()

	cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml"))
	require.NoError(t, err)
	factory := NewFactory()
	cfg := factory.CreateDefaultConfig()

	sub, err := cm.Sub(component.NewIDWithName(metadata.Type, "api").String())
	require.NoError(t, err)
	require.NoError(t, component.UnmarshalConfig(sub, cfg))

	c := cfg.(*Config)
	c.Metrics.TCPAddrConfig.Endpoint = server.URL
	c.HostMetadata.Enabled = false

	ctx := context.Background()

	// Creation failures are fatal: continuing would call Start on a nil
	// exporter and panic instead of reporting a clean test failure.
	expTraces, err := factory.CreateTracesExporter(
		ctx,
		exportertest.NewNopCreateSettings(),
		cfg,
	)
	require.NoError(t, err)
	require.NotNil(t, expTraces)
	expMetrics, err := factory.CreateMetricsExporter(
		ctx,
		exportertest.NewNopCreateSettings(),
		cfg,
	)
	require.NoError(t, err)
	require.NotNil(t, expMetrics)

	// Start failures are only recorded so that the shutdown path below
	// still runs and releases whatever the exporters already started.
	assert.NoError(t, expTraces.Start(ctx, nil))
	assert.NoError(t, expMetrics.Start(ctx, nil))

	// The channel is closed rather than sent on, so the goroutine can
	// always finish even if the timeout branch below has already fired.
	finishShutdown := make(chan struct{})
	go func() {
		defer close(finishShutdown)
		// Errors stay local to this goroutine; assert (not require) is
		// used because FailNow must only be called from the test goroutine.
		assert.NoError(t, expMetrics.Shutdown(ctx))
		assert.NoError(t, expTraces.Shutdown(ctx))
	}()

	select {
	case <-finishShutdown:
	case <-time.After(10 * time.Second):
		t.Fatal("Timed out")
	}
}

0 comments on commit 9a0c38c

Please sign in to comment.