Skip to content

Commit

Permalink
Add DogStatsD parser and UDP server transport
Browse files Browse the repository at this point in the history
  • Loading branch information
Nick Fischer committed Jun 29, 2020
1 parent 01393f6 commit 2034105
Show file tree
Hide file tree
Showing 5 changed files with 225 additions and 36 deletions.
23 changes: 23 additions & 0 deletions receiver/statsdreceiver/protocol/dogstatsd_parser.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package protocol

import (
"errors"
"strings"

metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
)

type DogStatsDParser struct{}

func (p *DogStatsDParser) Parse(line string) (*metricspb.Metric, error) {
parts := strings.Split(line, ":")
if len(parts) < 2 {
return nil, errors.New("not enough statsd message parts")
}

return &metricspb.Metric{
MetricDescriptor: &metricspb.MetricDescriptor{
Name: parts[0],
},
}, nil
}
9 changes: 9 additions & 0 deletions receiver/statsdreceiver/protocol/parser.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package protocol

import (
metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
)

type Parser interface {
Parse(in string) (*metricspb.Metric, error)
}
62 changes: 26 additions & 36 deletions receiver/statsdreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,16 @@ package statsdreceiver
import (
"context"
"errors"
"fmt"
"net/http"
"sync"
"time"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenterror"
"go.opentelemetry.io/collector/consumer"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/protocol"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/transport"
)

var (
Expand All @@ -24,11 +26,13 @@ var _ component.MetricsReceiver = (*statsdReceiver)(nil)
// statsdReceiver implements the component.MetricsReceiver for StatsD protocol.
type statsdReceiver struct {
sync.Mutex
logger *zap.Logger
addr string
server *http.Server
defaultAttrsPrefix string
nextConsumer consumer.MetricsConsumerOld
logger *zap.Logger
addr string
// config *Config

server transport.Server
parser protocol.Parser
nextConsumer consumer.MetricsConsumerOld

startOnce sync.Once
stopOnce sync.Once
Expand All @@ -44,16 +48,21 @@ func New(
return nil, errNilNextConsumer
}

if addr == "" {
addr = "localhost:8125"
}

server, err := transport.NewUDPServer(addr)
if err != nil {
return nil, err
}

r := &statsdReceiver{
logger: logger,
addr: addr,
parser: &protocol.DogStatsDParser{},
nextConsumer: nextConsumer,
}
r.server = &http.Server{
Addr: addr,
Handler: r,
ReadTimeout: timeout,
WriteTimeout: timeout,
server: server,
}
return r, nil
}
Expand All @@ -63,13 +72,13 @@ func (ddr *statsdReceiver) Start(_ context.Context, host component.Host) error {
ddr.Lock()
defer ddr.Unlock()

err := errAlreadyStarted
err := componenterror.ErrAlreadyStarted
ddr.startOnce.Do(func() {
err = nil
go func() {
err = ddr.server.ListenAndServe()
err = ddr.server.ListenAndServe(ddr.parser, ddr.nextConsumer)
if err != nil {
host.ReportFatalError(fmt.Errorf("error starting statsd receiver: %v", err))
host.ReportFatalError(err)
}
}()
})
Expand All @@ -84,26 +93,7 @@ func (ddr *statsdReceiver) Shutdown(context.Context) error {

var err = errAlreadyStopped
ddr.stopOnce.Do(func() {
err = ddr.server.Shutdown(context.Background())
err = ddr.server.Close()
})
return err
}

// ServeHTTP acts as the default and only HTTP handler for the StatsD receiver.
func (ddr *statsdReceiver) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
w.WriteHeader(http.StatusBadRequest)
return
}

w.Write([]byte("OK"))
}

func (ddr *statsdReceiver) handleHTTPErr(w http.ResponseWriter, err error, msg string) {
w.WriteHeader(http.StatusBadRequest)
ddr.logger.Error(msg, zap.Error(err))
_, err = w.Write([]byte(msg))
if err != nil {
ddr.logger.Error("error writing to response writer", zap.Error(err))
}
}
43 changes: 43 additions & 0 deletions receiver/statsdreceiver/transport/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package transport

import (
"errors"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/protocol"
"go.opentelemetry.io/collector/consumer"
)

var (
errNilListenAndServeParameters = errors.New(
"no parameter of ListenAndServe can be nil")
)

// Server abstracts the type of transport being used and offer an
// interface to handle serving clients over that transport.
type Server interface {
// ListenAndServe is a blocking call that starts to listen for client messages
// on the specific transport, and prepares the message to be processed by
// the Parser and passed to the next consumer.
ListenAndServe(
p protocol.Parser,
mc consumer.MetricsConsumerOld,
) error

// Close stops any running ListenAndServe, however, it waits for any
// data already received to be parsed and sent to the next consumer.
Close() error
}
124 changes: 124 additions & 0 deletions receiver/statsdreceiver/transport/udp_server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package transport

import (
"bytes"
"context"
"io"
"net"
"strings"
"sync"

"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumerdata"

metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/protocol"
)

type udpServer struct {
wg sync.WaitGroup
packetConn net.PacketConn
}

var _ (Server) = (*udpServer)(nil)

// NewUDPServer creates a transport.Server using UDP as its transport.
func NewUDPServer(addr string) (Server, error) {
packetConn, err := net.ListenPacket("udp", addr)
if err != nil {
return nil, err
}

u := udpServer{
packetConn: packetConn,
}
return &u, nil
}

func (u *udpServer) ListenAndServe(
parser protocol.Parser,
nextConsumer consumer.MetricsConsumerOld,
) error {
if parser == nil || nextConsumer == nil {
return errNilListenAndServeParameters
}

buf := make([]byte, 65527) // max size for udp packet body (assuming ipv6)
for {
n, _, err := u.packetConn.ReadFrom(buf)
if n > 0 {
u.wg.Add(1)
bufCopy := make([]byte, n)
copy(bufCopy, buf)
go func() {
u.handlePacket(parser, nextConsumer, bufCopy)
u.wg.Done()
}()
}
if err != nil {
if netErr, ok := err.(net.Error); ok {
if netErr.Temporary() {
continue
}
}
return err
}
}
}

func (u *udpServer) Close() error {
err := u.packetConn.Close()
u.wg.Wait()
return err
}

func (u *udpServer) handlePacket(
p protocol.Parser,
nextConsumer consumer.MetricsConsumerOld,
data []byte,
) {
ctx := context.Background()
var numReceivedTimeseries, numInvalidTimeseries int
var metrics []*metricspb.Metric
buf := bytes.NewBuffer(data)
for {
bytes, err := buf.ReadBytes((byte)('\n'))
if err == io.EOF {
if len(bytes) == 0 {
// Completed without errors.
break
}
}
line := strings.TrimSpace(string(bytes))
if line != "" {
numReceivedTimeseries++
metric, err := p.Parse(line)
if err != nil {
numInvalidTimeseries++
continue
}

metrics = append(metrics, metric)
}
}

md := consumerdata.MetricsData{
Metrics: metrics,
}
// TODO: handle error?
nextConsumer.ConsumeMetricsData(ctx, md)
}

0 comments on commit 2034105

Please sign in to comment.