Skip to content

Commit

Permalink
APM: Only do stats obfuscation when we know its safe to do so (#3155)
Browse files Browse the repository at this point in the history
  • Loading branch information
ajgajg1134 authored Feb 12, 2025
1 parent 5e83937 commit 7125f59
Show file tree
Hide file tree
Showing 7 changed files with 115 additions and 24 deletions.
2 changes: 1 addition & 1 deletion ddtrace/tracer/civisibility_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func (t *ciVisibilityTransport) send(p *payload) (body io.ReadCloser, err error)
// Returns:
//
// An error indicating that stats are not supported.
func (t *ciVisibilityTransport) sendStats(*pb.ClientStatsPayload) error {
func (t *ciVisibilityTransport) sendStats(*pb.ClientStatsPayload, int) error {
// Stats are not supported by CI Visibility agentless / EVP proxy.
return nil
}
Expand Down
17 changes: 11 additions & 6 deletions ddtrace/tracer/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -729,6 +729,9 @@ type agentFeatures struct {

// metaStructAvailable reports whether the trace-agent can receive spans with the `meta_struct` field.
metaStructAvailable bool

// obfuscationVersion reports the trace-agent's version of obfuscation logic. A value of 0 means this field wasn't present.
obfuscationVersion int
}

// HasFlag reports whether the agent has set the feat feature flag.
Expand All @@ -755,12 +758,13 @@ func loadAgentFeatures(agentDisabled bool, agentURL *url.URL, httpClient *http.C
}
defer resp.Body.Close()
type infoResponse struct {
Endpoints []string `json:"endpoints"`
ClientDropP0s bool `json:"client_drop_p0s"`
FeatureFlags []string `json:"feature_flags"`
PeerTags []string `json:"peer_tags"`
SpanMetaStruct bool `json:"span_meta_structs"`
Config struct {
Endpoints []string `json:"endpoints"`
ClientDropP0s bool `json:"client_drop_p0s"`
FeatureFlags []string `json:"feature_flags"`
PeerTags []string `json:"peer_tags"`
SpanMetaStruct bool `json:"span_meta_structs"`
ObfuscationVersion int `json:"obfuscation_version"`
Config struct {
StatsdPort int `json:"statsd_port"`
} `json:"config"`
}
Expand All @@ -775,6 +779,7 @@ func loadAgentFeatures(agentDisabled bool, agentURL *url.URL, httpClient *http.C
features.StatsdPort = info.Config.StatsdPort
features.metaStructAvailable = info.SpanMetaStruct
features.peerTags = info.PeerTags
features.obfuscationVersion = info.ObfuscationVersion
for _, endpoint := range info.Endpoints {
switch endpoint {
case "/v0.6/stats":
Expand Down
3 changes: 2 additions & 1 deletion ddtrace/tracer/option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ func TestLoadAgentFeatures(t *testing.T) {

t.Run("OK", func(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
w.Write([]byte(`{"endpoints":["/v0.6/stats"],"feature_flags":["a","b"],"client_drop_p0s":true,"peer_tags":["peer.hostname"],"config": {"statsd_port":8999}}`))
w.Write([]byte(`{"endpoints":["/v0.6/stats"],"feature_flags":["a","b"],"client_drop_p0s":true,"obfuscation_version":2,"peer_tags":["peer.hostname"],"config": {"statsd_port":8999}}`))
}))
defer srv.Close()
cfg := newConfig(WithAgentAddr(strings.TrimPrefix(srv.URL, "http://")), WithAgentTimeout(2))
Expand All @@ -238,6 +238,7 @@ func TestLoadAgentFeatures(t *testing.T) {
assert.True(t, cfg.agent.HasFlag("a"))
assert.True(t, cfg.agent.HasFlag("b"))
assert.EqualValues(t, cfg.agent.peerTags, []string{"peer.hostname"})
assert.Equal(t, 2, cfg.agent.obfuscationVersion)
})

t.Run("discovery", func(t *testing.T) {
Expand Down
24 changes: 22 additions & 2 deletions ddtrace/tracer/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ import (
"github.com/DataDog/datadog-go/v5/statsd"
)

// tracerObfuscationVersion indicates which version of stats obfuscation logic we implement
// In the future this can be pulled directly from our obfuscation import.
var tracerObfuscationVersion = 1

// defaultStatsBucketSize specifies the default span of time that will be
// covered in one stats bucket.
var defaultStatsBucketSize = (10 * time.Second).Nanoseconds()
Expand Down Expand Up @@ -157,7 +161,11 @@ func (c *concentrator) runIngester() {
}

func (c *concentrator) newTracerStatSpan(s *span, obfuscator *obfuscate.Obfuscator) (*tracerStatSpan, bool) {
statSpan, ok := c.spanConcentrator.NewStatSpan(s.Service, obfuscatedResource(obfuscator, s.Type, s.Resource),
resource := s.Resource
if c.shouldObfuscate() {
resource = obfuscatedResource(obfuscator, s.Type, s.Resource)
}
statSpan, ok := c.spanConcentrator.NewStatSpan(s.Service, resource,
s.Name, s.Type, s.ParentID, s.Start, s.Duration, s.Error, s.Meta, s.Metrics, c.cfg.agent.peerTags)
if !ok {
return nil, false
Expand All @@ -169,6 +177,11 @@ func (c *concentrator) newTracerStatSpan(s *span, obfuscator *obfuscate.Obfuscat
}, true
}

func (c *concentrator) shouldObfuscate() bool {
// Obfuscate if agent reports an obfuscation version AND our version is at least as new
return c.cfg.agent.obfuscationVersion > 0 && c.cfg.agent.obfuscationVersion <= tracerObfuscationVersion
}

// add s into the concentrator's internal stats buckets.
func (c *concentrator) add(s *tracerStatSpan) {
c.spanConcentrator.AddSpan(s.statSpan, c.aggregationKey, "", nil, s.origin)
Expand Down Expand Up @@ -204,6 +217,13 @@ const (
func (c *concentrator) flushAndSend(timenow time.Time, includeCurrent bool) {
csps := c.spanConcentrator.Flush(timenow.UnixNano(), includeCurrent)

obfVersion := 0
if c.shouldObfuscate() {
obfVersion = tracerObfuscationVersion
} else {
log.Debug("Stats Obfuscation was skipped, agent will obfuscate (tracer %d, agent %d)", tracerObfuscationVersion, c.cfg.agent.obfuscationVersion)
}

if len(csps) == 0 {
// nothing to flush
return
Expand All @@ -214,7 +234,7 @@ func (c *concentrator) flushAndSend(timenow time.Time, includeCurrent bool) {
// compatible in case this ever changes we can just iterate through all of them.
for _, csp := range csps {
flushedBuckets += len(csp.Stats)
if err := c.cfg.transport.sendStats(csp); err != nil {
if err := c.cfg.transport.sendStats(csp, obfVersion); err != nil {
c.statsd().Incr("datadog.tracer.stats.flush_errors", nil, 1)
log.Error("Error sending stats payload: %v", err)
}
Expand Down
55 changes: 54 additions & 1 deletion ddtrace/tracer/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/stretchr/testify/assert"

"github.com/DataDog/datadog-agent/pkg/obfuscate"
"github.com/DataDog/datadog-go/v5/statsd"
"gopkg.in/DataDog/dd-trace-go.v1/internal/civisibility/constants"
"gopkg.in/DataDog/dd-trace-go.v1/internal/civisibility/utils"
Expand Down Expand Up @@ -119,7 +120,7 @@ func TestConcentrator(t *testing.T) {
// stats should be sent if the concentrator is stopped
t.Run("stop", func(t *testing.T) {
transport := newDummyTransport()
c := newConcentrator(&config{transport: transport}, 500000, &statsd.NoOpClientDirect{})
c := newConcentrator(&config{transport: transport}, 500_000, &statsd.NoOpClientDirect{})
assert.Len(t, transport.Stats(), 0)
ss1, ok := c.newTracerStatSpan(&s1, nil)
assert.True(t, ok)
Expand All @@ -130,3 +131,55 @@ func TestConcentrator(t *testing.T) {
})
})
}

func TestShouldObfuscate(t *testing.T) {
bucketSize := int64(500_000)
tsp := newDummyTransport()
for _, params := range []struct {
name string
tracerVersion int
agentVersion int
expectedShouldObfuscate bool
}{
{name: "version equal", tracerVersion: 2, agentVersion: 2, expectedShouldObfuscate: true},
{name: "agent version missing", tracerVersion: 2, agentVersion: 0, expectedShouldObfuscate: false},
{name: "agent version older", tracerVersion: 2, agentVersion: 1, expectedShouldObfuscate: true},
{name: "agent version newer", tracerVersion: 2, agentVersion: 3, expectedShouldObfuscate: false},
} {
t.Run(params.name, func(t *testing.T) {
c := newConcentrator(&config{transport: tsp, env: "someEnv", agent: agentFeatures{obfuscationVersion: params.agentVersion}}, bucketSize, &statsd.NoOpClientDirect{})
defer func(oldVersion int) { tracerObfuscationVersion = oldVersion }(tracerObfuscationVersion)
tracerObfuscationVersion = params.tracerVersion
assert.Equal(t, params.expectedShouldObfuscate, c.shouldObfuscate())
})
}
}

func TestObfuscation(t *testing.T) {
bucketSize := int64(500_000)
s1 := span{
Name: "redis-query",
Start: time.Now().UnixNano() + 3*bucketSize,
Duration: 1,
Metrics: map[string]float64{keyMeasured: 1},
Type: "redis",
Resource: "GET somekey",
}
tsp := newDummyTransport()
c := newConcentrator(&config{transport: tsp, env: "someEnv", agent: agentFeatures{obfuscationVersion: 2}}, bucketSize, &statsd.NoOpClientDirect{})
defer func(oldVersion int) { tracerObfuscationVersion = oldVersion }(tracerObfuscationVersion)
tracerObfuscationVersion = 2

assert.Len(t, tsp.Stats(), 0)
ss1, ok := c.newTracerStatSpan(&s1, obfuscate.NewObfuscator(obfuscate.Config{}))
assert.True(t, ok)
c.Start()
c.In <- ss1
c.Stop()
actualStats := tsp.Stats()
assert.Len(t, actualStats, 1)
assert.Len(t, actualStats[0].Stats, 1)
assert.Len(t, actualStats[0].Stats[0].Stats, 1)
assert.Equal(t, 2, tsp.obfVersion)
assert.Equal(t, "GET", actualStats[0].Stats[0].Stats[0].Resource)
}
16 changes: 12 additions & 4 deletions ddtrace/tracer/tracer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2189,12 +2189,13 @@ func startTestTracer(t testing.TB, opts ...StartOption) (trc *tracer, transport
// Mock Transport with a real Encoder
type dummyTransport struct {
sync.RWMutex
traces spanLists
stats []*pb.ClientStatsPayload
traces spanLists
stats []*pb.ClientStatsPayload
obfVersion int
}

func newDummyTransport() *dummyTransport {
return &dummyTransport{traces: spanLists{}}
return &dummyTransport{traces: spanLists{}, obfVersion: -1}
}

func (t *dummyTransport) Len() int {
Expand All @@ -2203,9 +2204,10 @@ func (t *dummyTransport) Len() int {
return len(t.traces)
}

func (t *dummyTransport) sendStats(p *pb.ClientStatsPayload) error {
func (t *dummyTransport) sendStats(p *pb.ClientStatsPayload, obfVersion int) error {
t.Lock()
t.stats = append(t.stats, p)
t.obfVersion = obfVersion
t.Unlock()
return nil
}
Expand All @@ -2216,6 +2218,12 @@ func (t *dummyTransport) Stats() []*pb.ClientStatsPayload {
return t.stats
}

func (t *dummyTransport) ObfuscationVersion() int {
t.RLock()
defer t.RUnlock()
return t.obfVersion
}

func (t *dummyTransport) send(p *payload) (io.ReadCloser, error) {
traces, err := decode(p)
if err != nil {
Expand Down
22 changes: 13 additions & 9 deletions ddtrace/tracer/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"time"

pb "github.com/DataDog/datadog-agent/pkg/proto/pbgo/trace"

traceinternal "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/internal"
"gopkg.in/DataDog/dd-trace-go.v1/internal"
"gopkg.in/DataDog/dd-trace-go.v1/internal/version"
Expand Down Expand Up @@ -56,12 +55,13 @@ func defaultHTTPClient(timeout time.Duration) *http.Client {
}

const (
defaultHostname = "localhost"
defaultPort = "8126"
defaultAddress = defaultHostname + ":" + defaultPort
defaultURL = "http://" + defaultAddress
defaultHTTPTimeout = 10 * time.Second // defines the current timeout before giving up with the send process
traceCountHeader = "X-Datadog-Trace-Count" // header containing the number of traces in the payload
defaultHostname = "localhost"
defaultPort = "8126"
defaultAddress = defaultHostname + ":" + defaultPort
defaultURL = "http://" + defaultAddress
defaultHTTPTimeout = 10 * time.Second // defines the current timeout before giving up with the send process
traceCountHeader = "X-Datadog-Trace-Count" // header containing the number of traces in the payload
obfuscationVersionHeader = "Datadog-Obfuscation-Version" // header containing the version of obfuscation used, if any
)

// transport is an interface for communicating data to the agent.
Expand All @@ -70,7 +70,8 @@ type transport interface {
// It returns a non-nil response body when no error occurred.
send(p *payload) (body io.ReadCloser, err error)
// sendStats sends the given stats payload to the agent.
sendStats(s *pb.ClientStatsPayload) error
// tracerObfuscationVersion is the version of obfuscation applied (0 if none was applied)
sendStats(s *pb.ClientStatsPayload, tracerObfuscationVersion int) error
// endpoint returns the URL to which the transport will send traces.
endpoint() string
}
Expand Down Expand Up @@ -115,7 +116,7 @@ func newHTTPTransport(url string, client *http.Client) *httpTransport {
}
}

func (t *httpTransport) sendStats(p *pb.ClientStatsPayload) error {
func (t *httpTransport) sendStats(p *pb.ClientStatsPayload, tracerObfuscationVersion int) error {
var buf bytes.Buffer
if err := msgp.Encode(&buf, p); err != nil {
return err
Expand All @@ -124,6 +125,9 @@ func (t *httpTransport) sendStats(p *pb.ClientStatsPayload) error {
if err != nil {
return err
}
if tracerObfuscationVersion > 0 {
req.Header.Set(obfuscationVersionHeader, strconv.Itoa(tracerObfuscationVersion))
}
resp, err := t.client.Do(req)
if err != nil {
return err
Expand Down

0 comments on commit 7125f59

Please sign in to comment.