Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add parsing for labels and guages in statsdreceiver #903

Merged
merged 7 commits into from
Sep 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions receiver/statsdreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ General format is:

`<name>:<value>|c|@<sample-rate>|#<tag1-key>:<tag1-value>`

<!-- ### Gauge
### Gauge

`<name>:<value>|g|@<sample-rate>|#<tag1-key>:<tag1-value>`

### Timer/Histogram
<!-- ### Timer/Histogram

`<name>:<value>|<ms/h>|@<sample-rate>|#<tag1-key>:<tag1-value>` -->

Expand Down Expand Up @@ -66,4 +66,4 @@ service:

A simple way to send a metric to `localhost:8125`:

`echo "test.metric:1|c" | nc -w 1 -u localhost 8125`
`echo "test.metric:42|c|#myKey:myVal" | nc -w 1 -u localhost 8125`
168 changes: 137 additions & 31 deletions receiver/statsdreceiver/protocol/statsd_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,45 @@ var (
errEmptyMetricValue = errors.New("empty metric value")
)

func getSupportedTypes() []string {
return []string{"c", "g"}
}

// StatsDParser supports the Parse method for parsing StatsD messages with Tags.
type StatsDParser struct{}

type statsDMetric struct {
name string
value string
statsdMetricType string
metricType metricspb.MetricDescriptor_Type
sampleRate float64
labelKeys []*metricspb.LabelKey
labelValues []*metricspb.LabelValue
}

var timeNowFunc = func() int64 {
return time.Now().Unix()
}

// Parse returns an OTLP metric representation of the input StatsD string.
func (p *StatsDParser) Parse(line string) (*metricspb.Metric, error) {
parsedMetric, err := parseMessageToMetric(line)
if err != nil {
return nil, err
}

metricPoint, err := buildPoint(parsedMetric)
if err != nil {
return nil, err
}

return buildMetric(parsedMetric, metricPoint), nil
}

func parseMessageToMetric(line string) (*statsDMetric, error) {
result := &statsDMetric{}

parts := strings.Split(line, "|")
if len(parts) < 2 {
return nil, fmt.Errorf("invalid message format: %s", line)
Expand All @@ -45,35 +79,78 @@ func (p *StatsDParser) Parse(line string) (*metricspb.Metric, error) {
return nil, fmt.Errorf("invalid <name>:<value> format: %s", parts[0])
}

metricName := parts[0][0:separatorIndex]
if metricName == "" {
result.name = parts[0][0:separatorIndex]
if result.name == "" {
return nil, errEmptyMetricName
}
metricValueString := parts[0][separatorIndex+1:]
if metricValueString == "" {
result.value = parts[0][separatorIndex+1:]
if result.value == "" {
return nil, errEmptyMetricValue
}

metricType := parts[1]
result.statsdMetricType = parts[1]
sonofachamp marked this conversation as resolved.
Show resolved Hide resolved
if !contains(getSupportedTypes(), result.statsdMetricType) {
return nil, fmt.Errorf("unsupported metric type: %s", result.statsdMetricType)
}

// TODO: add sample rate and tag parsing
additionalParts := parts[2:]
for _, part := range additionalParts {
// TODO: Sample rate doesn't currently have a place to go in the protocol
if strings.HasPrefix(part, "@") {
sampleRateStr := strings.TrimPrefix(part, "@")

metricPoint, err := buildPoint(metricType, metricValueString)
if err != nil {
return nil, err
f, err := strconv.ParseFloat(sampleRateStr, 64)
if err != nil {
return nil, fmt.Errorf("parse sample rate: %s", sampleRateStr)
}

result.sampleRate = f
} else if strings.HasPrefix(part, "#") {
tagsStr := strings.TrimPrefix(part, "#")

tagSets := strings.Split(tagsStr, ",")

result.labelKeys = make([]*metricspb.LabelKey, 0, len(tagSets))
result.labelValues = make([]*metricspb.LabelValue, 0, len(tagSets))

for _, tagSet := range tagSets {
tagParts := strings.Split(tagSet, ":")
if len(tagParts) != 2 {
return nil, fmt.Errorf("invalid tag format: %s", tagParts)
}
result.labelKeys = append(result.labelKeys, &metricspb.LabelKey{Key: tagParts[0]})
result.labelValues = append(result.labelValues, &metricspb.LabelValue{
Value: tagParts[1],
HasValue: true,
})
}
} else {
return nil, fmt.Errorf("unrecognized message part: %s", part)
}
}

return buildMetric(metricName, metricPoint), nil
return result, nil
}

func buildMetric(metricName string, point *metricspb.Point) *metricspb.Metric {
func contains(slice []string, element string) bool {
for _, val := range slice {
if val == element {
return true
}
}
return false
}

func buildMetric(metric *statsDMetric, point *metricspb.Point) *metricspb.Metric {
return &metricspb.Metric{
MetricDescriptor: &metricspb.MetricDescriptor{
Name: metricName,
Type: metricspb.MetricDescriptor_GAUGE_INT64,
Name: metric.name,
Type: metric.metricType,
LabelKeys: metric.labelKeys,
},
Timeseries: []*metricspb.TimeSeries{
{
LabelValues: metric.labelValues,
Points: []*metricspb.Point{
point,
},
Expand All @@ -82,30 +159,59 @@ func buildMetric(metricName string, point *metricspb.Point) *metricspb.Metric {
}
}

func buildPoint(metricType, metricValue string) (*metricspb.Point, error) {
point := &metricspb.Point{
Timestamp: &timestamppb.Timestamp{
Seconds: time.Now().Unix(),
},
func buildPoint(parsedMetric *statsDMetric) (*metricspb.Point, error) {
now := &timestamppb.Timestamp{
Seconds: timeNowFunc(),
}

switch metricType {
switch parsedMetric.statsdMetricType {
case "c":
// TODO: support both Int64 and Double values.
i, err := strconv.ParseInt(metricValue, 10, 64)
if err != nil {
f, err := strconv.ParseFloat(metricValue, 64)
if err != nil {
return nil, fmt.Errorf("parse metric value string: %s", metricValue)
return buildGaugeOrCounterPoint(parsedMetric, now, func(parsedMetric *statsDMetric, isDouble bool) {
if isDouble {
parsedMetric.metricType = metricspb.MetricDescriptor_CUMULATIVE_DOUBLE
return
}
parsedMetric.metricType = metricspb.MetricDescriptor_CUMULATIVE_INT64
})
case "g":
return buildGaugeOrCounterPoint(parsedMetric, now, func(parsedMetric *statsDMetric, isDouble bool) {
if isDouble {
parsedMetric.metricType = metricspb.MetricDescriptor_GAUGE_DOUBLE
return
}
i = int64(f)
parsedMetric.metricType = metricspb.MetricDescriptor_GAUGE_INT64
})
}

return nil, fmt.Errorf("unhandled metric type: %s", parsedMetric.statsdMetricType)
}

func buildGaugeOrCounterPoint(parsedMetric *statsDMetric, now *timestamppb.Timestamp,
metricTypeSetter func(parsedMetric *statsDMetric, isDouble bool)) (*metricspb.Point, error) {
var point *metricspb.Point
var isDouble bool

i, err := strconv.ParseInt(parsedMetric.value, 10, 64)
if err != nil {
f, err := strconv.ParseFloat(parsedMetric.value, 64)
if err != nil {
return nil, fmt.Errorf("parse metric value string: %s", parsedMetric.value)
}
point.Value = &metricspb.Point_Int64Value{
Int64Value: i,
point = &metricspb.Point{
Timestamp: now,
Value: &metricspb.Point_DoubleValue{
DoubleValue: f,
},
}
isDouble = true
} else {
point = &metricspb.Point{
Timestamp: now,
Value: &metricspb.Point_Int64Value{
Int64Value: i,
},
}
default:
return nil, fmt.Errorf("unhandled metric type: %s", metricType)
}

metricTypeSetter(parsedMetric, isDouble)
return point, nil
}
Loading