Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Log datapoints option #5689

Merged
merged 1 commit into from
Oct 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions exporter/signalfxexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ The following configuration options can also be configured:
state: [interrupt, user, system]
```
- `headers` (no default): Headers to pass in the payload.
- `log_data_points` (default = `false`): If the log level is set to `debug`
and this is `true`, all datapoints dispatched to Splunk Observability Cloud will be logged.
- `log_dimension_updates` (default = `false`): Whether to log dimension
updates.
- `timeout` (default = 5s): Amount of time to wait for a send operation to
Expand Down
4 changes: 4 additions & 0 deletions exporter/signalfxexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ type Config struct {
// here.
Headers map[string]string `mapstructure:"headers"`

// Whether to log datapoints dispatched to Splunk Observability Cloud
LogDataPoints bool `mapstructure:"log_data_points"`

// Whether to log dimension updates being sent to SignalFx.
LogDimensionUpdates bool `mapstructure:"log_dimension_updates"`

Expand Down Expand Up @@ -137,6 +140,7 @@ func (cfg *Config) getOptionsFromConfig() (*exporterOptions, error) {
apiURL: apiURL,
httpTimeout: cfg.Timeout,
token: cfg.AccessToken,
logDataPoints: cfg.LogDataPoints,
logDimUpdate: cfg.LogDimensionUpdates,
metricTranslator: metricTranslator,
}, nil
Expand Down
19 changes: 18 additions & 1 deletion exporter/signalfxexporter/dpclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

sfxpb "github.com/signalfx/com_signalfx_metrics_protobuf/model"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/model/otlp"
"go.opentelemetry.io/collector/model/pdata"
"go.uber.org/zap"

Expand All @@ -42,6 +43,8 @@ type sfxClientBase struct {
zippers sync.Pool
}

var metricsMarshaler = otlp.NewJSONMetricsMarshaler()

// avoid attempting to compress things that fit into a single ethernet frame
func (s *sfxClientBase) getReader(b []byte) (io.Reader, bool, error) {
var err error
Expand All @@ -64,6 +67,7 @@ func (s *sfxClientBase) getReader(b []byte) (io.Reader, bool, error) {
// sfxDPClient sends the data to the SignalFx backend.
type sfxDPClient struct {
sfxClientBase
logDataPoints bool
logger *zap.Logger
accessTokenPassthrough bool
converter *translation.MetricsConverter
Expand All @@ -78,6 +82,15 @@ func (s *sfxDPClient) pushMetricsData(
return 0, nil
}

if s.logDataPoints {
buf, err := metricsMarshaler.MarshalMetrics(md)
dloucasfx marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
s.logger.Error("Failed to marshal metrics for logging", zap.Error(err))
} else {
s.logger.Debug("received metrics", zap.String("pdata", string(buf)))
}
}

// All metrics in the pdata.Metrics will have the same access token because of the BatchPerResourceMetrics.
metricToken := s.retrieveAccessToken(rms.At(0))

Expand All @@ -86,7 +99,11 @@ func (s *sfxDPClient) pushMetricsData(
for i := 0; i < rms.Len(); i++ {
sfxDataPoints = append(sfxDataPoints, s.converter.MetricDataToSignalFxV2(rms.At(i))...)
}

if s.logDataPoints {
for _, dp := range sfxDataPoints {
s.logger.Debug("Dispatching SFx datapoint", zap.String("dp", translation.DatapointToString(dp)))
}
}
return s.pushMetricsDataForToken(ctx, sfxDataPoints, metricToken)
}

Expand Down
2 changes: 2 additions & 0 deletions exporter/signalfxexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ type exporterOptions struct {
apiURL *url.URL
httpTimeout time.Duration
token string
logDataPoints bool
logDimUpdate bool
metricTranslator *translation.MetricTranslator
}
Expand Down Expand Up @@ -109,6 +110,7 @@ func newSignalFxExporter(
},
zippers: newGzipPool(),
},
logDataPoints: options.logDataPoints,
logger: logger,
accessTokenPassthrough: config.AccessTokenPassthrough,
converter: converter,
Expand Down
31 changes: 31 additions & 0 deletions exporter/signalfxexporter/internal/translation/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ func (c *MetricsConverter) metricToSfxDataPoints(metric pdata.Metric, extraDimen
dps[resultSliceLen] = dp
}
resultSliceLen++
} else {
c.logger.Debug("Datapoint does not match filter, skipping", zap.String("dp", DatapointToString(dp)))
}
}
dps = dps[:resultSliceLen]
Expand Down Expand Up @@ -502,3 +504,32 @@ func createSampledLogger(logger *zap.Logger) *zap.Logger {
})
return logger.WithOptions(opts)
}

// DatapointToString renders a SignalFx datapoint in a human-readable form,
// "<metric>: <value> (<type>) <timestamp>" followed by the datapoint's
// dimensions on the next line, suitable for debug logging.
func DatapointToString(dp *sfxpb.DataPoint) string {
	// A zero timestamp means "unset"; render it as empty rather than "0" so
	// the log line does not imply a real point in time.
	var tsStr string
	if dp.Timestamp != 0 {
		tsStr = strconv.FormatInt(dp.Timestamp, 10)
	}

	var dimsStr string
	for _, dim := range dp.Dimensions {
		dimsStr += dim.String()
	}

	// MetricType is a pointer field in the protobuf model; guard against nil
	// instead of dereferencing unconditionally, falling back to the proto
	// default (GAUGE) when it is unset.
	metricType := sfxpb.MetricType_GAUGE
	if dp.MetricType != nil {
		metricType = *dp.MetricType
	}

	return fmt.Sprintf("%s: %s (%s) %s\n%s", dp.Metric, dp.Value.String(), dpTypeToString(metricType), tsStr, dimsStr)
}

// dpTypeToString maps a SignalFx metric type to its display name, returning an
// "unsupported type" marker for values it does not recognize.
func dpTypeToString(metricType sfxpb.MetricType) string {
	var name string
	switch metricType {
	case sfxpb.MetricType_GAUGE:
		name = "Gauge"
	case sfxpb.MetricType_COUNTER:
		name = "Counter"
	case sfxpb.MetricType_ENUM:
		name = "Enum"
	case sfxpb.MetricType_CUMULATIVE_COUNTER:
		name = "Cumulative Counter"
	default:
		name = fmt.Sprintf("unsupported type %d", metricType)
	}
	return name
}