From 20341056f59e0bdc4c95fa753ad93ab64a905737 Mon Sep 17 00:00:00 2001 From: Nick Fischer Date: Mon, 29 Jun 2020 14:10:59 -0700 Subject: [PATCH] Add DogStatsD parser and UDP server transport --- .../protocol/dogstatsd_parser.go | 23 ++++ receiver/statsdreceiver/protocol/parser.go | 9 ++ receiver/statsdreceiver/receiver.go | 62 ++++----- receiver/statsdreceiver/transport/server.go | 43 ++++++ .../statsdreceiver/transport/udp_server.go | 124 ++++++++++++++++++ 5 files changed, 225 insertions(+), 36 deletions(-) create mode 100644 receiver/statsdreceiver/protocol/dogstatsd_parser.go create mode 100644 receiver/statsdreceiver/protocol/parser.go create mode 100644 receiver/statsdreceiver/transport/server.go create mode 100644 receiver/statsdreceiver/transport/udp_server.go diff --git a/receiver/statsdreceiver/protocol/dogstatsd_parser.go b/receiver/statsdreceiver/protocol/dogstatsd_parser.go new file mode 100644 index 000000000000..f28632f30151 --- /dev/null +++ b/receiver/statsdreceiver/protocol/dogstatsd_parser.go @@ -0,0 +1,23 @@ +package protocol + +import ( + "errors" + "strings" + + metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1" +) + +type DogStatsDParser struct{} + +func (p *DogStatsDParser) Parse(line string) (*metricspb.Metric, error) { + parts := strings.Split(line, ":") + if len(parts) < 2 { + return nil, errors.New("not enough statsd message parts") + } + + return &metricspb.Metric{ + MetricDescriptor: &metricspb.MetricDescriptor{ + Name: parts[0], + }, + }, nil +} diff --git a/receiver/statsdreceiver/protocol/parser.go b/receiver/statsdreceiver/protocol/parser.go new file mode 100644 index 000000000000..5c1fa487b85d --- /dev/null +++ b/receiver/statsdreceiver/protocol/parser.go @@ -0,0 +1,9 @@ +package protocol + +import ( + metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1" +) + +type Parser interface { + Parse(in string) (*metricspb.Metric, error) +} diff --git a/receiver/statsdreceiver/receiver.go b/receiver/statsdreceiver/receiver.go index 86540ff19883..53d353977968 100644 --- a/receiver/statsdreceiver/receiver.go +++ b/receiver/statsdreceiver/receiver.go @@ -3,14 +3,16 @@ package statsdreceiver import ( "context" "errors" - "fmt" - "net/http" "sync" "time" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenterror" "go.opentelemetry.io/collector/consumer" "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/protocol" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/transport" ) var ( @@ -24,11 +26,13 @@ var _ component.MetricsReceiver = (*statsdReceiver)(nil) // statsdReceiver implements the component.MetricsReceiver for StatsD protocol. type statsdReceiver struct { sync.Mutex - logger *zap.Logger - addr string - server *http.Server - defaultAttrsPrefix string - nextConsumer consumer.MetricsConsumerOld + logger *zap.Logger + addr string + // config *Config + + server transport.Server + parser protocol.Parser + nextConsumer consumer.MetricsConsumerOld startOnce sync.Once stopOnce sync.Once @@ -44,16 +48,21 @@ func New( return nil, errNilNextConsumer } + if addr == "" { + addr = "localhost:8125" + } + + server, err := transport.NewUDPServer(addr) + if err != nil { + return nil, err + } + r := &statsdReceiver{ logger: logger, addr: addr, + parser: &protocol.DogStatsDParser{}, nextConsumer: nextConsumer, - } - r.server = &http.Server{ - Addr: addr, - Handler: r, - ReadTimeout: timeout, - WriteTimeout: timeout, + server: server, } return r, nil } @@ -63,13 +72,13 @@ func (ddr *statsdReceiver) Start(_ context.Context, host component.Host) error { ddr.Lock() defer ddr.Unlock() - err := errAlreadyStarted + err := componenterror.ErrAlreadyStarted ddr.startOnce.Do(func() { err = nil go func() { - err = ddr.server.ListenAndServe() + err = ddr.server.ListenAndServe(ddr.parser, ddr.nextConsumer) if err != nil { - host.ReportFatalError(fmt.Errorf("error starting statsd receiver: %v", err)) + host.ReportFatalError(err) } }() }) @@ -84,26 +93,7 @@ func (ddr *statsdReceiver) Shutdown(context.Context) error { var err = errAlreadyStopped ddr.stopOnce.Do(func() { - err = ddr.server.Shutdown(context.Background()) + err = ddr.server.Close() }) return err } - -// ServeHTTP acts as the default and only HTTP handler for the StatsD receiver. -func (ddr *statsdReceiver) ServeHTTP(w http.ResponseWriter, r *http.Request) { - if r.Method != "POST" { - w.WriteHeader(http.StatusBadRequest) - return - } - - w.Write([]byte("OK")) -} - -func (ddr *statsdReceiver) handleHTTPErr(w http.ResponseWriter, err error, msg string) { - w.WriteHeader(http.StatusBadRequest) - ddr.logger.Error(msg, zap.Error(err)) - _, err = w.Write([]byte(msg)) - if err != nil { - ddr.logger.Error("error writing to response writer", zap.Error(err)) - } -} diff --git a/receiver/statsdreceiver/transport/server.go b/receiver/statsdreceiver/transport/server.go new file mode 100644 index 000000000000..bc35b1e34fce --- /dev/null +++ b/receiver/statsdreceiver/transport/server.go @@ -0,0 +1,43 @@ +// Copyright 2019, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package transport + +import ( + "errors" + + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/protocol" + "go.opentelemetry.io/collector/consumer" +) + +var ( + errNilListenAndServeParameters = errors.New( + "no parameter of ListenAndServe can be nil") +) + +// Server abstracts the type of transport being used and offer an +// interface to handle serving clients over that transport. +type Server interface { + // ListenAndServe is a blocking call that starts to listen for client messages + // on the specific transport, and prepares the message to be processed by + // the Parser and passed to the next consumer. + ListenAndServe( + p protocol.Parser, + mc consumer.MetricsConsumerOld, + ) error + + // Close stops any running ListenAndServe, however, it waits for any + // data already received to be parsed and sent to the next consumer. + Close() error +} diff --git a/receiver/statsdreceiver/transport/udp_server.go b/receiver/statsdreceiver/transport/udp_server.go new file mode 100644 index 000000000000..fdab2b516243 --- /dev/null +++ b/receiver/statsdreceiver/transport/udp_server.go @@ -0,0 +1,124 @@ +// Copyright 2019, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package transport + +import ( + "bytes" + "context" + "io" + "net" + "strings" + "sync" + + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumerdata" + + metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/protocol" +) + +type udpServer struct { + wg sync.WaitGroup + packetConn net.PacketConn +} + +var _ (Server) = (*udpServer)(nil) + +// NewUDPServer creates a transport.Server using UDP as its transport. +func NewUDPServer(addr string) (Server, error) { + packetConn, err := net.ListenPacket("udp", addr) + if err != nil { + return nil, err + } + + u := udpServer{ + packetConn: packetConn, + } + return &u, nil +} + +func (u *udpServer) ListenAndServe( + parser protocol.Parser, + nextConsumer consumer.MetricsConsumerOld, +) error { + if parser == nil || nextConsumer == nil { + return errNilListenAndServeParameters + } + + buf := make([]byte, 65527) // max size for udp packet body (assuming ipv6) + for { + n, _, err := u.packetConn.ReadFrom(buf) + if n > 0 { + u.wg.Add(1) + bufCopy := make([]byte, n) + copy(bufCopy, buf) + go func() { + u.handlePacket(parser, nextConsumer, bufCopy) + u.wg.Done() + }() + } + if err != nil { + if netErr, ok := err.(net.Error); ok { + if netErr.Temporary() { + continue + } + } + return err + } + } +} + +func (u *udpServer) Close() error { + err := u.packetConn.Close() + u.wg.Wait() + return err +} + +func (u *udpServer) handlePacket( + p protocol.Parser, + nextConsumer consumer.MetricsConsumerOld, + data []byte, +) { + ctx := context.Background() + var numReceivedTimeseries, numInvalidTimeseries int + var metrics []*metricspb.Metric + buf := bytes.NewBuffer(data) + for { + bytes, err := buf.ReadBytes((byte)('\n')) + if err == io.EOF { + if len(bytes) == 0 { + // Completed without errors. + break + } + } + line := strings.TrimSpace(string(bytes)) + if line != "" { + numReceivedTimeseries++ + metric, err := p.Parse(line) + if err != nil { + numInvalidTimeseries++ + continue + } + + metrics = append(metrics, metric) + } + } + + md := consumerdata.MetricsData{ + Metrics: metrics, + } + // TODO: handle error? + nextConsumer.ConsumeMetricsData(ctx, md) +}