From 5e4ddd4210ab6c5dfdae3ba6ae27336b1bff50d6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juraci=20Paix=C3=A3o=20Kr=C3=B6hling?= Date: Tue, 23 Mar 2021 14:56:20 +0100 Subject: [PATCH] Custom authenticator logic Fixes #2101 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Juraci Paixão Kröhling --- config/configauth/README.md | 39 +++- config/configauth/authenticator.go | 44 ++--- config/configauth/authenticator_test.go | 35 +--- config/configauth/configauth.go | 74 ++++---- config/configauth/configauth_test.go | 66 +++---- config/configauth/context.go | 15 +- config/configauth/context_test.go | 8 +- config/configauth/mocks.go | 63 +++++++ config/configauth/mocks_test.go | 75 ++++++++ config/configgrpc/configgrpc.go | 6 +- config/configgrpc/configgrpc_test.go | 29 +-- extension/authoidcextension/README.md | 35 ++++ extension/authoidcextension/config.go | 46 +++++ .../authoidcextension/extension.go | 112 ++++++------ .../authoidcextension/extension_test.go | 173 ++++++++---------- extension/authoidcextension/factory.go | 52 ++++++ extension/authoidcextension/factory_test.go | 57 ++++++ .../authoidcextension}/oidc_server_test.go | 2 +- receiver/jaegerreceiver/factory.go | 6 +- receiver/jaegerreceiver/trace_receiver.go | 17 +- .../jaegerreceiver/trace_receiver_test.go | 13 +- receiver/opencensusreceiver/config.go | 12 +- receiver/opencensusreceiver/config_test.go | 22 --- receiver/opencensusreceiver/factory.go | 6 +- receiver/opencensusreceiver/opencensus.go | 66 ++++--- .../opencensusreceiver/opencensus_test.go | 29 +++ receiver/opencensusreceiver/options.go | 18 +- receiver/otlpreceiver/factory.go | 35 ++-- receiver/otlpreceiver/otlp.go | 41 +++-- receiver/otlpreceiver/otlp_test.go | 9 +- .../default_extensions_test.go | 5 +- service/defaultcomponents/defaults.go | 2 + 32 files changed, 771 insertions(+), 441 deletions(-) create mode 100644 config/configauth/mocks.go create mode 100644 config/configauth/mocks_test.go create mode 100644 extension/authoidcextension/README.md create mode 100644 extension/authoidcextension/config.go rename config/configauth/oidc_authenticator.go => extension/authoidcextension/extension.go (66%) rename config/configauth/oidc_authenticator_test.go => extension/authoidcextension/extension_test.go (79%) create mode 100644 extension/authoidcextension/factory.go create mode 100644 extension/authoidcextension/factory_test.go rename {config/configauth => extension/authoidcextension}/oidc_server_test.go (99%) diff --git a/config/configauth/README.md b/config/configauth/README.md index 18331b1a0fdc..8b3c435e1698 100644 --- a/config/configauth/README.md +++ b/config/configauth/README.md @@ -1,17 +1,36 @@ # Authentication configuration for receivers -This module allows server types, such as gRPC and HTTP, to be configured to perform authentication for requests and/or RPCs. Each server type is responsible for getting the request/RPC metadata and passing down to the authenticator. Currently, only bearer token authentication is supported, although the module is ready to accept new authenticators. +This module allows server types, such as gRPC and HTTP, to be configured to perform authentication for requests and/or RPCs. Each server type is responsible for getting the request/RPC metadata and passing down to the authenticator. + +The currently known authenticators: + +- [oidc](../../extension/authoidcextension) Examples: ```yaml +extensions: + oidc: + # see the blog post on securing the otelcol for information + # on how to setup an OIDC server and how to generate the TLS certs + # required for this example + # https://medium.com/opentelemetry/securing-your-opentelemetry-collector-1a4f9fa5bd6f + issuer_url: http://localhost:8080/auth/realms/opentelemetry + audience: account + receivers: - somereceiver: - grpc: - authentication: - attribute: authorization - oidc: - issuer_url: https://auth.example.com/ - issuer_ca_path: /etc/pki/tls/cert.pem - client_id: my-oidc-client - username_claim: email + otlp/with_auth: + protocols: + grpc: + endpoint: localhost:4318 + tls_settings: + cert_file: /tmp/certs/cert.pem + key_file: /tmp/certs/cert-key.pem + auth: + authenticator: oidc ``` + +## Creating an authenticator + +New authenticators can be added by creating a new extension that also implements the `configauth.Authenticator` extension. Generic authenticators that may be used by a good number of users might be accepted as part of the core distribution, or as part of the contrib distribution. If you have interest in contributing one authenticator, open an issue with your proposal. + +For other cases, you'll need to include your custom authenticator as part of your custom OpenTelemetry Collector, perhaps being built using the [OpenTelemetry Collector Builder](https://github.com/open-telemetry/opentelemetry-collector-builder). diff --git a/config/configauth/authenticator.go b/config/configauth/authenticator.go index 1c050ee38963..0a80a3c5f976 100644 --- a/config/configauth/authenticator.go +++ b/config/configauth/authenticator.go @@ -17,28 +17,30 @@ package configauth import ( "context" "errors" - "io" "google.golang.org/grpc" "google.golang.org/grpc/metadata" + + "go.opentelemetry.io/collector/component" ) var ( - errNoOIDCProvided = errors.New("no OIDC information provided") errMetadataNotFound = errors.New("no request metadata found") - defaultAttribute = "authorization" ) -// Authenticator will authenticate the incoming request/RPC +// Authenticator is an Extension that can be used as an authenticator for the configauth.Authentication option. +// Authenticators are then included as part of OpenTelemetry Collector builds and can be referenced by their +// names from the Authentication configuration. Each Authenticator is free to define its own behavior and configuration options, +// but note that the expectations that come as part of Extensions exist here as well. For instance, multiple instances of the same +// authenticator should be possible to exist under different names. type Authenticator interface { - io.Closer + component.Extension // Authenticate checks whether the given context contains valid auth data. Successfully authenticated calls will always return a nil error and a context with the auth data. + // Implementations should add the derived subject (user, principal) to a new context built based on the given context under the key configauth.SubjectKey. + // If group/membership information is available, it should also be added, under the key configauth.GroupsKey. Authenticate(context.Context, map[string][]string) (context.Context, error) - // Start will - Start(context.Context) error - // UnaryInterceptor is a helper method to provide a gRPC-compatible UnaryInterceptor, typically calling the authenticator's Authenticate method. UnaryInterceptor(context.Context, interface{}, *grpc.UnaryServerInfo, grpc.UnaryHandler) (interface{}, error) @@ -46,24 +48,17 @@ type Authenticator interface { StreamInterceptor(interface{}, grpc.ServerStream, *grpc.StreamServerInfo, grpc.StreamHandler) error } -type authenticateFunc func(context.Context, map[string][]string) (context.Context, error) -type unaryInterceptorFunc func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler, authenticate authenticateFunc) (interface{}, error) -type streamInterceptorFunc func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler, authenticate authenticateFunc) error - -// NewAuthenticator creates an authenticator based on the given configuration -func NewAuthenticator(cfg Authentication) (Authenticator, error) { - if cfg.OIDC == nil { - return nil, errNoOIDCProvided - } +// AuthenticateFunc defines the signature for the function responsible for performing the authentication based on the context data. +type AuthenticateFunc func(context.Context, map[string][]string) (context.Context, error) - if len(cfg.Attribute) == 0 { - cfg.Attribute = defaultAttribute - } +// UnaryInterceptorFunc defines the signature for the function intercepting unary gRPC calls. +type UnaryInterceptorFunc func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler, authenticate AuthenticateFunc) (interface{}, error) - return newOIDCAuthenticator(cfg) -} +// StreamInterceptorFunc defines the signature for hte function intercepting streaming gRPC calls. +type StreamInterceptorFunc func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler, authenticate AuthenticateFunc) error -func defaultUnaryInterceptor(ctx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler, authenticate authenticateFunc) (interface{}, error) { +// DefaultUnaryInterceptor provides a default implementation of a unary inteceptor, useful for most authenticators. +func DefaultUnaryInterceptor(ctx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler, authenticate AuthenticateFunc) (interface{}, error) { headers, ok := metadata.FromIncomingContext(ctx) if !ok { return nil, errMetadataNotFound @@ -77,7 +72,8 @@ func defaultUnaryInterceptor(ctx context.Context, req interface{}, _ *grpc.Unary return handler(ctx, req) } -func defaultStreamInterceptor(srv interface{}, stream grpc.ServerStream, _ *grpc.StreamServerInfo, handler grpc.StreamHandler, authenticate authenticateFunc) error { +// DefaultStreamInterceptor provides a default implementation of a stream interceptor, useful for most authenticators. +func DefaultStreamInterceptor(srv interface{}, stream grpc.ServerStream, _ *grpc.StreamServerInfo, handler grpc.StreamHandler, authenticate AuthenticateFunc) error { ctx := stream.Context() headers, ok := metadata.FromIncomingContext(ctx) if !ok { diff --git a/config/configauth/authenticator_test.go b/config/configauth/authenticator_test.go index b148485285e9..98167c9f4160 100644 --- a/config/configauth/authenticator_test.go +++ b/config/configauth/authenticator_test.go @@ -24,29 +24,6 @@ import ( "google.golang.org/grpc/metadata" ) -func TestNewAuthenticator(t *testing.T) { - // test - p, err := NewAuthenticator(Authentication{ - OIDC: &OIDC{ - Audience: "some-audience", - IssuerURL: "http://example.com", - }, - }) - - // verify - assert.NotNil(t, p) - assert.NoError(t, err) -} - -func TestMissingOIDC(t *testing.T) { - // test - p, err := NewAuthenticator(Authentication{}) - - // verify - assert.Nil(t, p) - assert.Equal(t, errNoOIDCProvided, err) -} - func TestDefaultUnaryInterceptorAuthSucceeded(t *testing.T) { // prepare handlerCalled := false @@ -62,7 +39,7 @@ func TestDefaultUnaryInterceptorAuthSucceeded(t *testing.T) { ctx := metadata.NewIncomingContext(context.Background(), metadata.Pairs("authorization", "some-auth-data")) // test - res, err := defaultUnaryInterceptor(ctx, nil, &grpc.UnaryServerInfo{}, handler, authFunc) + res, err := DefaultUnaryInterceptor(ctx, nil, &grpc.UnaryServerInfo{}, handler, authFunc) // verify assert.Nil(t, res) @@ -86,7 +63,7 @@ func TestDefaultUnaryInterceptorAuthFailure(t *testing.T) { ctx := metadata.NewIncomingContext(context.Background(), metadata.Pairs("authorization", "some-auth-data")) // test - res, err := defaultUnaryInterceptor(ctx, nil, &grpc.UnaryServerInfo{}, handler, authFunc) + res, err := DefaultUnaryInterceptor(ctx, nil, &grpc.UnaryServerInfo{}, handler, authFunc) // verify assert.Nil(t, res) @@ -106,7 +83,7 @@ func TestDefaultUnaryInterceptorMissingMetadata(t *testing.T) { } // test - res, err := defaultUnaryInterceptor(context.Background(), nil, &grpc.UnaryServerInfo{}, handler, authFunc) + res, err := DefaultUnaryInterceptor(context.Background(), nil, &grpc.UnaryServerInfo{}, handler, authFunc) // verify assert.Nil(t, res) @@ -131,7 +108,7 @@ func TestDefaultStreamInterceptorAuthSucceeded(t *testing.T) { } // test - err := defaultStreamInterceptor(nil, streamServer, &grpc.StreamServerInfo{}, handler, authFunc) + err := DefaultStreamInterceptor(nil, streamServer, &grpc.StreamServerInfo{}, handler, authFunc) // verify assert.NoError(t, err) @@ -157,7 +134,7 @@ func TestDefaultStreamInterceptorAuthFailure(t *testing.T) { } // test - err := defaultStreamInterceptor(nil, streamServer, &grpc.StreamServerInfo{}, handler, authFunc) + err := DefaultStreamInterceptor(nil, streamServer, &grpc.StreamServerInfo{}, handler, authFunc) // verify assert.Equal(t, expectedErr, err) @@ -179,7 +156,7 @@ func TestDefaultStreamInterceptorMissingMetadata(t *testing.T) { } // test - err := defaultStreamInterceptor(nil, streamServer, &grpc.StreamServerInfo{}, handler, authFunc) + err := DefaultStreamInterceptor(nil, streamServer, &grpc.StreamServerInfo{}, handler, authFunc) // verify assert.Equal(t, errMetadataNotFound, err) diff --git a/config/configauth/configauth.go b/config/configauth/configauth.go index b76597ac901a..a27cf7a2e192 100644 --- a/config/configauth/configauth.go +++ b/config/configauth/configauth.go @@ -15,60 +15,50 @@ package configauth import ( - "context" + "errors" + "fmt" "google.golang.org/grpc" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config" +) + +var ( + errAuthenticatorNotFound error = errors.New("authenticator not found") + errAuthenticatorNotProvided error = errors.New("authenticator not provided") ) // Authentication defines the auth settings for the receiver type Authentication struct { - // The attribute (header name) to look for auth data. Optional, default value: "authentication". - Attribute string `mapstructure:"attribute"` - - // OIDC configures this receiver to use the given OIDC provider as the backend for the authentication mechanism. - // Required. - OIDC *OIDC `mapstructure:"oidc"` + // Authenticator specifies the name of the extension to use in order to authenticate the incoming data point. + AuthenticatorName string `mapstructure:"authenticator"` } -// OIDC defines the OpenID Connect properties for this processor -type OIDC struct { - // IssuerURL is the base URL for the OIDC provider. - // Required. - IssuerURL string `mapstructure:"issuer_url"` - - // Audience of the token, used during the verification. - // For example: "https://accounts.google.com" or "https://login.salesforce.com". - // Required. - Audience string `mapstructure:"audience"` - - // The local path for the issuer CA's TLS server cert. - // Optional. - IssuerCAPath string `mapstructure:"issuer_ca_path"` - - // The claim to use as the username, in case the token's 'sub' isn't the suitable source. - // Optional. - UsernameClaim string `mapstructure:"username_claim"` - - // The claim that holds the subject's group membership information. - // Optional. - GroupsClaim string `mapstructure:"groups_claim"` -} - -// ToServerOptions builds a set of server options ready to be used by the gRPC server -func (a *Authentication) ToServerOptions() ([]grpc.ServerOption, error) { - auth, err := NewAuthenticator(*a) - if err != nil { - return nil, err +// ToServerOption builds a set of server options ready to be used by the gRPC server +func (a *Authentication) ToServerOption(ext map[config.NamedEntity]component.Extension) ([]grpc.ServerOption, error) { + if a.AuthenticatorName == "" { + return nil, errAuthenticatorNotProvided } - // perhaps we should use a timeout here? - // TODO: we need a hook to call auth.Close() - if err := auth.Start(context.Background()); err != nil { - return nil, err + authenticator := selectAuthenticator(ext, a.AuthenticatorName) + if authenticator == nil { + return nil, fmt.Errorf("failed to resolve authenticator %q: %w", a.AuthenticatorName, errAuthenticatorNotFound) } return []grpc.ServerOption{ - grpc.UnaryInterceptor(auth.UnaryInterceptor), - grpc.StreamInterceptor(auth.StreamInterceptor), + grpc.UnaryInterceptor(authenticator.UnaryInterceptor), + grpc.StreamInterceptor(authenticator.StreamInterceptor), }, nil } + +func selectAuthenticator(extensions map[config.NamedEntity]component.Extension, requested string) Authenticator { + for name, ext := range extensions { + if auth, ok := ext.(Authenticator); ok { + if name.Name() == requested { + return auth + } + } + } + return nil +} diff --git a/config/configauth/configauth_test.go b/config/configauth/configauth_test.go index c04efaf15d83..4a0f5d950b06 100644 --- a/config/configauth/configauth_test.go +++ b/config/configauth/configauth_test.go @@ -18,26 +18,25 @@ import ( "testing" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config" ) func TestToServerOptions(t *testing.T) { // prepare - oidcServer, err := newOIDCServer() - require.NoError(t, err) - oidcServer.Start() - defer oidcServer.Close() - - config := Authentication{ - OIDC: &OIDC{ - IssuerURL: oidcServer.URL, - Audience: "unit-test", - GroupsClaim: "memberships", - }, + cfg := &Authentication{ + AuthenticatorName: "mock", + } + ext := map[config.NamedEntity]component.Extension{ + &config.ExtensionSettings{ + NameVal: "mock", + TypeVal: "mock", + }: &MockAuthenticator{}, } // test - opts, err := config.ToServerOptions() + opts, err := cfg.ToServerOption(ext) // verify assert.NoError(t, err) @@ -45,30 +44,33 @@ func TestToServerOptions(t *testing.T) { assert.Len(t, opts, 2) // we have two interceptors } -func TestInvalidConfigurationFailsOnToServerOptions(t *testing.T) { - - for _, tt := range []struct { - cfg Authentication +func TestToServerOptionFails(t *testing.T) { + testCases := []struct { + desc string + cfg *Authentication + ext map[config.NamedEntity]component.Extension + expected error }{ { - Authentication{}, + desc: "Authenticator not provided", + cfg: &Authentication{}, + ext: map[config.NamedEntity]component.Extension{}, + expected: errAuthenticatorNotProvided, }, { - Authentication{ - OIDC: &OIDC{ - IssuerURL: "http://oidc.acme.invalid", - Audience: "unit-test", - GroupsClaim: "memberships", - }, + desc: "Authenticator not found", + cfg: &Authentication{ + AuthenticatorName: "does-not-exist", }, + ext: map[config.NamedEntity]component.Extension{}, + expected: errAuthenticatorNotFound, }, - } { - // test - opts, err := tt.cfg.ToServerOptions() - - // verify - assert.Error(t, err) - assert.Nil(t, opts) } - + for _, tC := range testCases { + t.Run(tC.desc, func(t *testing.T) { + opts, err := tC.cfg.ToServerOption(tC.ext) + assert.ErrorIs(t, err, tC.expected) + assert.Nil(t, opts) + }) + } } diff --git a/config/configauth/context.go b/config/configauth/context.go index a7e9eb2376c8..3d5220f34013 100644 --- a/config/configauth/context.go +++ b/config/configauth/context.go @@ -17,21 +17,24 @@ package configauth import "context" var ( - subjectKey = subjectType{} - groupsKey = groupsType{} + // SubjectKey is the key to use when setting the subject (user, principal) information to the context. The value under this key should be a plain-text string. + SubjectKey = subjectType{} + + // GroupsKey is the key to use when setting the subject's membership to the context. The value under this key should be a slice of strings. + GroupsKey = groupsType{} ) type subjectType struct{} type groupsType struct{} -// SubjectFromContext returns a list of groups the subject in the context belongs to +// SubjectFromContext returns the subject behind this context. func SubjectFromContext(ctx context.Context) (string, bool) { - value, ok := ctx.Value(subjectKey).(string) + value, ok := ctx.Value(SubjectKey).(string) return value, ok } -// GroupsFromContext returns a list of groups the subject in the context belongs to +// GroupsFromContext returns a list of groups the subject in the context belongs to. func GroupsFromContext(ctx context.Context) ([]string, bool) { - value, ok := ctx.Value(groupsKey).([]string) + value, ok := ctx.Value(GroupsKey).([]string) return value, ok } diff --git a/config/configauth/context_test.go b/config/configauth/context_test.go index 61dec7ab0baf..270f4f90e2bd 100644 --- a/config/configauth/context_test.go +++ b/config/configauth/context_test.go @@ -23,7 +23,7 @@ import ( func TestSubjectFromContext(t *testing.T) { // prepare - ctx := context.WithValue(context.Background(), subjectKey, "my-subject") + ctx := context.WithValue(context.Background(), SubjectKey, "my-subject") // test sub, ok := SubjectFromContext(ctx) @@ -47,7 +47,7 @@ func TestSubjectFromContextNotPresent(t *testing.T) { func TestSubjectFromContextWrongType(t *testing.T) { // prepare - ctx := context.WithValue(context.Background(), subjectKey, 123) + ctx := context.WithValue(context.Background(), SubjectKey, 123) // test sub, ok := SubjectFromContext(ctx) @@ -59,7 +59,7 @@ func TestSubjectFromContextWrongType(t *testing.T) { func TestGroupsFromContext(t *testing.T) { // prepare - ctx := context.WithValue(context.Background(), groupsKey, []string{"my-groups"}) + ctx := context.WithValue(context.Background(), GroupsKey, []string{"my-groups"}) // test groups, ok := GroupsFromContext(ctx) @@ -83,7 +83,7 @@ func TestGroupsFromContextNotPresent(t *testing.T) { func TestGroupsFromContextWrongType(t *testing.T) { // prepare - ctx := context.WithValue(context.Background(), subjectKey, 123) + ctx := context.WithValue(context.Background(), SubjectKey, 123) // test sub, ok := GroupsFromContext(ctx) diff --git a/config/configauth/mocks.go b/config/configauth/mocks.go new file mode 100644 index 000000000000..39200718b8f8 --- /dev/null +++ b/config/configauth/mocks.go @@ -0,0 +1,63 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package configauth + +import ( + "context" + + "google.golang.org/grpc" + + "go.opentelemetry.io/collector/component" +) + +var ( + _ Authenticator = (*MockAuthenticator)(nil) + _ component.Extension = (*MockAuthenticator)(nil) +) + +// MockAuthenticator provides a testing mock for code dealing with authentication. +type MockAuthenticator struct { + // AuthenticateFunc to use during the authentication phase of this mock. Optional. + AuthenticateFunc AuthenticateFunc + // TODO: implement the other funcs +} + +// Authenticate executes the mock's AuthenticateFunc, if provided, or just returns the given context unchanged. +func (m *MockAuthenticator) Authenticate(ctx context.Context, headers map[string][]string) (context.Context, error) { + if m.AuthenticateFunc == nil { + return ctx, nil + } + return m.AuthenticateFunc(ctx, headers) +} + +// UnaryInterceptor isn't currently implemented and always returns nil. +func (m *MockAuthenticator) UnaryInterceptor(context.Context, interface{}, *grpc.UnaryServerInfo, grpc.UnaryHandler) (interface{}, error) { + return nil, nil +} + +// StreamInterceptor isn't currently implemented and always returns nil. +func (m *MockAuthenticator) StreamInterceptor(interface{}, grpc.ServerStream, *grpc.StreamServerInfo, grpc.StreamHandler) error { + return nil +} + +// Start isn't currently implemented and always returns nil. +func (m *MockAuthenticator) Start(context.Context, component.Host) error { + return nil +} + +// Shutdown isn't currently implemented and always returns nil. +func (m *MockAuthenticator) Shutdown(ctx context.Context) error { + return nil +} diff --git a/config/configauth/mocks_test.go b/config/configauth/mocks_test.go new file mode 100644 index 000000000000..123592acd40e --- /dev/null +++ b/config/configauth/mocks_test.go @@ -0,0 +1,75 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package configauth + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestAuthenticateFunc(t *testing.T) { + // prepare + m := &MockAuthenticator{} + called := false + m.AuthenticateFunc = func(c context.Context, m map[string][]string) (context.Context, error) { + called = true + return c, nil + } + + // test + _, err := m.Authenticate(context.Background(), nil) + + // verify + assert.NoError(t, err) + assert.True(t, called) +} + +func TestNilOperations(t *testing.T) { + // prepare + m := &MockAuthenticator{} + + // test and verify + origCtx := context.Background() + + { + ctx, err := m.Authenticate(origCtx, nil) + assert.NoError(t, err) + assert.Equal(t, origCtx, ctx) + } + + { + ret, err := m.UnaryInterceptor(origCtx, nil, nil, nil) + assert.Nil(t, ret) + assert.NoError(t, err) + } + + { + err := m.StreamInterceptor(nil, nil, nil, nil) + assert.NoError(t, err) + } + + { + err := m.Start(origCtx, nil) + assert.NoError(t, err) + } + + { + err := m.Shutdown(origCtx) + assert.NoError(t, err) + } + +} diff --git a/config/configgrpc/configgrpc.go b/config/configgrpc/configgrpc.go index 2da35dc802ed..d4fbcbf5ddd3 100644 --- a/config/configgrpc/configgrpc.go +++ b/config/configgrpc/configgrpc.go @@ -27,6 +27,8 @@ import ( "google.golang.org/grpc/encoding/gzip" "google.golang.org/grpc/keepalive" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/config/configauth" "go.opentelemetry.io/collector/config/confignet" "go.opentelemetry.io/collector/config/configtls" @@ -239,7 +241,7 @@ func (gss *GRPCServerSettings) ToListener() (net.Listener, error) { } // ToServerOption maps configgrpc.GRPCServerSettings to a slice of server options for gRPC -func (gss *GRPCServerSettings) ToServerOption() ([]grpc.ServerOption, error) { +func (gss *GRPCServerSettings) ToServerOption(ext map[config.NamedEntity]component.Extension) ([]grpc.ServerOption, error) { var opts []grpc.ServerOption if gss.TLSSetting != nil { @@ -295,7 +297,7 @@ func (gss *GRPCServerSettings) ToServerOption() ([]grpc.ServerOption, error) { } if gss.Auth != nil { - authOpts, err := gss.Auth.ToServerOptions() + authOpts, err := gss.Auth.ToServerOption(ext) if err != nil { return nil, err } diff --git a/config/configgrpc/configgrpc_test.go b/config/configgrpc/configgrpc_test.go index 9aa9e032253c..b09e66abfb0a 100644 --- a/config/configgrpc/configgrpc_test.go +++ b/config/configgrpc/configgrpc_test.go @@ -25,6 +25,8 @@ import ( "github.com/stretchr/testify/require" "google.golang.org/grpc" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/config/configauth" "go.opentelemetry.io/collector/config/confignet" "go.opentelemetry.io/collector/config/configtls" @@ -71,7 +73,7 @@ func TestAllGrpcClientSettings(t *testing.T) { func TestDefaultGrpcServerSettings(t *testing.T) { gss := &GRPCServerSettings{} - opts, err := gss.ToServerOption() + opts, err := gss.ToServerOption(map[config.NamedEntity]component.Extension{}) assert.NoError(t, err) assert.Len(t, opts, 0) } @@ -104,7 +106,7 @@ func TestAllGrpcServerSettingsExceptAuth(t *testing.T) { }, }, } - opts, err := gss.ToServerOption() + opts, err := gss.ToServerOption(map[config.NamedEntity]component.Extension{}) assert.NoError(t, err) assert.Len(t, opts, 7) } @@ -113,19 +115,24 @@ func TestGrpcServerAuthSettings(t *testing.T) { gss := &GRPCServerSettings{} // sanity check - _, err := gss.ToServerOption() + _, err := gss.ToServerOption(map[config.NamedEntity]component.Extension{}) require.NoError(t, err) // test gss.Auth = &configauth.Authentication{ - OIDC: &configauth.OIDC{}, + AuthenticatorName: "mock", } - opts, err := gss.ToServerOption() + ext := map[config.NamedEntity]component.Extension{ + &config.ExtensionSettings{ + NameVal: "mock", + TypeVal: "mock", + }: &configauth.MockAuthenticator{}, + } + opts, err := gss.ToServerOption(ext) // verify - // an error here is a positive confirmation that Auth kicked in - assert.Error(t, err) - assert.Nil(t, opts) + assert.NoError(t, err) + assert.NotNil(t, opts) } func TestGRPCClientSettingsError(t *testing.T) { @@ -260,7 +267,7 @@ func TestGRPCServerSettingsError(t *testing.T) { } for _, test := range tests { t.Run(test.err, func(t *testing.T) { - _, err := test.settings.ToServerOption() + _, err := test.settings.ToServerOption(map[config.NamedEntity]component.Extension{}) assert.Regexp(t, test.err, err) }) } @@ -413,7 +420,7 @@ func TestHttpReception(t *testing.T) { } ln, err := gss.ToListener() assert.NoError(t, err) - opts, err := gss.ToServerOption() + opts, err := gss.ToServerOption(map[config.NamedEntity]component.Extension{}) assert.NoError(t, err) s := grpc.NewServer(opts...) otelcol.RegisterTraceServiceServer(s, &grpcTraceServer{}) @@ -458,7 +465,7 @@ func TestReceiveOnUnixDomainSocket(t *testing.T) { } ln, err := gss.ToListener() assert.NoError(t, err) - opts, err := gss.ToServerOption() + opts, err := gss.ToServerOption(map[config.NamedEntity]component.Extension{}) assert.NoError(t, err) s := grpc.NewServer(opts...) otelcol.RegisterTraceServiceServer(s, &grpcTraceServer{}) diff --git a/extension/authoidcextension/README.md b/extension/authoidcextension/README.md new file mode 100644 index 000000000000..fbffe45502ac --- /dev/null +++ b/extension/authoidcextension/README.md @@ -0,0 +1,35 @@ +# Authenticator - OIDC + +This extension implements a `configauth.Authenticator`, to be used in receivers inside the `auth` settings. The authenticator type has to be set to `oidc`. + +## Configuration + +```yaml +extensions: + oidc: + issuer_url: https://tenant1.example.com/ + issuer_ca_path: /etc/pki/tls/cert.pem + client_id: my-oidc-client + username_claim: email + +receivers: + otlp: + protocols: + grpc: + authentication: + attribute: authorization + authenticator: oidc + +processors: + +exporters: + logging: + logLevel: debug + +service: + pipelines: + traces: + receivers: [otlp] + processors: [] + exporters: [logging] +``` \ No newline at end of file diff --git a/extension/authoidcextension/config.go b/extension/authoidcextension/config.go new file mode 100644 index 000000000000..31d402c45884 --- /dev/null +++ b/extension/authoidcextension/config.go @@ -0,0 +1,46 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package authoidcextension + +import "go.opentelemetry.io/collector/config" + +// Config has the configuration for the OIDC Authenticator extension. +type Config struct { + config.ExtensionSettings `mapstructure:",squash"` + + // The attribute (header name) to look for auth data. Optional, default value: "authentication". + Attribute string `mapstructure:"attribute"` + + // IssuerURL is the base URL for the OIDC provider. + // Required. + IssuerURL string `mapstructure:"issuer_url"` + + // Audience of the token, used during the verification. + // For example: "https://accounts.google.com" or "https://login.salesforce.com". + // Required. + Audience string `mapstructure:"audience"` + + // The local path for the issuer CA's TLS server cert. + // Optional. + IssuerCAPath string `mapstructure:"issuer_ca_path"` + + // The claim to use as the username, in case the token's 'sub' isn't the suitable source. + // Optional. + UsernameClaim string `mapstructure:"username_claim"` + + // The claim that holds the subject's group membership information. + // Optional. + GroupsClaim string `mapstructure:"groups_claim"` +} diff --git a/config/configauth/oidc_authenticator.go b/extension/authoidcextension/extension.go similarity index 66% rename from config/configauth/oidc_authenticator.go rename to extension/authoidcextension/extension.go index 30b146404bd5..39d74933419a 100644 --- a/config/configauth/oidc_authenticator.go +++ b/extension/authoidcextension/extension.go @@ -4,7 +4,7 @@ // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // -// http://www.apache.org/licenses/LICENSE-2.0 +// http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package configauth +package authoidcextension import ( "context" @@ -29,23 +29,28 @@ import ( "time" "github.com/coreos/go-oidc" + "go.uber.org/zap" "google.golang.org/grpc" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/configauth" ) -type oidcAuthenticator struct { - attribute string - config OIDC - provider *oidc.Provider - verifier *oidc.IDTokenVerifier +type oidcExtension struct { + cfg *Config + unaryInterceptor configauth.UnaryInterceptorFunc + streamInterceptor configauth.StreamInterceptorFunc + + provider *oidc.Provider + verifier *oidc.IDTokenVerifier - unaryInterceptor unaryInterceptorFunc - streamInterceptor streamInterceptorFunc + logger *zap.Logger } var ( - _ Authenticator = (*oidcAuthenticator)(nil) + _ configauth.Authenticator = (*oidcExtension)(nil) - errNoClientIDProvided = errors.New("no ClientID provided for the OIDC configuration") + errNoAudienceProvided = errors.New("no Audience provided for the OIDC configuration") errNoIssuerURL = errors.New("no IssuerURL provided for the OIDC configuration") errInvalidAuthenticationHeaderFormat = errors.New("invalid authorization header format") errFailedToObtainClaimsFromToken = errors.New("failed to get the subject from the token issued by the OIDC provider") @@ -55,27 +60,48 @@ var ( errNotAuthenticated = errors.New("authentication didn't succeed") ) -func newOIDCAuthenticator(cfg Authentication) (*oidcAuthenticator, error) { - if cfg.OIDC.Audience == "" { - return nil, errNoClientIDProvided +func newExtension(cfg *Config, logger *zap.Logger) (*oidcExtension, error) { + if cfg.Audience == "" { + return nil, errNoAudienceProvided } - if cfg.OIDC.IssuerURL == "" { + if cfg.IssuerURL == "" { return nil, errNoIssuerURL } + if cfg.Attribute == "" { cfg.Attribute = defaultAttribute } - return &oidcAuthenticator{ - attribute: cfg.Attribute, - config: *cfg.OIDC, - unaryInterceptor: defaultUnaryInterceptor, - streamInterceptor: defaultStreamInterceptor, + return &oidcExtension{ + cfg: cfg, + logger: logger, + unaryInterceptor: configauth.DefaultUnaryInterceptor, + streamInterceptor: configauth.DefaultStreamInterceptor, }, nil } -func (o *oidcAuthenticator) Authenticate(ctx context.Context, headers map[string][]string) (context.Context, error) { - authHeaders := headers[o.attribute] +func (e *oidcExtension) Start(ctx context.Context, _ component.Host) error { + provider, err := getProviderForConfig(e.cfg) + if err != nil { + return fmt.Errorf("failed to get configuration from the auth server: %w", err) + } + e.provider = provider + + e.verifier = e.provider.Verifier(&oidc.Config{ + ClientID: e.cfg.Audience, + }) + + return nil +} + +// Shutdown is invoked during service shutdown. +func (e *oidcExtension) Shutdown(context.Context) error { + return nil +} + +// Authenticate checks whether the given context contains valid auth data. Successfully authenticated calls will always return a nil error and a context with the auth data. +func (e *oidcExtension) Authenticate(ctx context.Context, headers map[string][]string) (context.Context, error) { + authHeaders := headers[e.cfg.Attribute] if len(authHeaders) == 0 { return ctx, errNotAuthenticated } @@ -86,7 +112,7 @@ func (o *oidcAuthenticator) Authenticate(ctx context.Context, headers map[string return ctx, errInvalidAuthenticationHeaderFormat } - idToken, err := o.verifier.Verify(ctx, parts[1]) + idToken, err := e.verifier.Verify(ctx, parts[1]) if err != nil { return ctx, fmt.Errorf("failed to verify token: %w", err) } @@ -102,47 +128,29 @@ func (o *oidcAuthenticator) Authenticate(ctx context.Context, headers map[string return ctx, errFailedToObtainClaimsFromToken } - sub, err := getSubjectFromClaims(claims, o.config.UsernameClaim, idToken.Subject) + sub, err := getSubjectFromClaims(claims, e.cfg.UsernameClaim, idToken.Subject) if err != nil { return ctx, fmt.Errorf("failed to get subject from claims in the token: %w", err) } - ctx = context.WithValue(ctx, subjectKey, sub) + ctx = context.WithValue(ctx, configauth.SubjectKey, sub) - gr, err := getGroupsFromClaims(claims, o.config.GroupsClaim) + gr, err := getGroupsFromClaims(claims, e.cfg.GroupsClaim) if err != nil { return ctx, fmt.Errorf("failed to get groups from claims in the token: %w", err) } - ctx = context.WithValue(ctx, groupsKey, gr) + ctx = context.WithValue(ctx, configauth.GroupsKey, gr) return ctx, nil } -func (o *oidcAuthenticator) Start(context.Context) error { - provider, err := getProviderForConfig(o.config) - if err != nil { - return fmt.Errorf("failed to get configuration from the auth server: %w", err) - } - o.provider = provider - - o.verifier = o.provider.Verifier(&oidc.Config{ - ClientID: o.config.Audience, - }) - - return nil -} - -func (o *oidcAuthenticator) Close() error { - // no-op at the moment - // once we implement caching of the tokens we might need this - return nil -} - -func (o *oidcAuthenticator) UnaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { - return o.unaryInterceptor(ctx, req, info, handler, o.Authenticate) +// UnaryInterceptor is a helper method to provide a gRPC-compatible UnaryInterceptor, typically calling the authenticator's Authenticate method. +func (e *oidcExtension) UnaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { + return e.unaryInterceptor(ctx, req, info, handler, e.Authenticate) } -func (o *oidcAuthenticator) StreamInterceptor(srv interface{}, str grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { - return o.streamInterceptor(srv, str, info, handler, o.Authenticate) +// StreamInterceptor is a helper method to provide a gRPC-compatible StreamInterceptor, typically calling the authenticator's Authenticate method. +func (e *oidcExtension) StreamInterceptor(srv interface{}, str grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + return e.streamInterceptor(srv, str, info, handler, e.Authenticate) } func getSubjectFromClaims(claims map[string]interface{}, usernameClaim string, fallback string) (string, error) { @@ -188,7 +196,7 @@ func getGroupsFromClaims(claims map[string]interface{}, groupsClaim string) ([]s return []string{}, nil } -func getProviderForConfig(config OIDC) (*oidc.Provider, error) { +func getProviderForConfig(config *Config) (*oidc.Provider, error) { t := &http.Transport{ Proxy: http.ProxyFromEnvironment, DialContext: (&net.Dialer{ diff --git a/config/configauth/oidc_authenticator_test.go b/extension/authoidcextension/extension_test.go similarity index 79% rename from config/configauth/oidc_authenticator_test.go rename to extension/authoidcextension/extension_test.go index f64e9e2bc950..2d813416cbcf 100644 --- a/config/configauth/oidc_authenticator_test.go +++ b/extension/authoidcextension/extension_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package configauth +package authoidcextension import ( "context" @@ -33,7 +33,11 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.uber.org/zap" "google.golang.org/grpc" + + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/config/configauth" ) func TestOIDCAuthenticationSucceeded(t *testing.T) { @@ -43,17 +47,15 @@ func TestOIDCAuthenticationSucceeded(t *testing.T) { oidcServer.Start() defer oidcServer.Close() - config := Authentication{ - OIDC: &OIDC{ - IssuerURL: oidcServer.URL, - Audience: "unit-test", - GroupsClaim: "memberships", - }, + config := &Config{ + IssuerURL: oidcServer.URL, + Audience: "unit-test", + GroupsClaim: "memberships", } - p, err := newOIDCAuthenticator(config) + p, err := newExtension(config, zap.NewNop()) require.NoError(t, err) - err = p.Start(context.Background()) + err = p.Start(context.Background(), componenttest.NewNopHost()) require.NoError(t, err) payload, _ := json.Marshal(map[string]interface{}{ @@ -74,11 +76,11 @@ func TestOIDCAuthenticationSucceeded(t *testing.T) { assert.NotNil(t, ctx) assert.NoError(t, err) - subject, ok := SubjectFromContext(ctx) + subject, ok := configauth.SubjectFromContext(ctx) assert.True(t, ok) assert.EqualValues(t, "jdoe@example.com", subject) - groups, ok := GroupsFromContext(ctx) + groups, ok := configauth.GroupsFromContext(ctx) assert.True(t, ok) assert.Contains(t, groups, "department-1") assert.Contains(t, groups, "department-2") @@ -120,7 +122,7 @@ func TestOIDCProviderForConfigWithTLS(t *testing.T) { oidcServer.StartTLS() // prepare the processor configuration - config := OIDC{ + config := &Config{ IssuerURL: oidcServer.URL, IssuerCAPath: caFile.Name(), Audience: "unit-test", @@ -195,7 +197,7 @@ func TestOIDCFailedToLoadIssuerCAFromPathInvalidContent(t *testing.T) { _, err = file.Write([]byte("foobar")) require.NoError(t, err) - config := OIDC{ + config := &Config{ IssuerCAPath: file.Name(), } @@ -209,12 +211,10 @@ func TestOIDCFailedToLoadIssuerCAFromPathInvalidContent(t *testing.T) { func TestOIDCInvalidAuthHeader(t *testing.T) { // prepare - p, err := newOIDCAuthenticator(Authentication{ - OIDC: &OIDC{ - Audience: "some-audience", - IssuerURL: "http://example.com", - }, - }) + p, err := newExtension(&Config{ + Audience: "some-audience", + IssuerURL: "http://example.com", + }, zap.NewNop()) require.NoError(t, err) // test @@ -227,12 +227,10 @@ func TestOIDCInvalidAuthHeader(t *testing.T) { func TestOIDCNotAuthenticated(t *testing.T) { // prepare - p, err := newOIDCAuthenticator(Authentication{ - OIDC: &OIDC{ - Audience: "some-audience", - IssuerURL: "http://example.com", - }, - }) + p, err := newExtension(&Config{ + Audience: "some-audience", + IssuerURL: "http://example.com", + }, zap.NewNop()) require.NoError(t, err) // test @@ -245,16 +243,14 @@ func TestOIDCNotAuthenticated(t *testing.T) { func TestProviderNotReacheable(t *testing.T) { // prepare - p, err := newOIDCAuthenticator(Authentication{ - OIDC: &OIDC{ - Audience: "some-audience", - IssuerURL: "http://example.com", - }, - }) + p, err := newExtension(&Config{ + Audience: "some-audience", + IssuerURL: "http://example.com", + }, zap.NewNop()) require.NoError(t, err) // test - err = p.Start(context.Background()) + err = p.Start(context.Background(), componenttest.NewNopHost()) // verify assert.Error(t, err) @@ -267,15 +263,13 @@ func TestFailedToVerifyToken(t *testing.T) { oidcServer.Start() defer oidcServer.Close() - p, err := newOIDCAuthenticator(Authentication{ - OIDC: &OIDC{ - IssuerURL: oidcServer.URL, - Audience: "unit-test", - }, - }) + p, err := newExtension(&Config{ + IssuerURL: oidcServer.URL, + Audience: "unit-test", + }, zap.NewNop()) require.NoError(t, err) - err = p.Start(context.Background()) + err = p.Start(context.Background(), componenttest.NewNopHost()) require.NoError(t, err) // test @@ -295,48 +289,42 @@ func TestFailedToGetGroupsClaimFromToken(t *testing.T) { for _, tt := range []struct { casename string - config Authentication + config *Config expectedError error }{ { "groupsClaimNonExisting", - Authentication{ - OIDC: &OIDC{ - IssuerURL: oidcServer.URL, - Audience: "unit-test", - GroupsClaim: "non-existing-claim", - }, + &Config{ + IssuerURL: oidcServer.URL, + Audience: "unit-test", + GroupsClaim: "non-existing-claim", }, errGroupsClaimNotFound, }, { "usernameClaimNonExisting", - Authentication{ - OIDC: &OIDC{ - IssuerURL: oidcServer.URL, - Audience: "unit-test", - UsernameClaim: "non-existing-claim", - }, + &Config{ + IssuerURL: oidcServer.URL, + Audience: "unit-test", + UsernameClaim: "non-existing-claim", }, errClaimNotFound, }, { "usernameNotString", - Authentication{ - OIDC: &OIDC{ - IssuerURL: oidcServer.URL, - Audience: "unit-test", - UsernameClaim: "some-non-string-field", - }, + &Config{ + IssuerURL: oidcServer.URL, + Audience: "unit-test", + UsernameClaim: "some-non-string-field", }, errUsernameNotString, }, } { t.Run(tt.casename, func(t *testing.T) { - p, err := newOIDCAuthenticator(tt.config) + p, err := newExtension(tt.config, zap.NewNop()) require.NoError(t, err) - err = p.Start(context.Background()) + err = p.Start(context.Background(), componenttest.NewNopHost()) require.NoError(t, err) payload, _ := json.Marshal(map[string]interface{}{ @@ -437,50 +425,44 @@ func TestEmptyGroupsClaim(t *testing.T) { func TestMissingClient(t *testing.T) { // prepare - config := Authentication{ - OIDC: &OIDC{ - IssuerURL: "http://example.com/", - }, + config := &Config{ + IssuerURL: "http://example.com/", } // test - p, err := newOIDCAuthenticator(config) + p, err := newExtension(config, zap.NewNop()) // verify assert.Nil(t, p) - assert.Equal(t, errNoClientIDProvided, err) + assert.Equal(t, errNoAudienceProvided, err) } func TestMissingIssuerURL(t *testing.T) { // prepare - config := Authentication{ - OIDC: &OIDC{ - Audience: "some-audience", - }, + config := &Config{ + Audience: "some-audience", } // test - p, err := newOIDCAuthenticator(config) + p, err := newExtension(config, zap.NewNop()) // verify assert.Nil(t, p) assert.Equal(t, errNoIssuerURL, err) } -func TestClose(t *testing.T) { +func TestShutdown(t *testing.T) { // prepare - config := Authentication{ - OIDC: &OIDC{ - Audience: "some-audience", - IssuerURL: "http://example.com/", - }, + config := &Config{ + Audience: "some-audience", + IssuerURL: "http://example.com/", } - p, err := newOIDCAuthenticator(config) + p, err := newExtension(config, zap.NewNop()) require.NoError(t, err) require.NotNil(t, p) // test - err = p.Close() // for now, we never fail + err = p.Shutdown(context.Background()) // for now, we never fail // verify assert.NoError(t, err) @@ -488,18 +470,16 @@ func TestClose(t *testing.T) { func TestUnaryInterceptor(t *testing.T) { // prepare - config := Authentication{ - OIDC: &OIDC{ - Audience: "some-audience", - IssuerURL: "http://example.com/", - }, + config := &Config{ + Audience: "some-audience", + IssuerURL: "http://example.com/", } - p, err := newOIDCAuthenticator(config) + p, err := newExtension(config, zap.NewNop()) require.NoError(t, err) require.NotNil(t, p) interceptorCalled := false - p.unaryInterceptor = func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler, authenticate authenticateFunc) (interface{}, error) { + p.unaryInterceptor = func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler, authenticate configauth.AuthenticateFunc) (interface{}, error) { interceptorCalled = true return nil, nil } @@ -518,18 +498,16 @@ func TestUnaryInterceptor(t *testing.T) { func TestStreamInterceptor(t *testing.T) { // prepare - config := Authentication{ - OIDC: &OIDC{ - Audience: "some-audience", - IssuerURL: "http://example.com/", - }, + config := &Config{ + Audience: "some-audience", + IssuerURL: "http://example.com/", } - p, err := newOIDCAuthenticator(config) + p, err := newExtension(config, zap.NewNop()) require.NoError(t, err) require.NotNil(t, p) interceptorCalled := false - p.streamInterceptor = func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler, authenticate authenticateFunc) error { + p.streamInterceptor = func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler, authenticate configauth.AuthenticateFunc) error { interceptorCalled = true return nil } @@ -547,3 +525,12 @@ func TestStreamInterceptor(t *testing.T) { assert.NoError(t, err) assert.True(t, interceptorCalled) } + +type mockServerStream struct { + grpc.ServerStream + ctx context.Context +} + +func (m *mockServerStream) Context() context.Context { + return m.ctx +} diff --git a/extension/authoidcextension/factory.go b/extension/authoidcextension/factory.go new file mode 100644 index 000000000000..fc2d3519518c --- /dev/null +++ b/extension/authoidcextension/factory.go @@ -0,0 +1,52 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package authoidcextension + +import ( + "context" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config" + "go.opentelemetry.io/collector/extension/extensionhelper" +) + +const ( + // The value of extension "type" in configuration. + typeStr = "oidc" + + defaultAttribute = "authorization" +) + +// NewFactory creates a factory for the OIDC Authenticator extension. +func NewFactory() component.ExtensionFactory { + return extensionhelper.NewFactory( + typeStr, + createDefaultConfig, + createExtension) +} + +func createDefaultConfig() config.Extension { + return &Config{ + ExtensionSettings: config.ExtensionSettings{ + TypeVal: typeStr, + NameVal: typeStr, + }, + Attribute: defaultAttribute, + } +} + +func createExtension(_ context.Context, params component.ExtensionCreateParams, cfg config.Extension) (component.Extension, error) { + return newExtension(cfg.(*Config), params.Logger) +} diff --git a/extension/authoidcextension/factory_test.go b/extension/authoidcextension/factory_test.go new file mode 100644 index 000000000000..a01e8cc670f3 --- /dev/null +++ b/extension/authoidcextension/factory_test.go @@ -0,0 +1,57 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package authoidcextension + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "go.uber.org/zap" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config" + "go.opentelemetry.io/collector/config/configcheck" +) + +func TestCreateDefaultConfig(t *testing.T) { + // prepare and test + expected := &Config{ + ExtensionSettings: *config.NewExtensionSettings(typeStr), + Attribute: defaultAttribute, + } + + // test + cfg := createDefaultConfig() + + // verify + assert.Equal(t, expected, cfg) + assert.NoError(t, configcheck.ValidateConfig(cfg)) +} + +func TestCreateExtension(t *testing.T) { + cfg := createDefaultConfig().(*Config) + cfg.Audience = "collector" + cfg.IssuerURL = "https://auth.example.com" + + ext, err := createExtension(context.Background(), component.ExtensionCreateParams{Logger: zap.NewNop()}, cfg) + assert.NoError(t, err) + assert.NotNil(t, ext) +} + +func TestNewFactory(t *testing.T) { + f := NewFactory() + assert.NotNil(t, f) +} diff --git a/config/configauth/oidc_server_test.go b/extension/authoidcextension/oidc_server_test.go similarity index 99% rename from config/configauth/oidc_server_test.go rename to extension/authoidcextension/oidc_server_test.go index 414d07dab93d..d7002834c79e 100644 --- a/config/configauth/oidc_server_test.go +++ b/extension/authoidcextension/oidc_server_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package configauth +package authoidcextension import ( "bytes" diff --git a/receiver/jaegerreceiver/factory.go b/receiver/jaegerreceiver/factory.go index c7636a93f7e3..a304f0fbf1b1 100644 --- a/receiver/jaegerreceiver/factory.go +++ b/receiver/jaegerreceiver/factory.go @@ -103,16 +103,12 @@ func createTracesReceiver( var config configuration // Set ports if rCfg.Protocols.GRPC != nil { + config.CollectorGRPCServerSettings = *rCfg.Protocols.GRPC var err error config.CollectorGRPCPort, err = extractPortFromEndpoint(rCfg.Protocols.GRPC.NetAddr.Endpoint) if err != nil { return nil, fmt.Errorf("unable to extract port for GRPC: %w", err) } - - config.CollectorGRPCOptions, err = rCfg.Protocols.GRPC.ToServerOption() - if err != nil { - return nil, err - } } if rCfg.Protocols.ThriftHTTP != nil { diff --git a/receiver/jaegerreceiver/trace_receiver.go b/receiver/jaegerreceiver/trace_receiver.go index dde546fa9bc9..a299b7f0162b 100644 --- a/receiver/jaegerreceiver/trace_receiver.go +++ b/receiver/jaegerreceiver/trace_receiver.go @@ -59,11 +59,11 @@ import ( // configuration defines the behavior and the ports that // the Jaeger receiver will use. type configuration struct { - CollectorThriftPort int - CollectorHTTPPort int - CollectorHTTPSettings confighttp.HTTPServerSettings - CollectorGRPCPort int - CollectorGRPCOptions []grpc.ServerOption + CollectorThriftPort int + CollectorHTTPPort int + CollectorHTTPSettings confighttp.HTTPServerSettings + CollectorGRPCPort int + CollectorGRPCServerSettings configgrpc.GRPCServerSettings AgentCompactThriftPort int AgentCompactThriftConfig ServerConfigUDP @@ -482,7 +482,12 @@ func (jr *jReceiver) startCollector(host component.Host) error { } if jr.collectorGRPCEnabled() { - jr.grpc = grpc.NewServer(jr.config.CollectorGRPCOptions...) + opts, err := jr.config.CollectorGRPCServerSettings.ToServerOption(host.GetExtensions()) + if err != nil { + return fmt.Errorf("failed to build the options for the Jaeger gRPC Collector: %v", err) + } + + jr.grpc = grpc.NewServer(opts...) gaddr := jr.collectorGRPCAddr() gln, gerr := net.Listen("tcp", gaddr) if gerr != nil { diff --git a/receiver/jaegerreceiver/trace_receiver_test.go b/receiver/jaegerreceiver/trace_receiver_test.go index eeec55d0efd5..72a85e21c896 100644 --- a/receiver/jaegerreceiver/trace_receiver_test.go +++ b/receiver/jaegerreceiver/trace_receiver_test.go @@ -236,22 +236,21 @@ func TestGRPCReception(t *testing.T) { func TestGRPCReceptionWithTLS(t *testing.T) { // prepare - var grpcServerOptions []grpc.ServerOption - tlsCreds := configtls.TLSServerSetting{ + tlsCreds := &configtls.TLSServerSetting{ TLSSetting: configtls.TLSSetting{ CertFile: path.Join(".", "testdata", "server.crt"), KeyFile: path.Join(".", "testdata", "server.key"), }, } - tlsCfg, err := tlsCreds.LoadTLSConfig() - assert.NoError(t, err) - grpcServerOptions = append(grpcServerOptions, grpc.Creds(credentials.NewTLS(tlsCfg))) + grpcServerSettings := configgrpc.GRPCServerSettings{ + TLSSetting: tlsCreds, + } port := testutil.GetAvailablePort(t) config := &configuration{ - CollectorGRPCPort: int(port), - CollectorGRPCOptions: grpcServerOptions, + CollectorGRPCPort: int(port), + CollectorGRPCServerSettings: grpcServerSettings, } sink := new(consumertest.TracesSink) diff --git a/receiver/opencensusreceiver/config.go b/receiver/opencensusreceiver/config.go index be372c67c782..a9298b6fcf98 100644 --- a/receiver/opencensusreceiver/config.go +++ b/receiver/opencensusreceiver/config.go @@ -33,21 +33,15 @@ type Config struct { CorsOrigins []string `mapstructure:"cors_allowed_origins"` } -func (cfg *Config) buildOptions() ([]ocOption, error) { +func (cfg *Config) buildOptions() []ocOption { var opts []ocOption if len(cfg.CorsOrigins) > 0 { opts = append(opts, withCorsOrigins(cfg.CorsOrigins)) } - grpcServerOptions, err := cfg.GRPCServerSettings.ToServerOption() - if err != nil { - return nil, err - } - if len(grpcServerOptions) > 0 { - opts = append(opts, withGRPCServerOptions(grpcServerOptions...)) - } + opts = append(opts, withGRPCServerSettings(cfg.GRPCServerSettings)) - return opts, nil + return opts } var _ config.Receiver = (*Config)(nil) diff --git a/receiver/opencensusreceiver/config_test.go b/receiver/opencensusreceiver/config_test.go index 2937b9761b7e..aa53810c737f 100644 --- a/receiver/opencensusreceiver/config_test.go +++ b/receiver/opencensusreceiver/config_test.go @@ -172,25 +172,3 @@ func TestLoadConfig(t *testing.T) { }, }) } - -func TestBuildOptions_TLSCredentials(t *testing.T) { - cfg := Config{ - ReceiverSettings: config.ReceiverSettings{ - NameVal: "IncorrectTLS", - }, - GRPCServerSettings: configgrpc.GRPCServerSettings{ - TLSSetting: &configtls.TLSServerSetting{ - TLSSetting: configtls.TLSSetting{ - CertFile: "willfail", - }, - }, - }, - } - _, err := cfg.buildOptions() - assert.EqualError(t, err, `failed to load TLS config: for auth via TLS, either both certificate and key must be supplied, or neither`) - - cfg.TLSSetting = &configtls.TLSServerSetting{} - opt, err := cfg.buildOptions() - assert.NoError(t, err) - assert.NotNil(t, opt) -} diff --git a/receiver/opencensusreceiver/factory.go b/receiver/opencensusreceiver/factory.go index 44add91713c7..c2d2e6b8ec69 100644 --- a/receiver/opencensusreceiver/factory.go +++ b/receiver/opencensusreceiver/factory.go @@ -98,12 +98,10 @@ func createReceiver(cfg config.Receiver) (*ocReceiver, error) { receiver, ok := receivers[rCfg] if !ok { // Build the configuration options. - opts, err := rCfg.buildOptions() - if err != nil { - return nil, err - } + opts := rCfg.buildOptions() // We don't have a receiver, so create one. + var err error receiver, err = newOpenCensusReceiver( rCfg.Name(), rCfg.NetAddr.Transport, rCfg.NetAddr.Endpoint, nil, nil, opts...) if err != nil { diff --git a/receiver/opencensusreceiver/opencensus.go b/receiver/opencensusreceiver/opencensus.go index 6cb00c409ef5..b254defd6058 100644 --- a/receiver/opencensusreceiver/opencensus.go +++ b/receiver/opencensusreceiver/opencensus.go @@ -31,6 +31,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenterror" + "go.opentelemetry.io/collector/config/configgrpc" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/obsreport" "go.opentelemetry.io/collector/receiver/opencensusreceiver/ocmetrics" @@ -39,13 +40,13 @@ import ( // ocReceiver is the type that exposes Trace and Metrics reception. type ocReceiver struct { - mu sync.Mutex - ln net.Listener - serverGRPC *grpc.Server - serverHTTP *http.Server - gatewayMux *gatewayruntime.ServeMux - corsOrigins []string - grpcServerOptions []grpc.ServerOption + mu sync.Mutex + ln net.Listener + serverGRPC *grpc.Server + serverHTTP *http.Server + gatewayMux *gatewayruntime.ServeMux + corsOrigins []string + grpcServerSettings configgrpc.GRPCServerSettings traceReceiverOpts []octrace.Option @@ -103,44 +104,61 @@ func (ocr *ocReceiver) Start(_ context.Context, host component.Host) error { return ocr.start(host) } -func (ocr *ocReceiver) registerTraceConsumer() error { +func (ocr *ocReceiver) registerTraceConsumer(host component.Host) error { var err = componenterror.ErrAlreadyStarted ocr.startTracesReceiverOnce.Do(func() { - ocr.traceReceiver, err = octrace.New( - ocr.instanceName, ocr.traceConsumer, ocr.traceReceiverOpts...) - if err == nil { - srv := ocr.grpcServer() - agenttracepb.RegisterTraceServiceServer(srv, ocr.traceReceiver) + ocr.traceReceiver, err = octrace.New(ocr.instanceName, ocr.traceConsumer, ocr.traceReceiverOpts...) + if err != nil { + return } + + var srv *grpc.Server + srv, err = ocr.grpcServer(host) + if err != nil { + return + } + + agenttracepb.RegisterTraceServiceServer(srv, ocr.traceReceiver) + }) return err } -func (ocr *ocReceiver) registerMetricsConsumer() error { +func (ocr *ocReceiver) registerMetricsConsumer(host component.Host) error { var err = componenterror.ErrAlreadyStarted ocr.startMetricsReceiverOnce.Do(func() { - ocr.metricsReceiver, err = ocmetrics.New( - ocr.instanceName, ocr.metricsConsumer) - if err == nil { - srv := ocr.grpcServer() - agentmetricspb.RegisterMetricsServiceServer(srv, ocr.metricsReceiver) + ocr.metricsReceiver, err = ocmetrics.New(ocr.instanceName, ocr.metricsConsumer) + if err != nil { + return + } + + var srv *grpc.Server + srv, err = ocr.grpcServer(host) + if err != nil { + return } + + agentmetricspb.RegisterMetricsServiceServer(srv, ocr.metricsReceiver) }) return err } -func (ocr *ocReceiver) grpcServer() *grpc.Server { +func (ocr *ocReceiver) grpcServer(host component.Host) (*grpc.Server, error) { ocr.mu.Lock() defer ocr.mu.Unlock() if ocr.serverGRPC == nil { - ocr.serverGRPC = obsreport.GRPCServerWithObservabilityEnabled(ocr.grpcServerOptions...) + opts, err := ocr.grpcServerSettings.ToServerOption(host.GetExtensions()) + if err != nil { + return nil, err + } + ocr.serverGRPC = obsreport.GRPCServerWithObservabilityEnabled(opts...) } - return ocr.serverGRPC + return ocr.serverGRPC, nil } // Shutdown is a method to turn off receiving. @@ -156,14 +174,14 @@ func (ocr *ocReceiver) start(host component.Host) error { hasConsumer := false if ocr.traceConsumer != nil { hasConsumer = true - if err := ocr.registerTraceConsumer(); err != nil && err != componenterror.ErrAlreadyStarted { + if err := ocr.registerTraceConsumer(host); err != nil && err != componenterror.ErrAlreadyStarted { return err } } if ocr.metricsConsumer != nil { hasConsumer = true - if err := ocr.registerMetricsConsumer(); err != nil && err != componenterror.ErrAlreadyStarted { + if err := ocr.registerMetricsConsumer(host); err != nil && err != componenterror.ErrAlreadyStarted { return err } } diff --git a/receiver/opencensusreceiver/opencensus_test.go b/receiver/opencensusreceiver/opencensus_test.go index 2fd4fd85fe44..c445eb6e5603 100644 --- a/receiver/opencensusreceiver/opencensus_test.go +++ b/receiver/opencensusreceiver/opencensus_test.go @@ -44,6 +44,9 @@ import ( "google.golang.org/protobuf/types/known/timestamppb" "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/config" + "go.opentelemetry.io/collector/config/configgrpc" + "go.opentelemetry.io/collector/config/configtls" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/internal/internalconsumertest" "go.opentelemetry.io/collector/obsreport/obsreporttest" @@ -613,3 +616,29 @@ func TestOCReceiverMetrics_HandleNextConsumerResponse(t *testing.T) { } } } + +func TestInvalidTLSCredentials(t *testing.T) { + cfg := Config{ + ReceiverSettings: config.ReceiverSettings{ + NameVal: "IncorrectTLS", + }, + GRPCServerSettings: configgrpc.GRPCServerSettings{ + TLSSetting: &configtls.TLSServerSetting{ + TLSSetting: configtls.TLSSetting{ + CertFile: "willfail", + }, + }, + }, + } + opt := cfg.buildOptions() + assert.NotNil(t, opt) + + addr := testutil.GetAvailableLocalAddress(t) + ocr, err := newOpenCensusReceiver("invalidtls", "tcp", addr, nil, nil, opt...) + assert.NoError(t, err) + assert.NotNil(t, ocr) + + srv, err := ocr.grpcServer(componenttest.NewNopHost()) + assert.EqualError(t, err, `failed to load TLS config: for auth via TLS, either both certificate and key must be supplied, or neither`) + assert.Nil(t, srv) +} diff --git a/receiver/opencensusreceiver/options.go b/receiver/opencensusreceiver/options.go index 84fc48da1426..289610c3a1b8 100644 --- a/receiver/opencensusreceiver/options.go +++ b/receiver/opencensusreceiver/options.go @@ -15,7 +15,7 @@ package opencensusreceiver import ( - "google.golang.org/grpc" + "go.opentelemetry.io/collector/config/configgrpc" ) // ocOption interface defines for configuration settings to be applied to receivers. @@ -41,16 +41,12 @@ func withCorsOrigins(origins []string) ocOption { return &corsOrigins{origins: origins} } -var _ ocOption = (grpcServerOptions)(nil) +type grpcServerSettings configgrpc.GRPCServerSettings -type grpcServerOptions []grpc.ServerOption - -func (gsvo grpcServerOptions) withReceiver(ocr *ocReceiver) { - ocr.grpcServerOptions = gsvo -} - -// withGRPCServerOptions allows one to specify the options for starting a gRPC server. -func withGRPCServerOptions(gsOpts ...grpc.ServerOption) ocOption { - gsvOpts := grpcServerOptions(gsOpts) +func withGRPCServerSettings(settings configgrpc.GRPCServerSettings) ocOption { + gsvOpts := grpcServerSettings(settings) return gsvOpts } +func (gsvo grpcServerSettings) withReceiver(ocr *ocReceiver) { + ocr.grpcServerSettings = configgrpc.GRPCServerSettings(gsvo) +} diff --git a/receiver/otlpreceiver/factory.go b/receiver/otlpreceiver/factory.go index be47555637d0..4937a0a42fd3 100644 --- a/receiver/otlpreceiver/factory.go +++ b/receiver/otlpreceiver/factory.go @@ -77,11 +77,9 @@ func createTracesReceiver( cfg config.Receiver, nextConsumer consumer.Traces, ) (component.TracesReceiver, error) { - r, err := createReceiver(cfg, params.Logger) - if err != nil { - return nil, err - } - if err = r.registerTraceConsumer(ctx, nextConsumer); err != nil { + r := createReceiver(cfg, params.Logger) + + if err := r.registerTraceConsumer(ctx, nextConsumer); err != nil { return nil, err } return r, nil @@ -94,11 +92,9 @@ func createMetricsReceiver( cfg config.Receiver, consumer consumer.Metrics, ) (component.MetricsReceiver, error) { - r, err := createReceiver(cfg, params.Logger) - if err != nil { - return nil, err - } - if err = r.registerMetricsConsumer(ctx, consumer); err != nil { + r := createReceiver(cfg, params.Logger) + + if err := r.registerMetricsConsumer(ctx, consumer); err != nil { return nil, err } return r, nil @@ -111,17 +107,15 @@ func createLogReceiver( cfg config.Receiver, consumer consumer.Logs, ) (component.LogsReceiver, error) { - r, err := createReceiver(cfg, params.Logger) - if err != nil { - return nil, err - } - if err = r.registerLogsConsumer(ctx, consumer); err != nil { + r := createReceiver(cfg, params.Logger) + + if err := r.registerLogsConsumer(ctx, consumer); err != nil { return nil, err } return r, nil } -func createReceiver(cfg config.Receiver, logger *zap.Logger) (*otlpReceiver, error) { +func createReceiver(cfg config.Receiver, logger *zap.Logger) *otlpReceiver { rCfg := cfg.(*Config) // There must be one receiver for both metrics and traces. We maintain a map of @@ -130,16 +124,11 @@ func createReceiver(cfg config.Receiver, logger *zap.Logger) (*otlpReceiver, err // Check to see if there is already a receiver for this config. receiver, ok := receivers[rCfg] if !ok { - var err error // We don't have a receiver, so create one. - receiver, err = newOtlpReceiver(rCfg, logger) - if err != nil { - return nil, err - } - // Remember the receiver in the map + receiver = newOtlpReceiver(rCfg, logger) receivers[rCfg] = receiver } - return receiver, nil + return receiver } // This is the map of already created OTLP receivers for particular configurations. diff --git a/receiver/otlpreceiver/otlp.go b/receiver/otlpreceiver/otlp.go index 34f642e8f2de..2aa782a6a408 100644 --- a/receiver/otlpreceiver/otlp.go +++ b/receiver/otlpreceiver/otlp.go @@ -59,18 +59,11 @@ type otlpReceiver struct { // newOtlpReceiver just creates the OpenTelemetry receiver services. It is the caller's // responsibility to invoke the respective Start*Reception methods as well // as the various Stop*Reception methods to end it. -func newOtlpReceiver(cfg *Config, logger *zap.Logger) (*otlpReceiver, error) { +func newOtlpReceiver(cfg *Config, logger *zap.Logger) *otlpReceiver { r := &otlpReceiver{ cfg: cfg, logger: logger, } - if cfg.GRPC != nil { - opts, err := cfg.GRPC.ToServerOption() - if err != nil { - return nil, err - } - r.serverGRPC = grpc.NewServer(opts...) - } if cfg.HTTP != nil { // Use our custom JSON marshaler instead of default Protobuf JSON marshaler. // This is needed because OTLP spec defines encoding for trace and span id @@ -87,12 +80,12 @@ func newOtlpReceiver(cfg *Config, logger *zap.Logger) (*otlpReceiver, error) { ) } - return r, nil + return r } func (r *otlpReceiver) startGRPCServer(cfg *configgrpc.GRPCServerSettings, host component.Host) error { r.logger.Info("Starting GRPC server on endpoint " + cfg.NetAddr.Endpoint) - var gln net.Listener + gln, err := cfg.ToListener() if err != nil { return err @@ -129,6 +122,25 @@ func (r *otlpReceiver) startHTTPServer(cfg *confighttp.HTTPServerSettings, host func (r *otlpReceiver) startProtocolServers(host component.Host) error { var err error if r.cfg.GRPC != nil { + var opts []grpc.ServerOption + opts, err = r.cfg.GRPC.ToServerOption(host.GetExtensions()) + if err != nil { + return err + } + r.serverGRPC = grpc.NewServer(opts...) + + if r.traceReceiver != nil { + collectortrace.RegisterTraceServiceServer(r.serverGRPC, r.traceReceiver) + } + + if r.metricsReceiver != nil { + collectormetrics.RegisterMetricsServiceServer(r.serverGRPC, r.metricsReceiver) + } + + if r.logReceiver != nil { + collectorlog.RegisterLogsServiceServer(r.serverGRPC, r.logReceiver) + } + err = r.startGRPCServer(r.cfg.GRPC, host) if err != nil { return err @@ -203,9 +215,6 @@ func (r *otlpReceiver) registerTraceConsumer(ctx context.Context, tc consumer.Tr return componenterror.ErrNilNextConsumer } r.traceReceiver = trace.New(r.cfg.Name(), tc) - if r.serverGRPC != nil { - collectortrace.RegisterTraceServiceServer(r.serverGRPC, r.traceReceiver) - } if r.gatewayMux != nil { err := collectortrace.RegisterTraceServiceHandlerServer(ctx, r.gatewayMux, r.traceReceiver) if err != nil { @@ -222,9 +231,6 @@ func (r *otlpReceiver) registerMetricsConsumer(ctx context.Context, mc consumer. return componenterror.ErrNilNextConsumer } r.metricsReceiver = metrics.New(r.cfg.Name(), mc) - if r.serverGRPC != nil { - collectormetrics.RegisterMetricsServiceServer(r.serverGRPC, r.metricsReceiver) - } if r.gatewayMux != nil { return collectormetrics.RegisterMetricsServiceHandlerServer(ctx, r.gatewayMux, r.metricsReceiver) } @@ -236,9 +242,6 @@ func (r *otlpReceiver) registerLogsConsumer(ctx context.Context, tc consumer.Log return componenterror.ErrNilNextConsumer } r.logReceiver = logs.New(r.cfg.Name(), tc) - if r.serverGRPC != nil { - collectorlog.RegisterLogsServiceServer(r.serverGRPC, r.logReceiver) - } if r.gatewayMux != nil { return collectorlog.RegisterLogsServiceHandlerServer(ctx, r.gatewayMux, r.logReceiver) } diff --git a/receiver/otlpreceiver/otlp_test.go b/receiver/otlpreceiver/otlp_test.go index 4ce2cf390616..d097ecf63682 100644 --- a/receiver/otlpreceiver/otlp_test.go +++ b/receiver/otlpreceiver/otlp_test.go @@ -684,8 +684,10 @@ func TestGRPCInvalidTLSCredentials(t *testing.T) { }, } - // TLS is resolved during Creation of the receiver for GRPC. - _, err := createReceiver(cfg, zap.NewNop()) + r := createReceiver(cfg, zap.NewNop()) + assert.NotNil(t, r) + + err := r.startProtocolServers(componenttest.NewNopHost()) assert.EqualError(t, err, `failed to load TLS config: for auth via TLS, either both certificate and key must be supplied, or neither`) } @@ -732,8 +734,7 @@ func newHTTPReceiver(t *testing.T, endpoint string, tc consumer.Traces, mc consu } func newReceiver(t *testing.T, factory component.ReceiverFactory, cfg *Config, tc consumer.Traces, mc consumer.Metrics) *otlpReceiver { - r, err := createReceiver(cfg, zap.NewNop()) - require.NoError(t, err) + r := createReceiver(cfg, zap.NewNop()) if tc != nil { params := component.ReceiverCreateParams{} _, err := factory.CreateTracesReceiver(context.Background(), params, cfg, tc) diff --git a/service/defaultcomponents/default_extensions_test.go b/service/defaultcomponents/default_extensions_test.go index 76edee91cf66..55dfaae0925b 100644 --- a/service/defaultcomponents/default_extensions_test.go +++ b/service/defaultcomponents/default_extensions_test.go @@ -68,7 +68,10 @@ func TestDefaultExtensions(t *testing.T) { }, } - assert.Equal(t, len(tests), len(extFactories)) + // we have one more extension that we can't test here: the OIDC Auth extension requires + // an OIDC server to get the config from, and we don't want to spawn one here for this test. + assert.Equal(t, len(tests)+1, len(extFactories)) + for _, tt := range tests { t.Run(string(tt.extension), func(t *testing.T) { factory, ok := extFactories[tt.extension] diff --git a/service/defaultcomponents/defaults.go b/service/defaultcomponents/defaults.go index 7b7cc4516f01..30214cfbf315 100644 --- a/service/defaultcomponents/defaults.go +++ b/service/defaultcomponents/defaults.go @@ -28,6 +28,7 @@ import ( "go.opentelemetry.io/collector/exporter/prometheusexporter" "go.opentelemetry.io/collector/exporter/prometheusremotewriteexporter" "go.opentelemetry.io/collector/exporter/zipkinexporter" + "go.opentelemetry.io/collector/extension/authoidcextension" "go.opentelemetry.io/collector/extension/healthcheckextension" "go.opentelemetry.io/collector/extension/pprofextension" "go.opentelemetry.io/collector/extension/zpagesextension" @@ -56,6 +57,7 @@ func Components() ( var errs []error extensions, err := component.MakeExtensionFactoryMap( + authoidcextension.NewFactory(), healthcheckextension.NewFactory(), pprofextension.NewFactory(), zpagesextension.NewFactory(),