From 88dbd2550d7e53767442ade062556aa0b04499df Mon Sep 17 00:00:00 2001 From: Dani Louca Date: Fri, 8 Oct 2021 15:34:41 -0400 Subject: [PATCH] Log datapoints option Signed-off-by: Dani Louca --- exporter/signalfxexporter/README.md | 2 ++ exporter/signalfxexporter/config.go | 4 +++ exporter/signalfxexporter/dpclient.go | 19 +++++++++++- exporter/signalfxexporter/exporter.go | 2 ++ .../internal/translation/converter.go | 31 +++++++++++++++++++ 5 files changed, 57 insertions(+), 1 deletion(-) diff --git a/exporter/signalfxexporter/README.md b/exporter/signalfxexporter/README.md index 0cb2dc029066..348489b6495b 100644 --- a/exporter/signalfxexporter/README.md +++ b/exporter/signalfxexporter/README.md @@ -64,6 +64,8 @@ The following configuration options can also be configured: state: [interrupt, user, system] ``` - `headers` (no default): Headers to pass in the payload. +- `log_data_points` (default = `false`): If the log level is set to `debug` + and this is true, all datapoints dispatched to Splunk Observability Cloud will be logged - `log_dimension_updates` (default = `false`): Whether or not to log dimension updates. - `timeout` (default = 5s): Amount of time to wait for a send operation to diff --git a/exporter/signalfxexporter/config.go b/exporter/signalfxexporter/config.go index a59821c63c5c..043863b0eb46 100644 --- a/exporter/signalfxexporter/config.go +++ b/exporter/signalfxexporter/config.go @@ -64,6 +64,9 @@ type Config struct { // here. Headers map[string]string `mapstructure:"headers"` + // Whether to log datapoints dispatched to Splunk Observability Cloud + LogDataPoints bool `mapstructure:"log_data_points"` + // Whether to log dimension updates being sent to SignalFx. LogDimensionUpdates bool `mapstructure:"log_dimension_updates"` @@ -137,6 +140,7 @@ func (cfg *Config) getOptionsFromConfig() (*exporterOptions, error) { apiURL: apiURL, httpTimeout: cfg.Timeout, token: cfg.AccessToken, + logDataPoints: cfg.LogDataPoints, logDimUpdate: cfg.LogDimensionUpdates, metricTranslator: metricTranslator, }, nil diff --git a/exporter/signalfxexporter/dpclient.go b/exporter/signalfxexporter/dpclient.go index 7c39d691ba37..77f2943c5b16 100644 --- a/exporter/signalfxexporter/dpclient.go +++ b/exporter/signalfxexporter/dpclient.go @@ -28,6 +28,7 @@ import ( sfxpb "github.com/signalfx/com_signalfx_metrics_protobuf/model" "go.opentelemetry.io/collector/consumer/consumererror" + "go.opentelemetry.io/collector/model/otlp" "go.opentelemetry.io/collector/model/pdata" "go.uber.org/zap" @@ -42,6 +43,8 @@ type sfxClientBase struct { zippers sync.Pool } +var metricsMarshaler = otlp.NewJSONMetricsMarshaler() + // avoid attempting to compress things that fit into a single ethernet frame func (s *sfxClientBase) getReader(b []byte) (io.Reader, bool, error) { var err error @@ -64,6 +67,7 @@ func (s *sfxClientBase) getReader(b []byte) (io.Reader, bool, error) { // sfxDPClient sends the data to the SignalFx backend. type sfxDPClient struct { sfxClientBase + logDataPoints bool logger *zap.Logger accessTokenPassthrough bool converter *translation.MetricsConverter @@ -78,6 +82,15 @@ func (s *sfxDPClient) pushMetricsData( return 0, nil } + if s.logDataPoints { + buf, err := metricsMarshaler.MarshalMetrics(md) + if err != nil { + s.logger.Error("Failed to marshal metrics for logging", zap.Error(err)) + } else { + s.logger.Debug("received metrics", zap.String("pdata", string(buf))) + } + } + // All metrics in the pdata.Metrics will have the same access token because of the BatchPerResourceMetrics. metricToken := s.retrieveAccessToken(rms.At(0)) @@ -86,7 +99,11 @@ func (s *sfxDPClient) pushMetricsData( for i := 0; i < rms.Len(); i++ { sfxDataPoints = append(sfxDataPoints, s.converter.MetricDataToSignalFxV2(rms.At(i))...) } - + if s.logDataPoints { + for _, dp := range sfxDataPoints { + s.logger.Debug("Dispatching SFx datapoint", zap.String("dp", translation.DatapointToString(dp))) + } + } return s.pushMetricsDataForToken(ctx, sfxDataPoints, metricToken) } diff --git a/exporter/signalfxexporter/exporter.go b/exporter/signalfxexporter/exporter.go index 9df67c514a85..11527769e72c 100644 --- a/exporter/signalfxexporter/exporter.go +++ b/exporter/signalfxexporter/exporter.go @@ -68,6 +68,7 @@ type exporterOptions struct { apiURL *url.URL httpTimeout time.Duration token string + logDataPoints bool logDimUpdate bool metricTranslator *translation.MetricTranslator } @@ -109,6 +110,7 @@ func newSignalFxExporter( }, zippers: newGzipPool(), }, + logDataPoints: options.logDataPoints, logger: logger, accessTokenPassthrough: config.AccessTokenPassthrough, converter: converter, diff --git a/exporter/signalfxexporter/internal/translation/converter.go b/exporter/signalfxexporter/internal/translation/converter.go index 235a60a8bbc6..422decf20e15 100644 --- a/exporter/signalfxexporter/internal/translation/converter.go +++ b/exporter/signalfxexporter/internal/translation/converter.go @@ -127,6 +127,8 @@ func (c *MetricsConverter) metricToSfxDataPoints(metric pdata.Metric, extraDimen dps[resultSliceLen] = dp } resultSliceLen++ + } else { + c.logger.Debug("Datapoint does not match filter, skipping", zap.String("dp", DatapointToString(dp))) } } dps = dps[:resultSliceLen] @@ -502,3 +504,32 @@ func createSampledLogger(logger *zap.Logger) *zap.Logger { }) return logger.WithOptions(opts) } + +func DatapointToString(dp *sfxpb.DataPoint) string { + var tsStr string + if dp.Timestamp != 0 { + tsStr = strconv.FormatInt(dp.Timestamp, 10) + } + + var dimsStr string + for _, dim := range dp.Dimensions { + dimsStr = dimsStr + dim.String() + } + + return fmt.Sprintf("%s: %s (%s) %s\n%s", dp.Metric, dp.Value.String(), dpTypeToString(*dp.MetricType), tsStr, dimsStr) +} + +func dpTypeToString(t sfxpb.MetricType) string { + switch t { + case sfxpb.MetricType_GAUGE: + return "Gauge" + case sfxpb.MetricType_COUNTER: + return "Counter" + case sfxpb.MetricType_ENUM: + return "Enum" + case sfxpb.MetricType_CUMULATIVE_COUNTER: + return "Cumulative Counter" + default: + return fmt.Sprintf("unsupported type %d", t) + } +}