Skip to content

Commit

Permalink
Remaining unmarshaler components refactored (#4083)
Browse files Browse the repository at this point in the history
* refactor otelcol.reeiver.jaeger to not implement river.Unmarshaler

Signed-off-by: erikbaranowski <[email protected]>

* refactor river.Unmarshaler implementation from otelcol.auth.sigv4

Signed-off-by: erikbaranowski <[email protected]>

---------

Signed-off-by: erikbaranowski <[email protected]>
  • Loading branch information
erikbaranowski authored Jun 7, 2023
1 parent 91f69c7 commit 038ca0a
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 82 deletions.
13 changes: 3 additions & 10 deletions component/otelcol/auth/sigv4/sigv4.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package sigv4
import (
"github.com/grafana/agent/component"
"github.com/grafana/agent/component/otelcol/auth"
"github.com/grafana/agent/pkg/river"
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/sigv4authextension"
otelcomponent "go.opentelemetry.io/collector/component"
otelconfig "go.opentelemetry.io/collector/config"
Expand All @@ -30,8 +29,7 @@ type Arguments struct {
}

var (
_ river.Unmarshaler = (*Arguments)(nil)
_ auth.Arguments = Arguments{}
_ auth.Arguments = Arguments{}
)

// Convert implements auth.Arguments.
Expand All @@ -50,13 +48,8 @@ func (args Arguments) Convert() (otelconfig.Extension, error) {
return &res, nil
}

// UnmarshalRiver implements river.Unmarshaler.
func (args *Arguments) UnmarshalRiver(f func(interface{}) error) error {
type arguments Arguments
if err := f((*arguments)(args)); err != nil {
return err
}

// Validate implements river.Validator.
func (args Arguments) Validate() error {
_, err := args.Convert()
return err
}
Expand Down
172 changes: 104 additions & 68 deletions component/otelcol/receiver/jaeger/jaeger.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@ import (
"github.com/grafana/agent/component"
"github.com/grafana/agent/component/otelcol"
"github.com/grafana/agent/component/otelcol/receiver"
"github.com/grafana/agent/pkg/river"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/jaegerreceiver"
otelcomponent "go.opentelemetry.io/collector/component"
otelconfig "go.opentelemetry.io/collector/config"
otelconfiggrpc "go.opentelemetry.io/collector/config/configgrpc"
otelconfighttp "go.opentelemetry.io/collector/config/confighttp"
)

func init() {
Expand All @@ -37,72 +38,10 @@ type Arguments struct {
}

var (
_ river.Unmarshaler = (*Arguments)(nil)
_ receiver.Arguments = Arguments{}
)

// DefaultArguments provides default settings for Arguments. All protocols are
// configured with defaults and then set to nil in UnmarshalRiver if they were
// not defined in the source config.
var DefaultArguments = Arguments{
Protocols: ProtocolsArguments{
GRPC: &otelcol.GRPCServerArguments{
Endpoint: "0.0.0.0:14250",
Transport: "tcp",
},
ThriftHTTP: &otelcol.HTTPServerArguments{
Endpoint: "0.0.0.0:14268",
},
ThriftBinary: &ProtocolUDP{
Endpoint: "0.0.0.0:6832",
QueueSize: 1_000,
MaxPacketSize: 65 * units.KiB,
Workers: 10,
},
ThriftCompact: &ProtocolUDP{
Endpoint: "0.0.0.0:6831",
QueueSize: 1_000,
MaxPacketSize: 65 * units.KiB,
Workers: 10,
},
},
}

// UnmarshalRiver implements river.Unmarshaler.
func (args *Arguments) UnmarshalRiver(f func(interface{}) error) error {
*args = DefaultArguments

type arguments Arguments

// Unmarshal into a temporary struct so we can detect which protocols were
// actually enabled by the user.
var temp arguments
if err := f(&temp); err != nil {
return err
}

// Remove protocols from args if they weren't provided by the user.
if temp.Protocols.GRPC == nil {
args.Protocols.GRPC = nil
}
if temp.Protocols.ThriftHTTP == nil {
args.Protocols.ThriftHTTP = nil
}
if temp.Protocols.ThriftBinary == nil {
args.Protocols.ThriftBinary = nil
}
if temp.Protocols.ThriftCompact == nil {
args.Protocols.ThriftCompact = nil
}

// Finally, unmarshal into the real struct.
if err := f((*arguments)(args)); err != nil {
return err
}
return args.Validate()
}

// Validate returns an error if args is invalid.
// Validate implements river.Validator.
func (args *Arguments) Validate() error {
if args.Protocols.GRPC == nil &&
args.Protocols.ThriftHTTP == nil &&
Expand Down Expand Up @@ -150,10 +89,55 @@ func (args Arguments) NextConsumers() *otelcol.ConsumerArguments {
// ProtocolsArguments configures protocols for otelcol.receiver.jaeger to
// listen on.
type ProtocolsArguments struct {
GRPC *otelcol.GRPCServerArguments `river:"grpc,block,optional"`
ThriftHTTP *otelcol.HTTPServerArguments `river:"thrift_http,block,optional"`
ThriftBinary *ProtocolUDP `river:"thrift_binary,block,optional"`
ThriftCompact *ProtocolUDP `river:"thrift_compact,block,optional"`
GRPC *GRPC `river:"grpc,block,optional"`
ThriftHTTP *ThriftHTTP `river:"thrift_http,block,optional"`
ThriftBinary *ThriftBinary `river:"thrift_binary,block,optional"`
ThriftCompact *ThriftCompact `river:"thrift_compact,block,optional"`
}

type GRPC struct {
GRPCServerArguments *otelcol.GRPCServerArguments `river:",squash"`
}

// SetToDefault implements river.Defaulter.
func (args *GRPC) SetToDefault() {
*args = GRPC{
GRPCServerArguments: &otelcol.GRPCServerArguments{
Endpoint: "0.0.0.0:14250",
Transport: "tcp",
},
}
}

// Convert converts proto into the upstream type.
func (args *GRPC) Convert() *otelconfiggrpc.GRPCServerSettings {
if args == nil {
return nil
}

return args.GRPCServerArguments.Convert()
}

type ThriftHTTP struct {
HTTPServerArguments *otelcol.HTTPServerArguments `river:",squash"`
}

// SetToDefault implements river.Defaulter.
func (args *ThriftHTTP) SetToDefault() {
*args = ThriftHTTP{
HTTPServerArguments: &otelcol.HTTPServerArguments{
Endpoint: "0.0.0.0:14268",
},
}
}

// Convert converts proto into the upstream type.
func (args *ThriftHTTP) Convert() *otelconfighttp.HTTPServerSettings {
if args == nil {
return nil
}

return args.HTTPServerArguments.Convert()
}

// ProtocolUDP configures a UDP server.
Expand Down Expand Up @@ -182,6 +166,58 @@ func (proto *ProtocolUDP) Convert() *jaegerreceiver.ProtocolUDP {
}
}

// ThriftCompact wraps ProtocolUDP and provides additional behavior.
type ThriftCompact struct {
ProtocolUDP *ProtocolUDP `river:",squash"`
}

// SetToDefault implements river.Defaulter.
func (args *ThriftCompact) SetToDefault() {
*args = ThriftCompact{
ProtocolUDP: &ProtocolUDP{
Endpoint: "0.0.0.0:6831",
QueueSize: 1_000,
MaxPacketSize: 65 * units.KiB,
Workers: 10,
},
}
}

// Convert converts proto into the upstream type.
func (args *ThriftCompact) Convert() *jaegerreceiver.ProtocolUDP {
if args == nil {
return nil
}

return args.ProtocolUDP.Convert()
}

// ThriftCompact wraps ProtocolUDP and provides additional behavior.
type ThriftBinary struct {
ProtocolUDP *ProtocolUDP `river:",squash"`
}

// SetToDefault implements river.Defaulter.
func (args *ThriftBinary) SetToDefault() {
*args = ThriftBinary{
ProtocolUDP: &ProtocolUDP{
Endpoint: "0.0.0.0:6832",
QueueSize: 1_000,
MaxPacketSize: 65 * units.KiB,
Workers: 10,
},
}
}

// Convert converts proto into the upstream type.
func (args *ThriftBinary) Convert() *jaegerreceiver.ProtocolUDP {
if args == nil {
return nil
}

return args.ProtocolUDP.Convert()
}

// RemoteSamplingArguments configures remote sampling settings.
type RemoteSamplingArguments struct {
// TODO(rfratto): can we work with upstream to provide a hook to provide a
Expand Down
20 changes: 16 additions & 4 deletions component/otelcol/receiver/jaeger/jaeger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,10 @@ func TestArguments_UnmarshalRiver(t *testing.T) {
var args jaeger.Arguments
require.NoError(t, river.Unmarshal([]byte(in), &args))

require.Equal(t, jaeger.DefaultArguments.Protocols.GRPC, args.Protocols.GRPC)
defaults := &jaeger.GRPC{}
defaults.SetToDefault()

require.Equal(t, defaults, args.Protocols.GRPC)
require.Nil(t, args.Protocols.ThriftHTTP)
require.Nil(t, args.Protocols.ThriftBinary)
require.Nil(t, args.Protocols.ThriftCompact)
Expand All @@ -73,8 +76,11 @@ func TestArguments_UnmarshalRiver(t *testing.T) {
var args jaeger.Arguments
require.NoError(t, river.Unmarshal([]byte(in), &args))

defaults := &jaeger.ThriftHTTP{}
defaults.SetToDefault()

require.Nil(t, args.Protocols.GRPC)
require.Equal(t, jaeger.DefaultArguments.Protocols.ThriftHTTP, args.Protocols.ThriftHTTP)
require.Equal(t, defaults, args.Protocols.ThriftHTTP)
require.Nil(t, args.Protocols.ThriftBinary)
require.Nil(t, args.Protocols.ThriftCompact)
})
Expand All @@ -88,9 +94,12 @@ func TestArguments_UnmarshalRiver(t *testing.T) {
var args jaeger.Arguments
require.NoError(t, river.Unmarshal([]byte(in), &args))

defaults := &jaeger.ThriftBinary{}
defaults.SetToDefault()

require.Nil(t, args.Protocols.GRPC)
require.Nil(t, args.Protocols.ThriftHTTP)
require.Equal(t, jaeger.DefaultArguments.Protocols.ThriftBinary, args.Protocols.ThriftBinary)
require.Equal(t, defaults, args.Protocols.ThriftBinary)
require.Nil(t, args.Protocols.ThriftCompact)
})

Expand All @@ -103,10 +112,13 @@ func TestArguments_UnmarshalRiver(t *testing.T) {
var args jaeger.Arguments
require.NoError(t, river.Unmarshal([]byte(in), &args))

defaults := &jaeger.ThriftCompact{}
defaults.SetToDefault()

require.Nil(t, args.Protocols.GRPC)
require.Nil(t, args.Protocols.ThriftHTTP)
require.Nil(t, args.Protocols.ThriftBinary)
require.Equal(t, jaeger.DefaultArguments.Protocols.ThriftCompact, args.Protocols.ThriftCompact)
require.Equal(t, defaults, args.Protocols.ThriftCompact)
})
}

Expand Down

0 comments on commit 038ca0a

Please sign in to comment.