diff --git a/agent/hcp/client/client.go b/agent/hcp/client/client.go index aec1039525aa..4e83098c5378 100644 --- a/agent/hcp/client/client.go +++ b/agent/hcp/client/client.go @@ -296,3 +296,17 @@ func (t *TelemetryConfig) Enabled() (string, bool) { // The endpoint from Telemetry Gateway is a domain without scheme, and without the metrics path, so they must be added. return endpoint + metricsGatewayPath, true } + +// DefaultLabels returns a set of string pairs that must be added as attributes to all exported telemetry data. +func (t *TelemetryConfig) DefaultLabels(nodeID string) map[string]string { + labels := map[string]string{ + "__replica__": nodeID, // used for Cortex HA-metrics (deduplication) + "node_id": nodeID, // used to delineate Consul nodes in graphs + } + + for k, v := range t.Labels { + labels[k] = v + } + + return labels +} diff --git a/agent/hcp/client/metrics_client.go b/agent/hcp/client/metrics_client.go index 15bd71097f79..a45d6343fc46 100644 --- a/agent/hcp/client/metrics_client.go +++ b/agent/hcp/client/metrics_client.go @@ -12,6 +12,7 @@ import ( "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-retryablehttp" hcpcfg "github.com/hashicorp/hcp-sdk-go/config" + "github.com/hashicorp/hcp-sdk-go/resource" colmetricpb "go.opentelemetry.io/proto/otlp/collector/metrics/v1" metricpb "go.opentelemetry.io/proto/otlp/metrics/v1" "golang.org/x/oauth2" @@ -37,6 +38,7 @@ type MetricsClient interface { // cloudConfig represents cloud config for TLS abstracted in an interface for easy testing. type CloudConfig interface { HCPConfig(opts ...hcpcfg.HCPConfigOption) (hcpcfg.HCPConfig, error) + Resource() (resource.Resource, error) } // otlpClient is an implementation of MetricsClient with a retryable http client for retries and to honor throttle. @@ -64,8 +66,14 @@ func NewMetricsClient(cfg CloudConfig, ctx context.Context) (MetricsClient, erro return nil, fmt.Errorf("failed to init telemetry client: %v", err) } + r, err := cfg.Resource() + if err != nil { + return nil, fmt.Errorf("failed to init telemetry client: %v", err) + } + header := make(http.Header) header.Set("Content-Type", "application/x-protobuf") + header.Set("x-hcp-resource-id", r.String()) return &otlpClient{ client: c, diff --git a/agent/hcp/client/metrics_client_test.go b/agent/hcp/client/metrics_client_test.go index 7c64d731d0b1..ee4c4262bba9 100644 --- a/agent/hcp/client/metrics_client_test.go +++ b/agent/hcp/client/metrics_client_test.go @@ -2,6 +2,7 @@ package client import ( "context" + "fmt" "net/http" "net/http/httptest" "testing" @@ -34,8 +35,17 @@ func TestNewMetricsClient(t *testing.T) { }, "failsHCPConfig": { wantErr: "failed to init telemetry client", - cfg: MockErrCloudCfg{}, - ctx: context.Background(), + cfg: MockCloudCfg{ + ConfigErr: fmt.Errorf("test bad hcp config"), + }, + ctx: context.Background(), + }, + "failsBadResource": { + wantErr: "failed to init telemetry client", + cfg: MockCloudCfg{ + ResourceErr: fmt.Errorf("test bad resource"), + }, + ctx: context.Background(), }, } { t.Run(name, func(t *testing.T) { @@ -68,7 +78,7 @@ func TestExportMetrics(t *testing.T) { t.Run(name, func(t *testing.T) { srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { require.Equal(t, r.Header.Get("Content-Type"), "application/x-protobuf") - + require.Equal(t, r.Header.Get("x-hcp-resource-id"), testResourceID) require.Equal(t, r.Header.Get("Authorization"), "Bearer test-token") body := colpb.ExportMetricsServiceResponse{} diff --git a/agent/hcp/client/mock_CloudConfig.go b/agent/hcp/client/mock_CloudConfig.go index 860f50fe459a..5f2ef50046d7 100644 --- a/agent/hcp/client/mock_CloudConfig.go +++ b/agent/hcp/client/mock_CloudConfig.go @@ -2,14 +2,16 @@ package client import ( "crypto/tls" - "errors" "net/url" hcpcfg "github.com/hashicorp/hcp-sdk-go/config" "github.com/hashicorp/hcp-sdk-go/profile" + "github.com/hashicorp/hcp-sdk-go/resource" "golang.org/x/oauth2" ) +const testResourceID = "organization/test-org/project/test-project/test-type/test-id" + type mockHCPCfg struct{} func (m *mockHCPCfg) Token() (*oauth2.Token, error) { @@ -25,14 +27,21 @@ func (m *mockHCPCfg) APIAddress() string { return "" } func (m *mockHCPCfg) PortalURL() *url.URL { return &url.URL{} } func (m *mockHCPCfg) Profile() *profile.UserProfile { return nil } -type MockCloudCfg struct{} - -func (m MockCloudCfg) HCPConfig(opts ...hcpcfg.HCPConfigOption) (hcpcfg.HCPConfig, error) { - return &mockHCPCfg{}, nil +type MockCloudCfg struct { + ConfigErr error + ResourceErr error } -type MockErrCloudCfg struct{} +func (m MockCloudCfg) Resource() (resource.Resource, error) { + r := resource.Resource{ + ID: "test-id", + Type: "test-type", + Organization: "test-org", + Project: "test-project", + } + return r, m.ResourceErr +} -func (m MockErrCloudCfg) HCPConfig(opts ...hcpcfg.HCPConfigOption) (hcpcfg.HCPConfig, error) { - return nil, errors.New("test bad HCP config") +func (m MockCloudCfg) HCPConfig(opts ...hcpcfg.HCPConfigOption) (hcpcfg.HCPConfig, error) { + return &mockHCPCfg{}, m.ConfigErr } diff --git a/agent/hcp/config/config.go b/agent/hcp/config/config.go index cf87b685e352..8d1358fa4adf 100644 --- a/agent/hcp/config/config.go +++ b/agent/hcp/config/config.go @@ -7,6 +7,7 @@ import ( "crypto/tls" hcpcfg "github.com/hashicorp/hcp-sdk-go/config" + "github.com/hashicorp/hcp-sdk-go/resource" ) // CloudConfig defines configuration for connecting to HCP services @@ -30,6 +31,10 @@ func (c *CloudConfig) WithTLSConfig(cfg *tls.Config) { c.TLSConfig = cfg } +func (c *CloudConfig) Resource() (resource.Resource, error) { + return resource.FromString(c.ResourceID) +} + func (c *CloudConfig) HCPConfig(opts ...hcpcfg.HCPConfigOption) (hcpcfg.HCPConfig, error) { if c.TLSConfig == nil { c.TLSConfig = &tls.Config{} diff --git a/agent/hcp/deps.go b/agent/hcp/deps.go index 702f7d27a396..fcf2a6ef7800 100644 --- a/agent/hcp/deps.go +++ b/agent/hcp/deps.go @@ -13,6 +13,7 @@ import ( "github.com/hashicorp/consul/agent/hcp/config" "github.com/hashicorp/consul/agent/hcp/scada" "github.com/hashicorp/consul/agent/hcp/telemetry" + "github.com/hashicorp/consul/types" "github.com/hashicorp/go-hclog" ) @@ -23,7 +24,7 @@ type Deps struct { Sink metrics.MetricSink } -func NewDeps(cfg config.CloudConfig, logger hclog.Logger) (d Deps, err error) { +func NewDeps(cfg config.CloudConfig, logger hclog.Logger, nodeID types.NodeID) (d Deps, err error) { d.Client, err = hcpclient.NewClient(cfg) if err != nil { return @@ -34,7 +35,7 @@ func NewDeps(cfg config.CloudConfig, logger hclog.Logger) (d Deps, err error) { return } - d.Sink = sink(d.Client, &cfg, logger) + d.Sink = sink(d.Client, &cfg, logger, nodeID) return } @@ -42,7 +43,7 @@ func NewDeps(cfg config.CloudConfig, logger hclog.Logger) (d Deps, err error) { // sink provides initializes an OTELSink which forwards Consul metrics to HCP. // The sink is only initialized if the server is registered with the management plane (CCM). // This step should not block server initialization, so errors are logged, but not returned. -func sink(hcpClient hcpclient.Client, cfg hcpclient.CloudConfig, logger hclog.Logger) metrics.MetricSink { +func sink(hcpClient hcpclient.Client, cfg hcpclient.CloudConfig, logger hclog.Logger, nodeID types.NodeID) metrics.MetricSink { ctx := context.Background() ctx = hclog.WithContext(ctx, logger) @@ -73,8 +74,10 @@ func sink(hcpClient hcpclient.Client, cfg hcpclient.CloudConfig, logger hclog.Lo } sinkOpts := &telemetry.OTELSinkOpts{ - Ctx: ctx, - Reader: telemetry.NewOTELReader(metricsClient, u, telemetry.DefaultExportInterval), + Ctx: ctx, + Reader: telemetry.NewOTELReader(metricsClient, u, telemetry.DefaultExportInterval), + Labels: telemetryCfg.DefaultLabels(string(nodeID)), + Filters: telemetryCfg.MetricsConfig.Filters, } sink, err := telemetry.NewOTELSink(sinkOpts) diff --git a/agent/hcp/deps_test.go b/agent/hcp/deps_test.go index 017d7c791c5e..54ec7b6de478 100644 --- a/agent/hcp/deps_test.go +++ b/agent/hcp/deps_test.go @@ -9,6 +9,7 @@ import ( "github.com/stretchr/testify/require" "github.com/hashicorp/consul/agent/hcp/client" + "github.com/hashicorp/consul/types" ) func TestSink(t *testing.T) { @@ -48,6 +49,9 @@ func TestSink(t *testing.T) { mockCloudCfg: client.MockCloudCfg{}, }, "noSinkWhenMetricsClientInitFails": { + mockCloudCfg: client.MockCloudCfg{ + ConfigErr: fmt.Errorf("test bad hcp config"), + }, expect: func(mockClient *client.MockClient) { mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(&client.TelemetryConfig{ Endpoint: "https://test.com", @@ -56,7 +60,6 @@ func TestSink(t *testing.T) { }, }, nil) }, - mockCloudCfg: client.MockErrCloudCfg{}, }, "failsWithFetchTelemetryFailure": { expect: func(mockClient *client.MockClient) { @@ -92,7 +95,7 @@ func TestSink(t *testing.T) { c := client.NewMockClient(t) l := hclog.NewNullLogger() test.expect(c) - sinkOpts := sink(c, test.mockCloudCfg, l) + sinkOpts := sink(c, test.mockCloudCfg, l, types.NodeID("server1234")) if !test.expectedSink { require.Nil(t, sinkOpts) return diff --git a/agent/hcp/telemetry/filter.go b/agent/hcp/telemetry/filter.go new file mode 100644 index 000000000000..54dca7d44aef --- /dev/null +++ b/agent/hcp/telemetry/filter.go @@ -0,0 +1,37 @@ +package telemetry + +import ( + "fmt" + "regexp" + "strings" + + "github.com/hashicorp/go-multierror" +) + +// newFilterRegex returns a valid regex used to filter metrics. +// It will fail if there are 0 valid regex filters given. +func newFilterRegex(filters []string) (*regexp.Regexp, error) { + var mErr error + validFilters := make([]string, 0, len(filters)) + for _, filter := range filters { + _, err := regexp.Compile(filter) + if err != nil { + mErr = multierror.Append(mErr, fmt.Errorf("compilation of filter %q failed: %w", filter, err)) + continue + } + validFilters = append(validFilters, filter) + } + + if len(validFilters) == 0 { + return nil, multierror.Append(mErr, fmt.Errorf("no valid filters")) + } + + // Combine the valid regex strings with an OR. + finalRegex := strings.Join(validFilters, "|") + composedRegex, err := regexp.Compile(finalRegex) + if err != nil { + return nil, fmt.Errorf("failed to compile regex: %w", err) + } + + return composedRegex, nil +} diff --git a/agent/hcp/telemetry/filter_test.go b/agent/hcp/telemetry/filter_test.go new file mode 100644 index 000000000000..abe962f4cd47 --- /dev/null +++ b/agent/hcp/telemetry/filter_test.go @@ -0,0 +1,58 @@ +package telemetry + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestFilter(t *testing.T) { + t.Parallel() + for name, tc := range map[string]struct { + filters []string + expectedRegexString string + matches []string + wantErr string + wantMatch bool + }{ + "badFilterRegex": { + filters: []string{"(*LF)"}, + wantErr: "no valid filters", + }, + "failsWithNoRegex": { + filters: []string{}, + wantErr: "no valid filters", + }, + "matchFound": { + filters: []string{"raft.*", "mem.*"}, + expectedRegexString: "raft.*|mem.*", + matches: []string{"consul.raft.peers", "consul.mem.heap_size"}, + wantMatch: true, + }, + "matchNotFound": { + filters: []string{"mem.*"}, + matches: []string{"consul.raft.peers", "consul.txn.apply"}, + expectedRegexString: "mem.*", + wantMatch: false, + }, + } { + tc := tc + t.Run(name, func(t *testing.T) { + t.Parallel() + f, err := newFilterRegex(tc.filters) + + if tc.wantErr != "" { + require.Error(t, err) + require.Contains(t, err.Error(), tc.wantErr) + return + } + + require.NoError(t, err) + require.Equal(t, tc.expectedRegexString, f.String()) + for _, metric := range tc.matches { + m := f.MatchString(metric) + require.Equal(t, tc.wantMatch, m) + } + }) + } +} diff --git a/agent/hcp/telemetry/otel_sink.go b/agent/hcp/telemetry/otel_sink.go index 9a984150b773..39e9aa599cd8 100644 --- a/agent/hcp/telemetry/otel_sink.go +++ b/agent/hcp/telemetry/otel_sink.go @@ -5,6 +5,7 @@ import ( "context" "fmt" "net/url" + "regexp" "strings" "sync" "time" @@ -24,8 +25,10 @@ const DefaultExportInterval = 10 * time.Second // OTELSinkOpts is used to provide configuration when initializing an OTELSink using NewOTELSink. type OTELSinkOpts struct { - Reader otelsdk.Reader - Ctx context.Context + Reader otelsdk.Reader + Ctx context.Context + Filters []string + Labels map[string]string } // OTELSink captures and aggregates telemetry data as per the OpenTelemetry (OTEL) specification. @@ -35,6 +38,7 @@ type OTELSink struct { // spaceReplacer cleans the flattened key by removing any spaces. spaceReplacer *strings.Replacer logger hclog.Logger + filters *regexp.Regexp // meterProvider is an OTEL MeterProvider, the entrypoint to the OTEL Metrics SDK. // It handles reading/export of aggregated metric data. @@ -81,14 +85,30 @@ func NewOTELSink(opts *OTELSinkOpts) (*OTELSink, error) { return nil, fmt.Errorf("ferror: provide valid context") } - // Setup OTEL Metrics SDK to aggregate, convert and export metrics. - res := resource.NewSchemaless() + logger := hclog.FromContext(opts.Ctx).Named("otel_sink") + + filterList, err := newFilterRegex(opts.Filters) + if err != nil { + logger.Error("Failed to initialize all filters", "error", err) + } + + attrs := make([]attribute.KeyValue, 0, len(opts.Labels)) + for k, v := range opts.Labels { + kv := attribute.KeyValue{ + Key: attribute.Key(k), + Value: attribute.StringValue(v), + } + attrs = append(attrs, kv) + } + // Setup OTEL Metrics SDK to aggregate, convert and export metrics periodically. + res := resource.NewWithAttributes("", attrs...) meterProvider := otelsdk.NewMeterProvider(otelsdk.WithResource(res), otelsdk.WithReader(opts.Reader)) meter := meterProvider.Meter("github.com/hashicorp/consul/agent/hcp/telemetry") return &OTELSink{ + filters: filterList, spaceReplacer: strings.NewReplacer(" ", "_"), - logger: hclog.FromContext(opts.Ctx).Named("otel_sink"), + logger: logger, meterProvider: meterProvider, meter: &meter, gaugeStore: NewGaugeStore(), @@ -118,6 +138,10 @@ func (o *OTELSink) IncrCounter(key []string, val float32) { func (o *OTELSink) SetGaugeWithLabels(key []string, val float32, labels []gometrics.Label) { k := o.flattenKey(key) + if !o.filters.MatchString(k) { + return + } + // Set value in global Gauge store. o.gaugeStore.Set(k, float64(val), toAttributes(labels)) @@ -131,7 +155,7 @@ func (o *OTELSink) SetGaugeWithLabels(key []string, val float32, labels []gometr // It must be explicitly de-registered to be removed (which we do not do), to ensure new gauge values are exported every cycle. inst, err := (*o.meter).Float64ObservableGauge(k, otelmetric.WithFloat64Callback(o.gaugeStore.gaugeCallback(k))) if err != nil { - o.logger.Error("Failed to emit gauge: %w", err) + o.logger.Error("Failed to create gauge instrument", "error", err) return } o.gaugeInstruments[k] = inst @@ -142,6 +166,10 @@ func (o *OTELSink) SetGaugeWithLabels(key []string, val float32, labels []gometr func (o *OTELSink) AddSampleWithLabels(key []string, val float32, labels []gometrics.Label) { k := o.flattenKey(key) + if !o.filters.MatchString(k) { + return + } + o.mutex.Lock() defer o.mutex.Unlock() @@ -149,7 +177,7 @@ func (o *OTELSink) AddSampleWithLabels(key []string, val float32, labels []gomet if !ok { histogram, err := (*o.meter).Float64Histogram(k) if err != nil { - o.logger.Error("Failed to emit gauge: %w", err) + o.logger.Error("Failed create histogram instrument", "error", err) return } inst = histogram @@ -164,6 +192,10 @@ func (o *OTELSink) AddSampleWithLabels(key []string, val float32, labels []gomet func (o *OTELSink) IncrCounterWithLabels(key []string, val float32, labels []gometrics.Label) { k := o.flattenKey(key) + if !o.filters.MatchString(k) { + return + } + o.mutex.Lock() defer o.mutex.Unlock() @@ -171,7 +203,7 @@ func (o *OTELSink) IncrCounterWithLabels(key []string, val float32, labels []gom if !ok { counter, err := (*o.meter).Float64Counter(k) if err != nil { - o.logger.Error("Failed to emit gauge: %w", err) + o.logger.Error("Failed to create counter instrument:", "error", err) return } diff --git a/agent/hcp/telemetry/otel_sink_test.go b/agent/hcp/telemetry/otel_sink_test.go index 2b4dc26abda7..34127bdf9d77 100644 --- a/agent/hcp/telemetry/otel_sink_test.go +++ b/agent/hcp/telemetry/otel_sink_test.go @@ -17,8 +17,13 @@ import ( ) var ( + expectedResource = resource.NewWithAttributes("", attribute.KeyValue{ + Key: attribute.Key("node_id"), + Value: attribute.StringValue("test"), + }) + attrs = attribute.NewSet(attribute.KeyValue{ - Key: attribute.Key("server.id"), + Key: attribute.Key("metric.label"), Value: attribute.StringValue("test"), }) @@ -129,6 +134,16 @@ func TestNewOTELSink(t *testing.T) { Ctx: context.Background(), }, }, + "success": { + opts: &OTELSinkOpts{ + Ctx: context.Background(), + Reader: metric.NewManualReader(), + Labels: map[string]string{ + "server": "test", + }, + Filters: []string{"raft"}, + }, + }, } { test := test t.Run(name, func(t *testing.T) { @@ -153,8 +168,12 @@ func TestOTELSink(t *testing.T) { ctx := context.Background() opts := &OTELSinkOpts{ - Reader: reader, - Ctx: ctx, + Reader: reader, + Ctx: ctx, + Filters: []string{"raft", "autopilot"}, + Labels: map[string]string{ + "node_id": "test", + }, } sink, err := NewOTELSink(opts) @@ -162,7 +181,7 @@ func TestOTELSink(t *testing.T) { labels := []gometrics.Label{ { - Name: "server.id", + Name: "metric.label", Value: "test", }, } @@ -189,6 +208,10 @@ func TestOTELSink_Race(t *testing.T) { opts := &OTELSinkOpts{ Ctx: ctx, Reader: reader, + Labels: map[string]string{ + "node_id": "test", + }, + Filters: []string{"test"}, } sink, err := NewOTELSink(opts) @@ -303,7 +326,7 @@ func performSinkOperation(sink *OTELSink, k string, v metricdata.Metrics, errCh func isSame(t *testing.T, expectedMap map[string]metricdata.Metrics, actual metricdata.ResourceMetrics) { // Validate resource - require.Equal(t, resource.NewSchemaless(), actual.Resource) + require.Equal(t, expectedResource, actual.Resource) // Validate Metrics require.NotEmpty(t, actual.ScopeMetrics) diff --git a/agent/setup.go b/agent/setup.go index 46e60d58b266..6e6bb322681d 100644 --- a/agent/setup.go +++ b/agent/setup.go @@ -104,7 +104,7 @@ func NewBaseDeps(configLoader ConfigLoader, logOut io.Writer, providedLogger hcl var extraSinks []metrics.MetricSink if cfg.IsCloudEnabled() { - d.HCP, err = hcp.NewDeps(cfg.Cloud, d.Logger) + d.HCP, err = hcp.NewDeps(cfg.Cloud, d.Logger, cfg.NodeID) if err != nil { return d, err }