diff --git a/agent/hcp/deps.go b/agent/hcp/deps.go index 76a2fe32419c..de93da4d756e 100644 --- a/agent/hcp/deps.go +++ b/agent/hcp/deps.go @@ -9,7 +9,6 @@ import ( "net/url" "time" - "github.com/coredns/coredns/plugin/pkg/log" hcpclient "github.com/hashicorp/consul/agent/hcp/client" "github.com/hashicorp/consul/agent/hcp/config" "github.com/hashicorp/consul/agent/hcp/scada" @@ -45,7 +44,15 @@ func NewDeps(cfg config.CloudConfig, logger hclog.Logger) (d Deps, err error) { // This step should not block server initialization, so errors are logged, but not returned. func sink(hcpClient hcpclient.Client, cfg hcpclient.CloudConfig, logger hclog.Logger) *telemetry.OTELSink { ctx := context.Background() - url, err := verifyCCMRegistration(ctx, hcpClient) + telemetryCfg, err := fetchTelemetryConfig(ctx, hcpClient, logger) + if err != nil { + return nil + } + + url, err := verifyCCMRegistration(telemetryCfg, logger) + if err != nil { + return nil + } // if endpoint is empty, no metrics endpoint configuration for this Consul server // (e.g. not registered with CCM or feature flag to control rollout) so do not enable the HCP metrics sink. @@ -60,8 +67,10 @@ func sink(hcpClient hcpclient.Client, cfg hcpclient.CloudConfig, logger hclog.Lo } sinkOpts := &telemetry.OTELSinkOpts{ - Logger: logger, - Reader: telemetry.NewOTELReader(metricsClient, url, 10*time.Second), + Logger: logger, + Labels: telemetryCfg.Labels, + Filters: telemetryCfg.MetricsConfig.Filters, + Reader: telemetry.NewOTELReader(metricsClient, url, 10*time.Second), } sink, err := telemetry.NewOTELSink(sinkOpts) @@ -73,18 +82,22 @@ func sink(hcpClient hcpclient.Client, cfg hcpclient.CloudConfig, logger hclog.Lo return sink } -// verifyCCMRegistration checks that a server is registered with the HCP management plane -// by making a HTTP request to the HCP TelemetryConfig endpoint. -// If registered, it returns the endpoint for the HCP Telemetry Gateway endpoint where metrics should be forwarded. -func verifyCCMRegistration(ctx context.Context, client hcpclient.Client) (string, error) { +func fetchTelemetryConfig(ctx context.Context, client hcpclient.Client, logger hclog.Logger) (*hcpclient.TelemetryConfig, error) { reqCtx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() telemetryCfg, err := client.FetchTelemetryConfig(reqCtx) if err != nil { - return "", fmt.Errorf("failed to fetch telemetry config %w", err) + return nil, fmt.Errorf("failed to fetch telemetry config %w", err) } + return telemetryCfg, nil +} + +// verifyCCMRegistration checks that a server is registered with the HCP management plane +// by making a HTTP request to the HCP TelemetryConfig endpoint. +// If registered, it returns the endpoint for the HCP Telemetry Gateway endpoint where metrics should be forwarded. +func verifyCCMRegistration(telemetryCfg *hcpclient.TelemetryConfig, logger hclog.Logger) (string, error) { endpoint := telemetryCfg.Endpoint if override := telemetryCfg.MetricsConfig.Endpoint; override != "" { endpoint = override @@ -98,7 +111,7 @@ func verifyCCMRegistration(ctx context.Context, client hcpclient.Client) (string // The endpoint from the HCP gateway is a domain without scheme, and without the metrics path, so they must be added. url, err := url.Parse(fmt.Sprintf("https://%s/v1/metrics", endpoint)) if err != nil { - log.Error("failed to parse url: %w", err) + logger.Error("failed to parse url: %w", err) return "", fmt.Errorf("failed to parse url: %w", err) } diff --git a/agent/hcp/deps_test.go b/agent/hcp/deps_test.go index d4fad278db2a..4ada8bd0b2ed 100644 --- a/agent/hcp/deps_test.go +++ b/agent/hcp/deps_test.go @@ -1,7 +1,6 @@ package hcp import ( - "context" "fmt" "testing" @@ -74,69 +73,51 @@ func TestSink(t *testing.T) { func TestVerifyCCMRegistration(t *testing.T) { for name, test := range map[string]struct { - expect func(*client.MockClient) - wantErr string - expectedURL string + telemetryCfg *client.TelemetryConfig + wantErr string + expectedURL string }{ - "failsWithFetchTelemetryFailure": { - expect: func(mockClient *client.MockClient) { - mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(nil, fmt.Errorf("FetchTelemetryConfig error")) - }, - wantErr: "failed to fetch telemetry config", - }, "failsWithURLParseErr": { - expect: func(mockClient *client.MockClient) { - mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(&client.TelemetryConfig{ - // Minimum 2 chars for a domain to be valid. - Endpoint: "s", - MetricsConfig: &client.MetricsConfig{ - // Invalid domain chars - Endpoint: " ", - }, - }, nil) + telemetryCfg: &client.TelemetryConfig{ + // Minimum 2 chars for a domain to be valid. + Endpoint: "s", + MetricsConfig: &client.MetricsConfig{ + // Invalid domain chars + Endpoint: " ", + }, }, wantErr: "failed to parse url:", }, "noErrWithEmptyEndpoint": { - expect: func(mockClient *client.MockClient) { - mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(&client.TelemetryConfig{ + telemetryCfg: &client.TelemetryConfig{ + Endpoint: "", + MetricsConfig: &client.MetricsConfig{ Endpoint: "", - MetricsConfig: &client.MetricsConfig{ - Endpoint: "", - }, - }, nil) + }, }, expectedURL: "", }, "success": { - expect: func(mockClient *client.MockClient) { - mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(&client.TelemetryConfig{ - Endpoint: "test.com", - MetricsConfig: &client.MetricsConfig{ - Endpoint: "", - }, - }, nil) + telemetryCfg: &client.TelemetryConfig{ + Endpoint: "test.com", + MetricsConfig: &client.MetricsConfig{ + Endpoint: "", + }, }, expectedURL: "https://test.com/v1/metrics", }, "successMetricsEndpointOverride": { - expect: func(mockClient *client.MockClient) { - mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(&client.TelemetryConfig{ - Endpoint: "test.com", - MetricsConfig: &client.MetricsConfig{ - Endpoint: "override.com", - }, - }, nil) + telemetryCfg: &client.TelemetryConfig{ + Endpoint: "test.com", + MetricsConfig: &client.MetricsConfig{ + Endpoint: "override.com", + }, }, expectedURL: "https://override.com/v1/metrics", }, } { t.Run(name, func(t *testing.T) { - ctx := context.Background() - mClient := client.NewMockClient(t) - test.expect(mClient) - - url, err := verifyCCMRegistration(ctx, mClient) + url, err := verifyCCMRegistration(test.telemetryCfg, hclog.NewNullLogger()) if test.wantErr != "" { require.Empty(t, url) require.Error(t, err) diff --git a/agent/hcp/telemetry/filter.go b/agent/hcp/telemetry/filter.go new file mode 100644 index 000000000000..f79da97b7f93 --- /dev/null +++ b/agent/hcp/telemetry/filter.go @@ -0,0 +1,41 @@ +package telemetry + +import ( + "fmt" + "regexp" + + "github.com/hashicorp/go-multierror" +) + +// FilterList holds a map of filters, i.e. regular expressions. +// These filters are used to identify which Consul metrics can be transmitted to HCP. +type FilterList struct { + filters map[string]*regexp.Regexp +} + +// NewFilterList returns a FilterList which holds valid regex +// used to filter metrics. It will not fail if invalid REGEX is given, but returns a list of errors. +func NewFilterList(filters []string) (*FilterList, error) { + var err error + f := &FilterList{} + compiledList := make(map[string]*regexp.Regexp, len(filters)) + for idx, filter := range filters { + re, err := regexp.Compile(filter) + if err != nil { + multierror.Append(err, fmt.Errorf("compilation of filter at index %d failed: %w", idx, err)) + } + compiledList[filter] = re + } + f.filters = compiledList + return f, err +} + +// Match returns true if the metric name matches a REGEX in the allowed metric filters. +func (fl *FilterList) Match(name string) bool { + for _, re := range fl.filters { + if re.Match([]byte(name)) { + return true + } + } + return false +} diff --git a/agent/hcp/telemetry/filter_test.go b/agent/hcp/telemetry/filter_test.go new file mode 100644 index 000000000000..2113f6522394 --- /dev/null +++ b/agent/hcp/telemetry/filter_test.go @@ -0,0 +1,39 @@ +package telemetry + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestFilter(t *testing.T) { + for name, tc := range map[string]struct { + filters []string + wantMatch bool + wantErr string + }{ + "badFilterRegex": { + filters: []string{"(*LF)"}, + wantErr: "compilation of filter at index 0 failed", + }, + "matchFound": { + filters: []string{"raft.*"}, + wantMatch: true, + }, + "matchNotFound": { + filters: []string{"mem.heap_size"}, + wantMatch: false, + }, + } { + t.Run(name, func(t *testing.T) { + f, err := NewFilterList(tc.filters) + if tc.wantErr != "" { + require.Contains(t, err.Error(), tc.wantErr) + + } else { + m := f.Match("consul.raft.peers") + require.Equal(t, tc.wantMatch, m) + } + }) + } +} diff --git a/agent/hcp/telemetry/otel_sink.go b/agent/hcp/telemetry/otel_sink.go index b0fd58010d2d..b1963c9002cd 100644 --- a/agent/hcp/telemetry/otel_sink.go +++ b/agent/hcp/telemetry/otel_sink.go @@ -20,13 +20,16 @@ import ( ) type OTELSinkOpts struct { - Reader otelsdk.Reader - Logger hclog.Logger + Reader otelsdk.Reader + Logger hclog.Logger + Filters []string + Labels map[string]string } type OTELSink struct { spaceReplacer *strings.Replacer logger hclog.Logger + filters *FilterList meterProvider *otelsdk.MeterProvider meter *otelmetric.Meter @@ -41,7 +44,10 @@ type OTELSink struct { } func NewOTELReader(client client.MetricsClient, endpoint string, exportInterval time.Duration) otelsdk.Reader { - exporter := NewOTELExporter(client, endpoint) + exporter := &OTELExporter{ + client: client, + endpoint: endpoint, + } return otelsdk.NewPeriodicReader(exporter, otelsdk.WithInterval(exportInterval)) } @@ -54,8 +60,21 @@ func NewOTELSink(opts *OTELSinkOpts) (*OTELSink, error) { return nil, fmt.Errorf("failed to init OTEL sink: provide valid OTELSinkOpts Reader") } + attrs := make([]attribute.KeyValue, len(opts.Labels)) + for k, v := range opts.Labels { + attrs = append(attrs, attribute.KeyValue{ + Key: attribute.Key(k), + Value: attribute.StringValue(v), + }) + } + + filterList, err := NewFilterList(opts.Filters) + if err != nil { + opts.Logger.Error("Failed to initialize all filters: %w", err) + } + // Setup OTEL Metrics SDK to aggregate, convert and export metrics periodically. - res := resource.NewSchemaless() + res := resource.NewWithAttributes("", attrs...) meterProvider := otelsdk.NewMeterProvider(otelsdk.WithResource(res), otelsdk.WithReader(opts.Reader)) meter := meterProvider.Meter("github.com/hashicorp/consul/agent/hcp/telemetry") @@ -65,6 +84,7 @@ func NewOTELSink(opts *OTELSinkOpts) (*OTELSink, error) { } return &OTELSink{ + filters: filterList, spaceReplacer: strings.NewReplacer(" ", "_"), logger: opts.Logger.Named("otel_sink"), meterProvider: meterProvider, @@ -97,6 +117,10 @@ func (o *OTELSink) IncrCounter(key []string, val float32) { func (o *OTELSink) SetGaugeWithLabels(key []string, val float32, labels []gometrics.Label) { k := o.flattenKey(key) + if !o.filters.Match(k) { + return + } + // Set value in global Gauge store. o.gaugeStore.Store(k, float64(val), toAttributes(labels)) @@ -118,6 +142,10 @@ func (o *OTELSink) SetGaugeWithLabels(key []string, val float32, labels []gometr func (o *OTELSink) AddSampleWithLabels(key []string, val float32, labels []gometrics.Label) { k := o.flattenKey(key) + if !o.filters.Match(k) { + return + } + o.mutex.Lock() defer o.mutex.Unlock() @@ -133,13 +161,17 @@ func (o *OTELSink) AddSampleWithLabels(key []string, val float32, labels []gomet } attrs := toAttributes(labels) - (*inst).Record(context.TODO(), float64(val), attrs...) + (*inst).Record(context.Background(), float64(val), attrs...) } // IncrCounterWithLabels emits a Consul counter metric that gets registed by an OpenTelemetry Histogram instrument. func (o *OTELSink) IncrCounterWithLabels(key []string, val float32, labels []gometrics.Label) { k := o.flattenKey(key) + if !o.filters.Match(k) { + return + } + o.mutex.Lock() defer o.mutex.Unlock() @@ -156,7 +188,7 @@ func (o *OTELSink) IncrCounterWithLabels(key []string, val float32, labels []gom } attrs := toAttributes(labels) - (*inst).Add(context.TODO(), float64(val), attrs...) + (*inst).Add(context.Background(), float64(val), attrs...) } // EmitKey unsupported. diff --git a/agent/hcp/telemetry/otel_sink_test.go b/agent/hcp/telemetry/otel_sink_test.go index 4f631e678caf..8dfe64d2e9c1 100644 --- a/agent/hcp/telemetry/otel_sink_test.go +++ b/agent/hcp/telemetry/otel_sink_test.go @@ -147,8 +147,9 @@ func TestOTELSink(t *testing.T) { ctx := context.Background() opts := &OTELSinkOpts{ - Logger: hclog.New(&hclog.LoggerOptions{Output: io.Discard}), - Reader: reader, + Logger: hclog.New(&hclog.LoggerOptions{Output: io.Discard}), + Reader: reader, + Filters: []string{"raft", "autopilot"}, } sink, err := NewOTELSink(opts) @@ -176,12 +177,18 @@ func TestOTELSink(t *testing.T) { // Validate resource require.Equal(t, resource.NewSchemaless(), collected.Resource) + require.Equal(t, 1, len(collected.ScopeMetrics)) - // Validate metrics - for _, actual := range collected.ScopeMetrics[0].Metrics { - name := actual.Name - expected, ok := expectedMetrics[name] - require.True(t, ok, "metric key %s should be in expectedMetrics map", name) + collectedMetrics := collected.ScopeMetrics[0].Metrics + + collectedMetricsMap := make(map[string]metricdata.Metrics, len(collectedMetrics)) + for _, v := range collectedMetrics { + collectedMetricsMap[v.Name] = v + } + + for key, expected := range expectedMetrics { + actual, ok := collectedMetricsMap[key] + require.True(t, ok, "metric key %s should be in expectedMetrics map", key) isSameMetrics(t, expected, actual) } }