From 1e6154d4ea5f539d7684e2dee0267574fad0a482 Mon Sep 17 00:00:00 2001 From: Hunter Madison Date: Wed, 26 Jul 2023 16:53:06 -0400 Subject: [PATCH 1/8] Document how to test with Minikube and Helm Signed-off-by: Hunter Madison --- examples/docker/strimzi-kafka-image/README.md | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/examples/docker/strimzi-kafka-image/README.md b/examples/docker/strimzi-kafka-image/README.md index 15519cf5..e390210b 100644 --- a/examples/docker/strimzi-kafka-image/README.md +++ b/examples/docker/strimzi-kafka-image/README.md @@ -12,7 +12,7 @@ The result is that the most recent Strimzi Kafka OAuth jars and their dependenci Building -------- -Use `docker build` to build the image: +Run `mvn install` then, use `docker build` to build the image: docker build -t strimzi/kafka:latest-kafka-3.3.2-oauth . @@ -69,10 +69,14 @@ You need to retag the built image before so you can push it to Docker Registry: Actually, Kubernetes Kind supports an even simpler option how to make an image available to Kubernetes: kind load docker-image strimzi/kafka:latest-kafka-3.3.2-oauth + +If you're using minikube, you'll need to run `minikube docker-env` before building the image. Deploying --------- +## Via the Strimzi Repository + In order for the operator to use your Kafka image, you have to replace the Kafka image coordinates in `packaging/install/cluster-operator/060-Deployment-strimzi-cluster-operator.yaml` in your `strimzi-kafka-operator` project. This image builds the kafka-3.3.2 replacement image, so we need to replace all occurrences where kafka-3.3.2 is referred to into the proper coordinates to our image: @@ -88,3 +92,15 @@ It's best to check the `060-Deployment-strimzi-cluster-operator.yaml` file manua You can now deploy Strimzi Kafka Operator following instructions in [HACKING.md](../../../HACKING.md) +## Via Helm + +You can also run the operator via its Helm chart and set the `kafka.image.registry` property to your local registry. As an example, if you've built and tagged the image as `local.dev/strimzi/kafka:0.36.0-kafka-3.5.0 `. You can run it using the operator as: + + helm repo add strimzi https://strimzi.io/charts/ --force-update + helm upgrade -i -n strimzi strimzi strimzi/strimzi-kafka-operator \ + --version 0.36.0 \ + --set watchNamespaces="{default}" \ + --set generateNetworkPolicy=false \ + --set kafka.image.registry="local.dev" \ + --wait \ + --create-namespace \ No newline at end of file From 3f507937163df07cc42f5e82181601ae1f10b68f Mon Sep 17 00:00:00 2001 From: Hunter Madison Date: Wed, 26 Jul 2023 16:53:23 -0400 Subject: [PATCH 2/8] Support disabling the "Accept" header when fetching JWK keys. When attemping to use Kubernetes' api-server as the source of JWK keys for JWT validation, it will only process requests which either do not have the "Accept" header set or requests which have the header set to "application/jwk-set+json". Signed-off-by: Hunter Madison --- .../strimzi/kafka/oauth/common/HttpUtil.java | 45 ++++++++++++++----- .../validator/JWTSignatureValidator.java | 12 +++-- ...asServerOauthValidatorCallbackHandler.java | 4 +- .../kafka/oauth/server/ServerConfig.java | 5 +++ 4 files changed, 52 insertions(+), 14 deletions(-) diff --git a/oauth-common/src/main/java/io/strimzi/kafka/oauth/common/HttpUtil.java b/oauth-common/src/main/java/io/strimzi/kafka/oauth/common/HttpUtil.java index 1540c3d0..ed363c91 100644 --- a/oauth-common/src/main/java/io/strimzi/kafka/oauth/common/HttpUtil.java +++ b/oauth-common/src/main/java/io/strimzi/kafka/oauth/common/HttpUtil.java @@ -173,7 +173,27 @@ public static T get(URI uri, SSLSocketFactory socketFactory, HostnameVerifie * @throws HttpException A runtime exception when an HTTP response status signals a failed request */ public static T get(URI uri, SSLSocketFactory socketFactory, HostnameVerifier hostnameVerifier, String authorization, Class responseType, int connectTimeout, int readTimeout) throws IOException { - return request(uri, "GET", socketFactory, hostnameVerifier, authorization, null, null, responseType, connectTimeout, readTimeout); + return request(uri, "GET", socketFactory, hostnameVerifier, authorization, null, null, responseType, connectTimeout, readTimeout, true); + } + + /** + * Perform HTTP GET request and return the response in the specified type. + * + * @param uri The target url + * @param socketFactory Socket factory to use with https:// url + * @param hostnameVerifier HostnameVerifier to use with https:// url + * @param authorization The Authorization header value + * @param responseType The type to which to convert the response (String or one of the Jackson Mapper types) + * @param connectTimeout Connect timeout in seconds + * @param readTimeout Read timeout in seconds + * @param includeAcceptHeader Determines if Accept application/json is sent to the remote server. + * @return The response as specified by the responseType. + * @param Generic type of the responseType + * @throws IOException A connection, timeout, or network exception that occurs while performing the request + * @throws HttpException A runtime exception when an HTTP response status signals a failed request + */ + public static T get(URI uri, SSLSocketFactory socketFactory, HostnameVerifier hostnameVerifier, String authorization, Class responseType, int connectTimeout, int readTimeout, boolean includeAcceptHeader) throws IOException { + return request(uri, "GET", socketFactory, hostnameVerifier, authorization, null, null, responseType, connectTimeout, readTimeout, includeAcceptHeader); } /** @@ -248,7 +268,7 @@ public static T post(URI uri, SSLSocketFactory socketFactory, HostnameVerifi * @throws HttpException A runtime exception when an HTTP response status signals a failed request */ public static T post(URI uri, SSLSocketFactory socketFactory, HostnameVerifier verifier, String authorization, String contentType, String body, Class responseType, int connectTimeout, int readTimeout) throws IOException { - return request(uri, "POST", socketFactory, verifier, authorization, contentType, body, responseType, connectTimeout, readTimeout); + return request(uri, "POST", socketFactory, verifier, authorization, contentType, body, responseType, connectTimeout, readTimeout, true); } /** @@ -311,7 +331,7 @@ public static void put(URI uri, SSLSocketFactory socketFactory, HostnameVerifier * @throws HttpException A runtime exception when an HTTP response status signals a failed request */ public static void put(URI uri, SSLSocketFactory socketFactory, HostnameVerifier verifier, String authorization, String contentType, String body, int connectTimeout, int readTimeout) throws IOException { - request(uri, "PUT", socketFactory, verifier, authorization, contentType, body, null, connectTimeout, readTimeout); + request(uri, "PUT", socketFactory, verifier, authorization, contentType, body, null, connectTimeout, readTimeout, true); } /** @@ -323,7 +343,7 @@ public static void put(URI uri, SSLSocketFactory socketFactory, HostnameVerifier * @throws HttpException A runtime exception when an HTTP response status signals a failed request */ public static void delete(URI uri, String authorization) throws IOException { - request(uri, "DELETE", null, null, authorization, null, null, null, DEFAULT_CONNECT_TIMEOUT, DEFAULT_READ_TIMEOUT); + request(uri, "DELETE", null, null, authorization, null, null, null, DEFAULT_CONNECT_TIMEOUT, DEFAULT_READ_TIMEOUT, true); } /** @@ -337,7 +357,7 @@ public static void delete(URI uri, String authorization) throws IOException { * @throws HttpException A runtime exception when an HTTP response status signals a failed request */ public static void delete(URI uri, SSLSocketFactory socketFactory, HostnameVerifier verifier, String authorization) throws IOException { - request(uri, "DELETE", socketFactory, verifier, authorization, null, null, null, DEFAULT_CONNECT_TIMEOUT, DEFAULT_READ_TIMEOUT); + request(uri, "DELETE", socketFactory, verifier, authorization, null, null, null, DEFAULT_CONNECT_TIMEOUT, DEFAULT_READ_TIMEOUT, true); } /** @@ -353,7 +373,7 @@ public static void delete(URI uri, SSLSocketFactory socketFactory, HostnameVerif * @throws HttpException A runtime exception when an HTTP response status signals a failed request */ public static void delete(URI uri, SSLSocketFactory socketFactory, HostnameVerifier verifier, String authorization, int connectTimeout, int readTimeout) throws IOException { - request(uri, "DELETE", socketFactory, verifier, authorization, null, null, null, connectTimeout, readTimeout); + request(uri, "DELETE", socketFactory, verifier, authorization, null, null, null, connectTimeout, readTimeout, true); } /** @@ -375,7 +395,7 @@ public static void delete(URI uri, SSLSocketFactory socketFactory, HostnameVerif * @throws HttpException A runtime exception when an HTTP response status signals a failed request */ public static T request(URI uri, SSLSocketFactory socketFactory, HostnameVerifier hostnameVerifier, String authorization, String contentType, String body, Class responseType) throws IOException { - return request(uri, null, socketFactory, hostnameVerifier, authorization, contentType, body, responseType, DEFAULT_CONNECT_TIMEOUT, DEFAULT_READ_TIMEOUT); + return request(uri, null, socketFactory, hostnameVerifier, authorization, contentType, body, responseType, DEFAULT_CONNECT_TIMEOUT, DEFAULT_READ_TIMEOUT, true); } /** @@ -395,7 +415,7 @@ public static T request(URI uri, SSLSocketFactory socketFactory, HostnameVer * @throws HttpException A runtime exception when an HTTP response status signals a failed request */ public static T request(URI uri, String method, SSLSocketFactory socketFactory, HostnameVerifier hostnameVerifier, String authorization, String contentType, String body, Class responseType) throws IOException { - return request(uri, method, socketFactory, hostnameVerifier, authorization, contentType, body, responseType, DEFAULT_CONNECT_TIMEOUT, DEFAULT_READ_TIMEOUT); + return request(uri, method, socketFactory, hostnameVerifier, authorization, contentType, body, responseType, DEFAULT_CONNECT_TIMEOUT, DEFAULT_READ_TIMEOUT, true); } /** @@ -413,13 +433,14 @@ public static T request(URI uri, String method, SSLSocketFactory socketFacto * @param readTimeout Read timeout in seconds * @return The response as specified by the responseType. * @param Generic type of the responseType + * @param includeAcceptHeader Determines if Accept application/json is sent to the remote server. * @throws IOException A connection, timeout, or network exception that occurs while performing the request * @throws HttpException A runtime exception when an HTTP response status signals a failed request */ // Suppressed because of Spotbugs Java 11 bug - https://github.com/spotbugs/spotbugs/issues/756 @SuppressFBWarnings("RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE") public static T request(URI uri, String method, SSLSocketFactory socketFactory, HostnameVerifier hostnameVerifier, String authorization, - String contentType, String body, Class responseType, int connectTimeout, int readTimeout) throws IOException { + String contentType, String body, Class responseType, int connectTimeout, int readTimeout, boolean includeAcceptHeader) throws IOException { HttpURLConnection con; try { con = (HttpURLConnection) uri.toURL().openConnection(); @@ -443,7 +464,11 @@ public static T request(URI uri, String method, SSLSocketFactory socketFacto if (authorization != null) { con.setRequestProperty("Authorization", authorization); } - con.setRequestProperty("Accept", "application/json"); + + if (includeAcceptHeader) { + con.setRequestProperty("Accept", "application/json"); + } + if (body != null && body.length() > 0) { if (contentType == null) { throw new IllegalArgumentException("contentType must be set when body is not null"); diff --git a/oauth-common/src/main/java/io/strimzi/kafka/oauth/validator/JWTSignatureValidator.java b/oauth-common/src/main/java/io/strimzi/kafka/oauth/validator/JWTSignatureValidator.java index 7c8ce79e..efc07f0e 100644 --- a/oauth-common/src/main/java/io/strimzi/kafka/oauth/validator/JWTSignatureValidator.java +++ b/oauth-common/src/main/java/io/strimzi/kafka/oauth/validator/JWTSignatureValidator.java @@ -92,6 +92,7 @@ public class JWTSignatureValidator implements TokenValidator { private final boolean enableMetrics; private final OAuthMetrics metrics; private final SensorKeyProducer jwksHttpSensorKeyProducer; + private final boolean includeAcceptHeader; /** * Create a new instance. @@ -115,6 +116,7 @@ public class JWTSignatureValidator implements TokenValidator { * @param readTimeoutSeconds The maximum time to wait for response from authorization server after connection has been established and request sent (in seconds) * @param enableMetrics The switch that enables metrics collection * @param failFast Should exception be thrown during initialisation if unable to retrieve JWKS keys + * @param includeAcceptHeader Should we skip sending the Accept header when requesting JWKS keys */ @SuppressWarnings("checkstyle:ParameterNumber") public JWTSignatureValidator(String validatorId, @@ -135,7 +137,8 @@ public JWTSignatureValidator(String validatorId, int connectTimeoutSeconds, int readTimeoutSeconds, boolean enableMetrics, - boolean failFast) { + boolean failFast, + boolean includeAcceptHeader) { if (validatorId == null) { throw new IllegalArgumentException("validatorId == null"); @@ -188,6 +191,8 @@ public JWTSignatureValidator(String validatorId, this.enableMetrics = enableMetrics; this.ignoreKeyUse = ignoreKeyUse; + this.includeAcceptHeader = includeAcceptHeader; + try { metrics = enableMetrics ? Services.getInstance().getMetrics() : null; jwksHttpSensorKeyProducer = new JwksHttpSensorKeyProducer(validatorId, keysUri); @@ -217,7 +222,8 @@ public JWTSignatureValidator(String validatorId, + "\n connectTimeoutSeconds: " + connectTimeoutSeconds + "\n readTimeoutSeconds: " + readTimeoutSeconds + "\n enableMetrics: " + enableMetrics - + "\n failFast: " + failFast); + + "\n failFast: " + failFast + + "\n includeAcceptHeader: " + includeAcceptHeader); } } } @@ -332,7 +338,7 @@ private PublicKey getKeyUnlessStale(String id) { private void fetchKeys() { long requestStartTime = System.currentTimeMillis(); try { - String response = HttpUtil.get(keysUri, socketFactory, hostnameVerifier, null, String.class, connectTimeout, readTimeout); + String response = HttpUtil.get(keysUri, socketFactory, hostnameVerifier, null, String.class, connectTimeout, readTimeout, includeAcceptHeader); addJwksHttpMetricSuccessTime(requestStartTime); Map newCache = new HashMap<>(); diff --git a/oauth-server/src/main/java/io/strimzi/kafka/oauth/server/JaasServerOauthValidatorCallbackHandler.java b/oauth-server/src/main/java/io/strimzi/kafka/oauth/server/JaasServerOauthValidatorCallbackHandler.java index fa5e9a68..c186fb59 100644 --- a/oauth-server/src/main/java/io/strimzi/kafka/oauth/server/JaasServerOauthValidatorCallbackHandler.java +++ b/oauth-server/src/main/java/io/strimzi/kafka/oauth/server/JaasServerOauthValidatorCallbackHandler.java @@ -394,6 +394,7 @@ private String setupJWKSValidator(String configId, String jwksUri, String validI int jwksMinPauseSeconds = config.getValueAsInt(ServerConfig.OAUTH_JWKS_REFRESH_MIN_PAUSE_SECONDS, 1); boolean failFast = config.getValueAsBoolean(ServerConfig.OAUTH_FAIL_FAST, true); boolean jwksIgnoreKeyUse = config.getValueAsBoolean(ServerConfig.OAUTH_JWKS_IGNORE_KEY_USE, false); + boolean includeAcceptHeader = config.getValueAsBoolean(ServerConfig.OAUTH_INCLUDE_ACCEPT_HEADER, true); ValidatorKey vkey = new ValidatorKey.JwtValidatorKey( validIssuerUri, @@ -442,7 +443,8 @@ private String setupJWKSValidator(String configId, String jwksUri, String validI connectTimeout, readTimeout, enableMetrics, - failFast); + failFast, + includeAcceptHeader); ConfigurationKey confKey = configId != null ? new ConfigurationKey(configId, vkey) : new ConfigurationKey(vkey.getConfigIdHash(), vkey); validator = Services.getInstance().getValidators().get(confKey, factory); diff --git a/oauth-server/src/main/java/io/strimzi/kafka/oauth/server/ServerConfig.java b/oauth-server/src/main/java/io/strimzi/kafka/oauth/server/ServerConfig.java index 7dc9ba1a..28e448e6 100644 --- a/oauth-server/src/main/java/io/strimzi/kafka/oauth/server/ServerConfig.java +++ b/oauth-server/src/main/java/io/strimzi/kafka/oauth/server/ServerConfig.java @@ -83,6 +83,11 @@ public class ServerConfig extends Config { */ public static final String OAUTH_GROUPS_CLAIM_DELIMITER = "oauth.groups.claim.delimiter"; + /** + * "oauth.include.accept.header" + */ + public static final String OAUTH_INCLUDE_ACCEPT_HEADER = "oauth.include.accept.header"; + /** * "oauth.valid.token.type" */ From ce72e6b2e62debd44d2bda59a2ea75d17a7bd8ae Mon Sep 17 00:00:00 2001 From: Hunter Madison Date: Thu, 27 Jul 2023 08:26:29 -0400 Subject: [PATCH 3/8] Add an example making use of the new configuration option. Signed-off-by: Hunter Madison --- examples/kubernetes/README.md | 6 ++ ...a-oauth-single-authz-service-accounts.yaml | 78 +++++++++++++++++++ 2 files changed, 84 insertions(+) create mode 100644 examples/kubernetes/kafka-oauth-single-authz-service-accounts.yaml diff --git a/examples/kubernetes/README.md b/examples/kubernetes/README.md index 9b3b2218..3bada872 100644 --- a/examples/kubernetes/README.md +++ b/examples/kubernetes/README.md @@ -35,6 +35,12 @@ They assume Keycloak is used as an authorization server, with properly configure A single node Kafka cluster with OAuth 2 authentication with OAuth metrics enabled. See [README-metrics.md]() for how to setup this example. +* `kafka-oauth-single-authz-service-accounts.yaml` + + A single node Kafka cluster using [service account tokens](https://kubernetes.io/docs/reference/access-authn-authz/authentication/#service-account-tokens) for authorization and the `simple` authorizer. + It requires that the `kube-root-ca.crt` be copied from its ConfigMap to a Secret: + + kubectl get configmap/kube-root-ca.crt -o=json | jq -r '.data."ca.crt"' | kubectl create secret generic kube-root-ca --from-file=ca.crt=/dev/stdin ### Deploying Keycloak and accessing the Keycloak Admin Console diff --git a/examples/kubernetes/kafka-oauth-single-authz-service-accounts.yaml b/examples/kubernetes/kafka-oauth-single-authz-service-accounts.yaml new file mode 100644 index 00000000..6a8c6871 --- /dev/null +++ b/examples/kubernetes/kafka-oauth-single-authz-service-accounts.yaml @@ -0,0 +1,78 @@ +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: oidc-reader + annotations: + kubernetes.io/description: | + A cluster role which allows access to the OpenID Connect endpoints on + the API Server. +rules: + - nonResourceURLs: ["/.well-known/openid-configuration", "/openid/v1/jwks"] + verbs: ["get", "post"] +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: oidc-reader-binding + annotations: + kubernetes.io/description: | + A role binding which allows for anonymous access to those endpoints + from consuming applications. +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: oidc-reader +subjects: + - apiGroup: rbac.authorization.k8s.io + kind: User + name: "system:anonymous" +--- +apiVersion: kafka.strimzi.io/v1beta2 +kind: Kafka +metadata: + name: my-cluster +spec: + kafka: + version: 3.5.0 + replicas: 1 + listeners: + - name: plain + port: 9092 + type: internal + tls: false + authentication: + type: oauth + enablePlain: true + checkAccessTokenType: false + validIssuerUri: https://kubernetes.default.svc.cluster.local + jwksEndpointUri: https://kubernetes.default.svc.cluster.local/openid/v1/jwks + userNameClaim: sub + tlsTrustedCertificates: + - secretName: kube-root-ca + certificate: ca.crt + authorization: + type: simple + superUsers: + - system:serviceaccount:default:default + config: + offsets.topic.replication.factor: 1 + transaction.state.log.replication.factor: 1 + transaction.state.log.min.isr: 1 + jvmOptions: + javaSystemProperties: + - name: "oauth.include.accept.header" + value: "false" + logging: + type: inline + loggers: + log4j.logger.io.strimzi: DEBUG + storage: + type: ephemeral + zookeeper: + replicas: 1 + storage: + type: ephemeral + entityOperator: + topicOperator: {} + userOperator: {} From d5a099ef675934270df461d642da770dadbf7b77 Mon Sep 17 00:00:00 2001 From: Hunter Madison Date: Thu, 27 Jul 2023 09:06:38 -0400 Subject: [PATCH 4/8] Update JWKSKeyUseTest to match the new constructor definition. Signed-off-by: Hunter Madison --- .../io/strimzi/testsuite/oauth/mockoauth/JWKSKeyUseTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/mockoauth/JWKSKeyUseTest.java b/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/mockoauth/JWKSKeyUseTest.java index e2318c95..d92c8ed1 100644 --- a/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/mockoauth/JWKSKeyUseTest.java +++ b/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/mockoauth/JWKSKeyUseTest.java @@ -93,6 +93,7 @@ private static JWTSignatureValidator createTokenValidator(String validatorId, SS 60, 60, true, + true, true); } } \ No newline at end of file From c55abba9a9e58b61545b0b1ba3da60ddc38e0760 Mon Sep 17 00:00:00 2001 From: Hunter Madison Date: Fri, 28 Jul 2023 11:35:10 -0400 Subject: [PATCH 5/8] Address comments from code review. Signed-off-by: Hunter Madison --- README.md | 4 ++ examples/kubernetes/README.md | 2 +- .../JaasClientOauthLoginCallbackHandler.java | 8 ++- .../io/strimzi/kafka/oauth/common/Config.java | 5 ++ .../kafka/oauth/common/ConfigUtil.java | 18 ++++++ .../strimzi/kafka/oauth/common/HttpUtil.java | 5 +- .../oauth/common/OAuthAuthenticator.java | 59 +++++++++++-------- .../validator/JWTSignatureValidator.java | 2 +- .../OAuthIntrospectionValidator.java | 14 ++++- .../oauth/common/HttpUtilTimeoutTest.java | 2 +- .../oauth/server/authorizer/AuthzConfig.java | 6 ++ .../server/authorizer/Configuration.java | 15 ++++- .../authorizer/KeycloakRBACAuthorizer.java | 2 +- ...authOverPlainValidatorCallbackHandler.java | 2 +- ...asServerOauthValidatorCallbackHandler.java | 12 +++- .../kafka/oauth/server/ServerConfig.java | 5 -- .../oauth/HydraAuthenticationTest.java | 2 +- .../testsuite/oauth/auth/BasicTests.java | 2 +- .../oauth/auth/JwtManipulationTests.java | 2 +- .../oauth/auth/OAuthOverPlainTests.java | 4 +- .../strimzi/testsuite/oauth/authz/Common.java | 4 +- .../testsuite/oauth/authz/FloodTest.java | 2 +- .../oauth/auth/ErrorReportingTests.java | 6 +- .../testsuite/oauth/mockoauth/Common.java | 3 +- .../oauth/mockoauth/JWKSKeyUseTest.java | 3 +- .../mockoauth/KeycloakAuthorizerTest.java | 3 +- .../oauth/mockoauth/PasswordAuthTest.java | 6 +- 27 files changed, 136 insertions(+), 62 deletions(-) diff --git a/README.md b/README.md index fe6fb435..90a5a811 100644 --- a/README.md +++ b/README.md @@ -1485,3 +1485,7 @@ The JWT tokens are signed by the authorization server when they are issued. The The client may have obtained a new access token, but the Kafka broker has not yet refreshed the public keys from JWKS endpoint resulting in a mismatch. The Kafka Broker will automatically refresh JWT keys if it encounters an unknown `kid`, and the problem will self-correct in this case, you may just need to repeat your request a few times. It can also happen the other way around. Your existing client may still use the refresh token or the access token issued by the previous authorization server instance while the Kafka broker has already refreshed the keys from JWKS endpoint - resulting in a mismatch between the private key used by authorization server to sign the token, and the published public keys (JWKS endpoint). Since the problem is on the client you may need to configure your client with a newly obtained refresh token, or access token. If you configure your client with clientId and secret, it should autocorrect by itself, you just need to restart it. + +### HTTP 406: Not Acceptable errors. + +For certain servers setting the `Accept` header on outbound requests to `application/json` can cause the identity provider to reject the request. If that is an issue, you can set `oauth.include.accept.header` to `false` and remove the `Accept` header from outbound requests made by the Kafka server or client. \ No newline at end of file diff --git a/examples/kubernetes/README.md b/examples/kubernetes/README.md index 3bada872..2818375f 100644 --- a/examples/kubernetes/README.md +++ b/examples/kubernetes/README.md @@ -37,7 +37,7 @@ They assume Keycloak is used as an authorization server, with properly configure * `kafka-oauth-single-authz-service-accounts.yaml` - A single node Kafka cluster using [service account tokens](https://kubernetes.io/docs/reference/access-authn-authz/authentication/#service-account-tokens) for authorization and the `simple` authorizer. + A single node Kafka cluster using [service account tokens](https://kubernetes.io/docs/reference/access-authn-authz/authentication/#service-account-tokens) for authentication and the `simple` authorizer. It requires that the `kube-root-ca.crt` be copied from its ConfigMap to a Secret: kubectl get configmap/kube-root-ca.crt -o=json | jq -r '.data."ca.crt"' | kubectl create secret generic kube-root-ca --from-file=ca.crt=/dev/stdin diff --git a/oauth-client/src/main/java/io/strimzi/kafka/oauth/client/JaasClientOauthLoginCallbackHandler.java b/oauth-client/src/main/java/io/strimzi/kafka/oauth/client/JaasClientOauthLoginCallbackHandler.java index d8a2b1ca..90c82c6b 100644 --- a/oauth-client/src/main/java/io/strimzi/kafka/oauth/client/JaasClientOauthLoginCallbackHandler.java +++ b/oauth-client/src/main/java/io/strimzi/kafka/oauth/client/JaasClientOauthLoginCallbackHandler.java @@ -82,6 +82,7 @@ public class JaasClientOauthLoginCallbackHandler implements AuthenticateCallback private SensorKeyProducer tokenSensorKeyProducer; private final ClientMetricsHandler authenticatorMetrics = new ClientMetricsHandler(); + private boolean includeAcceptHeader; @Override public void configure(Map configs, String saslMechanism, List jaasConfigEntries) { @@ -125,6 +126,7 @@ public void configure(Map configs, String saslMechanism, List T post(URI uri, SSLSocketFactory socketFactory, HostnameVerifi * @param responseType The type to which to convert the response (String or one of the Jackson Mapper types) * @param connectTimeout Connect timeout in seconds * @param readTimeout Read timeout in seconds + * @param includeAcceptHeader TODO * @return The response as specified by the responseType. * @param Generic type of the responseType * @throws IOException A connection, timeout, or network exception that occurs while performing the request * @throws HttpException A runtime exception when an HTTP response status signals a failed request */ - public static T post(URI uri, SSLSocketFactory socketFactory, HostnameVerifier verifier, String authorization, String contentType, String body, Class responseType, int connectTimeout, int readTimeout) throws IOException { - return request(uri, "POST", socketFactory, verifier, authorization, contentType, body, responseType, connectTimeout, readTimeout, true); + public static T post(URI uri, SSLSocketFactory socketFactory, HostnameVerifier verifier, String authorization, String contentType, String body, Class responseType, int connectTimeout, int readTimeout, boolean includeAcceptHeader) throws IOException { + return request(uri, "POST", socketFactory, verifier, authorization, contentType, body, responseType, connectTimeout, readTimeout, includeAcceptHeader); } /** diff --git a/oauth-common/src/main/java/io/strimzi/kafka/oauth/common/OAuthAuthenticator.java b/oauth-common/src/main/java/io/strimzi/kafka/oauth/common/OAuthAuthenticator.java index 1f737761..6d301ea2 100644 --- a/oauth-common/src/main/java/io/strimzi/kafka/oauth/common/OAuthAuthenticator.java +++ b/oauth-common/src/main/java/io/strimzi/kafka/oauth/common/OAuthAuthenticator.java @@ -72,6 +72,7 @@ public static TokenInfo loginWithAccessToken(String token, boolean isJwt, Princi * @param isJwt If the returned token is expected to be a JWT token * @param principalExtractor A PrincipalExtractor to use to determine the principal (user id) * @param scope A scope to request when authenticating + * @param includeAcceptHeader Should we skip sending the Accept header when making outbound http requests * @return A TokenInfo with access token and information extracted from it * @throws IOException If the request to the authorization server has failed * @throws IllegalStateException If the response from the authorization server could not be handled @@ -79,10 +80,10 @@ public static TokenInfo loginWithAccessToken(String token, boolean isJwt, Princi public static TokenInfo loginWithClientSecret(URI tokenEndpointUrl, SSLSocketFactory socketFactory, HostnameVerifier hostnameVerifier, String clientId, String clientSecret, boolean isJwt, - PrincipalExtractor principalExtractor, String scope) throws IOException { + PrincipalExtractor principalExtractor, String scope, boolean includeAcceptHeader) throws IOException { return loginWithClientSecret(tokenEndpointUrl, socketFactory, hostnameVerifier, - clientId, clientSecret, isJwt, principalExtractor, scope, null, HttpUtil.DEFAULT_CONNECT_TIMEOUT, HttpUtil.DEFAULT_READ_TIMEOUT, null, 0, 0); + clientId, clientSecret, isJwt, principalExtractor, scope, null, HttpUtil.DEFAULT_CONNECT_TIMEOUT, HttpUtil.DEFAULT_READ_TIMEOUT, null, 0, 0, includeAcceptHeader); } /** @@ -98,6 +99,7 @@ public static TokenInfo loginWithClientSecret(URI tokenEndpointUrl, SSLSocketFac * @param principalExtractor A PrincipalExtractor to use to determine the principal (user id) * @param scope A scope to request when authenticating * @param audience An 'audience' attribute to set on the request when authenticating + * @param includeAcceptHeader Should we skip sending the Accept header when making outbound http requests * @return A TokenInfo with access token and information extracted from it * @throws IOException If the request to the authorization server has failed * @throws IllegalStateException If the response from the authorization server could not be handled @@ -105,10 +107,10 @@ public static TokenInfo loginWithClientSecret(URI tokenEndpointUrl, SSLSocketFac public static TokenInfo loginWithClientSecret(URI tokenEndpointUrl, SSLSocketFactory socketFactory, HostnameVerifier hostnameVerifier, String clientId, String clientSecret, boolean isJwt, - PrincipalExtractor principalExtractor, String scope, String audience) throws IOException { + PrincipalExtractor principalExtractor, String scope, String audience, boolean includeAcceptHeader) throws IOException { return loginWithClientSecret(tokenEndpointUrl, socketFactory, hostnameVerifier, - clientId, clientSecret, isJwt, principalExtractor, scope, audience, HttpUtil.DEFAULT_CONNECT_TIMEOUT, HttpUtil.DEFAULT_READ_TIMEOUT, null, 0, 0); + clientId, clientSecret, isJwt, principalExtractor, scope, audience, HttpUtil.DEFAULT_CONNECT_TIMEOUT, HttpUtil.DEFAULT_READ_TIMEOUT, null, 0, 0, includeAcceptHeader); } /** @@ -129,6 +131,7 @@ public static TokenInfo loginWithClientSecret(URI tokenEndpointUrl, SSLSocketFac * @param metrics A MetricsHandler object to receive metrics collection callbacks * @param retries A maximum number of retries if the request fails due to network, or unexpected response status * @param retryPauseMillis A pause between consecutive requests + * @param includeAcceptHeader Should we skip sending the Accept header when making outbound http requests * @return A TokenInfo with access token and information extracted from it * @throws IOException If the request to the authorization server has failed * @throws IllegalStateException If the response from the authorization server could not be handled @@ -138,7 +141,7 @@ public static TokenInfo loginWithClientSecret(URI tokenEndpointUrl, SSLSocketFac HostnameVerifier hostnameVerifier, String clientId, String clientSecret, boolean isJwt, PrincipalExtractor principalExtractor, String scope, String audience, - int connectTimeout, int readTimeout, MetricsHandler metrics, int retries, long retryPauseMillis) throws IOException { + int connectTimeout, int readTimeout, MetricsHandler metrics, int retries, long retryPauseMillis, boolean includeAcceptHeader) throws IOException { if (log.isDebugEnabled()) { log.debug("loginWithClientSecret() - tokenEndpointUrl: {}, clientId: {}, clientSecret: {}, scope: {}, audience: {}, connectTimeout: {}, readTimeout: {}, retries: {}, retryPauseMillis: {}", tokenEndpointUrl, clientId, mask(clientSecret), scope, audience, connectTimeout, readTimeout, retries, retryPauseMillis); @@ -161,7 +164,7 @@ public static TokenInfo loginWithClientSecret(URI tokenEndpointUrl, SSLSocketFac body.append("&audience=").append(urlencode(audience)); } - return post(tokenEndpointUrl, socketFactory, hostnameVerifier, authorization, body.toString(), isJwt, principalExtractor, connectTimeout, readTimeout, metrics, retries, retryPauseMillis); + return post(tokenEndpointUrl, socketFactory, hostnameVerifier, authorization, body.toString(), isJwt, principalExtractor, connectTimeout, readTimeout, metrics, retries, retryPauseMillis, includeAcceptHeader); } /** @@ -179,6 +182,7 @@ public static TokenInfo loginWithClientSecret(URI tokenEndpointUrl, SSLSocketFac * @param principalExtractor A PrincipalExtractor to use to determine the principal (user id) * @param scope A scope to request when authenticating * @param audience An 'audience' attribute to set on the request when authenticating + * @param includeAcceptHeader Should we skip sending the Accept header when making outbound http requests * @return A TokenInfo with access token and information extracted from it * @throws IOException If the request to the authorization server has failed * @throws IllegalStateException If the response from the authorization server could not be handled @@ -187,10 +191,10 @@ public static TokenInfo loginWithPassword(URI tokenEndpointUrl, SSLSocketFactory HostnameVerifier hostnameVerifier, String username, String password, String clientId, String clientSecret, boolean isJwt, - PrincipalExtractor principalExtractor, String scope, String audience) throws IOException { + PrincipalExtractor principalExtractor, String scope, String audience, boolean includeAcceptHeader) throws IOException { return loginWithPassword(tokenEndpointUrl, socketFactory, hostnameVerifier, - username, password, clientId, clientSecret, isJwt, principalExtractor, scope, audience, HttpUtil.DEFAULT_CONNECT_TIMEOUT, HttpUtil.DEFAULT_READ_TIMEOUT, null, 0, 0); + username, password, clientId, clientSecret, isJwt, principalExtractor, scope, audience, HttpUtil.DEFAULT_CONNECT_TIMEOUT, HttpUtil.DEFAULT_READ_TIMEOUT, null, 0, 0, includeAcceptHeader); } /** @@ -212,6 +216,7 @@ public static TokenInfo loginWithPassword(URI tokenEndpointUrl, SSLSocketFactory * @param readTimeout A read timeout in seconds * @param retries A maximum number of retries if the request fails due to network, or unexpected response status * @param retryPauseMillis A pause between consecutive requests + * @param includeAcceptHeader Should we skip sending the Accept header when making outbound http requests * @return A TokenInfo with access token and information extracted from it * @throws IOException If the request to the authorization server has failed * @throws IllegalStateException If the response from the authorization server could not be handled @@ -220,10 +225,10 @@ public static TokenInfo loginWithPassword(URI tokenEndpointUrl, SSLSocketFactory HostnameVerifier hostnameVerifier, String username, String password, String clientId, String clientSecret, boolean isJwt, - PrincipalExtractor principalExtractor, String scope, String audience, int connectTimeout, int readTimeout, int retries, long retryPauseMillis) throws IOException { + PrincipalExtractor principalExtractor, String scope, String audience, int connectTimeout, int readTimeout, int retries, long retryPauseMillis, boolean includeAcceptHeader) throws IOException { return loginWithPassword(tokenEndpointUrl, socketFactory, hostnameVerifier, - username, password, clientId, clientSecret, isJwt, principalExtractor, scope, audience, connectTimeout, readTimeout, null, retries, retryPauseMillis); + username, password, clientId, clientSecret, isJwt, principalExtractor, scope, audience, connectTimeout, readTimeout, null, retries, retryPauseMillis, includeAcceptHeader); } /** @@ -246,6 +251,7 @@ public static TokenInfo loginWithPassword(URI tokenEndpointUrl, SSLSocketFactory * @param metrics A MetricsHandler object to receive metrics collection callbacks * @param retries A maximum number of retries if the request fails due to network, or unexpected response status * @param retryPauseMillis A pause between consecutive requests + * @param includeAcceptHeader Should we skip sending the Accept header when making outbound http requests * @return A TokenInfo with access token and information extracted from it * @throws IOException If the request to the authorization server has failed * @throws IllegalStateException If the response from the authorization server could not be handled @@ -256,7 +262,7 @@ public static TokenInfo loginWithPassword(URI tokenEndpointUrl, SSLSocketFactory String username, String password, String clientId, String clientSecret, boolean isJwt, PrincipalExtractor principalExtractor, String scope, String audience, - int connectTimeout, int readTimeout, MetricsHandler metrics, int retries, long retryPauseMillis) throws IOException { + int connectTimeout, int readTimeout, MetricsHandler metrics, int retries, long retryPauseMillis, boolean includeAcceptHeader) throws IOException { if (log.isDebugEnabled()) { log.debug("loginWithPassword() - tokenEndpointUrl: {}, username: {}, password: {}, clientId: {}, clientSecret: {}, scope: {}, audience: {}, connectTimeout: {}, readTimeout: {}, retries: {}, retryPauseMillis: {}", @@ -287,7 +293,7 @@ public static TokenInfo loginWithPassword(URI tokenEndpointUrl, SSLSocketFactory body.append("&audience=").append(urlencode(audience)); } - return post(tokenEndpointUrl, socketFactory, hostnameVerifier, authorization, body.toString(), isJwt, principalExtractor, connectTimeout, readTimeout, metrics, retries, retryPauseMillis); + return post(tokenEndpointUrl, socketFactory, hostnameVerifier, authorization, body.toString(), isJwt, principalExtractor, connectTimeout, readTimeout, metrics, retries, retryPauseMillis, includeAcceptHeader); } /** @@ -303,6 +309,7 @@ public static TokenInfo loginWithPassword(URI tokenEndpointUrl, SSLSocketFactory * @param isJwt If the returned token is expected to be a JWT token * @param principalExtractor A PrincipalExtractor to use to determine the principal (user id) * @param scope A scope to request when authenticating + * @param includeAcceptHeader Should we skip sending the Accept header when making outbound http requests * @return A TokenInfo with access token and information extracted from it * @throws IOException If the request to the authorization server has failed * @throws IllegalStateException If the response from the authorization server could not be handled @@ -310,10 +317,10 @@ public static TokenInfo loginWithPassword(URI tokenEndpointUrl, SSLSocketFactory public static TokenInfo loginWithRefreshToken(URI tokenEndpointUrl, SSLSocketFactory socketFactory, HostnameVerifier hostnameVerifier, String refreshToken, String clientId, String clientSecret, boolean isJwt, - PrincipalExtractor principalExtractor, String scope) throws IOException { + PrincipalExtractor principalExtractor, String scope, boolean includeAcceptHeader) throws IOException { return loginWithRefreshToken(tokenEndpointUrl, socketFactory, hostnameVerifier, - refreshToken, clientId, clientSecret, isJwt, principalExtractor, scope, null); + refreshToken, clientId, clientSecret, isJwt, principalExtractor, scope, null, includeAcceptHeader); } /** @@ -330,6 +337,7 @@ public static TokenInfo loginWithRefreshToken(URI tokenEndpointUrl, SSLSocketFac * @param principalExtractor A PrincipalExtractor to use to determine the principal (user id) * @param scope A scope to request when authenticating * @param audience An 'audience' attribute to set on the request when authenticating + * @param includeAcceptHeader Should we skip sending the Accept header when making outbound http requests * @return A TokenInfo with access token and information extracted from it * @throws IOException If the request to the authorization server has failed * @throws IllegalStateException If the response from the authorization server could not be handled @@ -337,9 +345,9 @@ public static TokenInfo loginWithRefreshToken(URI tokenEndpointUrl, SSLSocketFac public static TokenInfo loginWithRefreshToken(URI tokenEndpointUrl, SSLSocketFactory socketFactory, HostnameVerifier hostnameVerifier, String refreshToken, String clientId, String clientSecret, boolean isJwt, - PrincipalExtractor principalExtractor, String scope, String audience) throws IOException { + PrincipalExtractor principalExtractor, String scope, String audience, boolean includeAcceptHeader) throws IOException { return loginWithRefreshToken(tokenEndpointUrl, socketFactory, hostnameVerifier, - refreshToken, clientId, clientSecret, isJwt, principalExtractor, scope, audience, HttpUtil.DEFAULT_CONNECT_TIMEOUT, HttpUtil.DEFAULT_READ_TIMEOUT, 0, 0); + refreshToken, clientId, clientSecret, isJwt, principalExtractor, scope, audience, HttpUtil.DEFAULT_CONNECT_TIMEOUT, HttpUtil.DEFAULT_READ_TIMEOUT, 0, 0, includeAcceptHeader); } /** @@ -358,6 +366,7 @@ public static TokenInfo loginWithRefreshToken(URI tokenEndpointUrl, SSLSocketFac * @param readTimeout A read timeout in seconds * @param retries A maximum number of retries if the request fails due to network, or unexpected response status * @param retryPauseMillis A pause between consecutive requests + * @param includeAcceptHeader Should we skip sending the Accept header when making outbound http requests * @return A TokenInfo with access token and information extracted from it * @throws IOException If the request to the authorization server has failed * @throws IllegalStateException If the response from the authorization server could not be handled @@ -366,8 +375,8 @@ public static TokenInfo loginWithRefreshToken(URI tokenEndpointUrl, SSLSocketFac HostnameVerifier hostnameVerifier, String refreshToken, String clientId, String clientSecret, boolean isJwt, PrincipalExtractor principalExtractor, String scope, String audience, - int connectTimeout, int readTimeout, int retries, long retryPauseMillis) throws IOException { - return loginWithRefreshToken(tokenEndpointUrl, socketFactory, hostnameVerifier, refreshToken, clientId, clientSecret, isJwt, principalExtractor, scope, audience, connectTimeout, readTimeout, null, retries, retryPauseMillis); + int connectTimeout, int readTimeout, int retries, long retryPauseMillis, boolean includeAcceptHeader) throws IOException { + return loginWithRefreshToken(tokenEndpointUrl, socketFactory, hostnameVerifier, refreshToken, clientId, clientSecret, isJwt, principalExtractor, scope, audience, connectTimeout, readTimeout, null, retries, retryPauseMillis, includeAcceptHeader); } /** @@ -387,6 +396,7 @@ public static TokenInfo loginWithRefreshToken(URI tokenEndpointUrl, SSLSocketFac * @param metrics A MetricsHandler object to receive metrics collection callbacks * @param retries A maximum number of retries if the request fails due to network, or unexpected response status * @param retryPauseMillis A pause between consecutive requests + * @param includeAcceptHeader Should we skip sending the Accept header when making outbound http requests * @return A TokenInfo with access token and information extracted from it * @throws IOException If the request to the authorization server has failed * @throws IllegalStateException If the response from the authorization server could not be handled @@ -395,10 +405,10 @@ public static TokenInfo loginWithRefreshToken(URI tokenEndpointUrl, SSLSocketFac HostnameVerifier hostnameVerifier, String refreshToken, String clientId, String clientSecret, boolean isJwt, PrincipalExtractor principalExtractor, String scope, String audience, - int connectTimeout, int readTimeout, MetricsHandler metrics, int retries, long retryPauseMillis) throws IOException { + int connectTimeout, int readTimeout, MetricsHandler metrics, int retries, long retryPauseMillis, boolean includeAcceptHeader) throws IOException { if (log.isDebugEnabled()) { - log.debug("loginWithRefreshToken() - tokenEndpointUrl: {}, refreshToken: {}, clientId: {}, clientSecret: {}, scope: {}, audience: {}, connectTimeout: {}, readTimeout: {}, retries: {}, retryPauseMillis: {}", - tokenEndpointUrl, refreshToken, clientId, mask(clientSecret), scope, audience, connectTimeout, readTimeout, retries, retryPauseMillis); + log.debug("loginWithRefreshToken() - tokenEndpointUrl: {}, refreshToken: {}, clientId: {}, clientSecret: {}, scope: {}, audience: {}, connectTimeout: {}, readTimeout: {}, retries: {}, retryPauseMillis: {}, includeAcceptHeader: {}", + tokenEndpointUrl, refreshToken, clientId, mask(clientSecret), scope, audience, connectTimeout, readTimeout, retries, retryPauseMillis, includeAcceptHeader); } if (refreshToken == null) { @@ -420,12 +430,12 @@ public static TokenInfo loginWithRefreshToken(URI tokenEndpointUrl, SSLSocketFac body.append("&audience=").append(urlencode(audience)); } - return post(tokenEndpointUrl, socketFactory, hostnameVerifier, authorization, body.toString(), isJwt, principalExtractor, connectTimeout, readTimeout, metrics, retries, retryPauseMillis); + return post(tokenEndpointUrl, socketFactory, hostnameVerifier, authorization, body.toString(), isJwt, principalExtractor, connectTimeout, readTimeout, metrics, retries, retryPauseMillis, includeAcceptHeader); } private static TokenInfo post(URI tokenEndpointUri, SSLSocketFactory socketFactory, HostnameVerifier hostnameVerifier, String authorization, String body, boolean isJwt, PrincipalExtractor principalExtractor, - int connectTimeout, int readTimeout, MetricsHandler metrics, int retries, long retryPauseMillis) throws IOException { + int connectTimeout, int readTimeout, MetricsHandler metrics, int retries, long retryPauseMillis, boolean includeAcceptHeader) throws IOException { JsonNode result; try { @@ -438,7 +448,8 @@ private static TokenInfo post(URI tokenEndpointUri, SSLSocketFactory socketFacto body, JsonNode.class, connectTimeout, - readTimeout) + readTimeout, + includeAcceptHeader) ); } catch (Throwable e) { diff --git a/oauth-common/src/main/java/io/strimzi/kafka/oauth/validator/JWTSignatureValidator.java b/oauth-common/src/main/java/io/strimzi/kafka/oauth/validator/JWTSignatureValidator.java index efc07f0e..248e48bc 100644 --- a/oauth-common/src/main/java/io/strimzi/kafka/oauth/validator/JWTSignatureValidator.java +++ b/oauth-common/src/main/java/io/strimzi/kafka/oauth/validator/JWTSignatureValidator.java @@ -116,7 +116,7 @@ public class JWTSignatureValidator implements TokenValidator { * @param readTimeoutSeconds The maximum time to wait for response from authorization server after connection has been established and request sent (in seconds) * @param enableMetrics The switch that enables metrics collection * @param failFast Should exception be thrown during initialisation if unable to retrieve JWKS keys - * @param includeAcceptHeader Should we skip sending the Accept header when requesting JWKS keys + * @param includeAcceptHeader Should we skip sending the Accept header when making outbound http requests */ @SuppressWarnings("checkstyle:ParameterNumber") public JWTSignatureValidator(String validatorId, diff --git a/oauth-common/src/main/java/io/strimzi/kafka/oauth/validator/OAuthIntrospectionValidator.java b/oauth-common/src/main/java/io/strimzi/kafka/oauth/validator/OAuthIntrospectionValidator.java index e1dcf886..03ade42e 100644 --- a/oauth-common/src/main/java/io/strimzi/kafka/oauth/validator/OAuthIntrospectionValidator.java +++ b/oauth-common/src/main/java/io/strimzi/kafka/oauth/validator/OAuthIntrospectionValidator.java @@ -78,6 +78,7 @@ public class OAuthIntrospectionValidator implements TokenValidator { private final MetricsHandler userInfoMetricsHandler; private final SensorKeyProducer introspectHttpSensorKeyProducer; private final SensorKeyProducer userInfoHttpSensorKeyProducer; + private final boolean includeAcceptHeader; /** * Create a new instance. @@ -101,6 +102,7 @@ public class OAuthIntrospectionValidator implements TokenValidator { * @param enableMetrics The switch that enables metrics collection * @param retries Maximum number of retries if request to the authorization server fails (0 means no retries) * @param retryPauseMillis Time to pause before retrying the request to the authorization server + * @param includeAcceptHeader Should we skip sending the Accept header when making outbound http requests */ @SuppressWarnings("checkstyle:ParameterNumber") public OAuthIntrospectionValidator(String id, @@ -121,7 +123,8 @@ public OAuthIntrospectionValidator(String id, int readTimeoutSeconds, boolean enableMetrics, int retries, - long retryPauseMillis) { + long retryPauseMillis, + boolean includeAcceptHeader) { this.validatorId = checkValidatorId(id); @@ -159,6 +162,8 @@ public OAuthIntrospectionValidator(String id, introspectHttpSensorKeyProducer = new IntrospectHttpSensorKeyProducer(validatorId, introspectionURI); userInfoHttpSensorKeyProducer = userInfoURI != null ? new UserInfoHttpSensorKeyProducer(validatorId, userInfoURI) : null; + this.includeAcceptHeader = includeAcceptHeader; + if (log.isDebugEnabled()) { log.debug("Configured OAuthIntrospectionValidator:" + "\n id: " + id @@ -180,6 +185,7 @@ public OAuthIntrospectionValidator(String id, + "\n enableMetrics: " + enableMetrics + "\n retries: " + retries + "\n retryPauseMillis: " + retryPauseMillis + + "\n includeAcceptHeader: " + includeAcceptHeader ); } } @@ -290,7 +296,8 @@ public TokenInfo validate(String token) { body.toString(), JsonNode.class, connectTimeoutSeconds, - readTimeoutSeconds) + readTimeoutSeconds, + includeAcceptHeader) ); } catch (Throwable e) { Throwable cause = e; @@ -380,7 +387,8 @@ private JsonNode getUserInfoEndpointResponse(String token) { authorization, JsonNode.class, connectTimeoutSeconds, - readTimeoutSeconds) + readTimeoutSeconds, + includeAcceptHeader) ); } catch (Throwable e) { diff --git a/oauth-common/src/test/java/io/strimzi/kafka/oauth/common/HttpUtilTimeoutTest.java b/oauth-common/src/test/java/io/strimzi/kafka/oauth/common/HttpUtilTimeoutTest.java index 22ee32ef..849f0c05 100644 --- a/oauth-common/src/test/java/io/strimzi/kafka/oauth/common/HttpUtilTimeoutTest.java +++ b/oauth-common/src/test/java/io/strimzi/kafka/oauth/common/HttpUtilTimeoutTest.java @@ -117,7 +117,7 @@ public void doTest() throws Exception { try { OAuthIntrospectionValidator validator = new OAuthIntrospectionValidator("test", "http://192.168.255.255:26309", null, null, new PrincipalExtractor(), null, null, "http://172.0.0.13/", null, "Bearer", - "kafka", "kafka-secret", null, null, timeout, timeout, false, 0, 0); + "kafka", "kafka-secret", null, null, timeout, timeout, false, 0, 0, true); start = System.currentTimeMillis(); validator.validate("token"); diff --git a/oauth-keycloak-authorizer/src/main/java/io/strimzi/kafka/oauth/server/authorizer/AuthzConfig.java b/oauth-keycloak-authorizer/src/main/java/io/strimzi/kafka/oauth/server/authorizer/AuthzConfig.java index e799005e..85904deb 100644 --- a/oauth-keycloak-authorizer/src/main/java/io/strimzi/kafka/oauth/server/authorizer/AuthzConfig.java +++ b/oauth-keycloak-authorizer/src/main/java/io/strimzi/kafka/oauth/server/authorizer/AuthzConfig.java @@ -109,6 +109,12 @@ public class AuthzConfig extends Config { */ public static final String STRIMZI_AUTHORIZATION_REUSE_GRANTS = "strimzi.authorization.reuse.grants"; + /** + * Reuse cached grants for the same principal (user id) possibly fetched by another session using a different access token. + */ + public static final String STRIMZI_OAUTH_INCLUDE_ACCEPT_HEADER = "strimzi.oauth.include.accept.header"; + + /** * Create a new instance * diff --git a/oauth-keycloak-authorizer/src/main/java/io/strimzi/kafka/oauth/server/authorizer/Configuration.java b/oauth-keycloak-authorizer/src/main/java/io/strimzi/kafka/oauth/server/authorizer/Configuration.java index 96e4302f..476af7cc 100644 --- a/oauth-keycloak-authorizer/src/main/java/io/strimzi/kafka/oauth/server/authorizer/Configuration.java +++ b/oauth-keycloak-authorizer/src/main/java/io/strimzi/kafka/oauth/server/authorizer/Configuration.java @@ -53,6 +53,7 @@ import static io.strimzi.kafka.oauth.server.authorizer.AuthzConfig.STRIMZI_AUTHORIZATION_SSL_TRUSTSTORE_PASSWORD; import static io.strimzi.kafka.oauth.server.authorizer.AuthzConfig.STRIMZI_AUTHORIZATION_SSL_TRUSTSTORE_TYPE; import static io.strimzi.kafka.oauth.server.authorizer.AuthzConfig.STRIMZI_AUTHORIZATION_TOKEN_ENDPOINT_URI; +import static io.strimzi.kafka.oauth.server.authorizer.AuthzConfig.STRIMZI_OAUTH_INCLUDE_ACCEPT_HEADER; /** * The classes used to parse and store Authorizer configuration. @@ -92,6 +93,7 @@ public class Configuration { private URI tokenEndpointUrl; private int connectTimeoutSeconds; private int readTimeoutSeconds; + private boolean includeAcceptHeader; /** * Create a new Configuration instance @@ -151,6 +153,7 @@ public class Configuration { reuseGrants = authzConfig.getValueAsBoolean(STRIMZI_AUTHORIZATION_REUSE_GRANTS, true); + includeAcceptHeader = ConfigUtil.getDefaultBooleanConfigWithFallbackLookup(authzConfig, STRIMZI_OAUTH_INCLUDE_ACCEPT_HEADER, Config.OAUTH_INCLUDE_ACCEPT_HEADER, true); configureHttpRetries(authzConfig); configureMetrics(authzConfig); @@ -305,7 +308,9 @@ static AuthzConfig convertToCommonConfig(Map configs) { STRIMZI_AUTHORIZATION_READ_TIMEOUT_SECONDS, OAUTH_READ_TIMEOUT_SECONDS, STRIMZI_AUTHORIZATION_ENABLE_METRICS, - OAUTH_ENABLE_METRICS + OAUTH_ENABLE_METRICS, + STRIMZI_OAUTH_INCLUDE_ACCEPT_HEADER, + Config.OAUTH_INCLUDE_ACCEPT_HEADER }; // Copy over the keys @@ -420,6 +425,10 @@ int getReadTimeoutSeconds() { return configMap; } + boolean includeAcceptHeader() { + return includeAcceptHeader; + } + private static class Log { Level level; String message; @@ -455,6 +464,7 @@ public boolean equals(Object o) { enableMetrics == that.enableMetrics && connectTimeoutSeconds == that.connectTimeoutSeconds && readTimeoutSeconds == that.readTimeoutSeconds && + includeAcceptHeader == that.includeAcceptHeader && Objects.equals(clientId, that.clientId) && Objects.equals(clusterName, that.clusterName) && Objects.equals(truststore, that.truststore) && @@ -489,6 +499,7 @@ public int hashCode() { enableMetrics, tokenEndpointUrl, connectTimeoutSeconds, - readTimeoutSeconds); + readTimeoutSeconds, + includeAcceptHeader); } } diff --git a/oauth-keycloak-authorizer/src/main/java/io/strimzi/kafka/oauth/server/authorizer/KeycloakRBACAuthorizer.java b/oauth-keycloak-authorizer/src/main/java/io/strimzi/kafka/oauth/server/authorizer/KeycloakRBACAuthorizer.java index 40e6de03..93ae45d1 100644 --- a/oauth-keycloak-authorizer/src/main/java/io/strimzi/kafka/oauth/server/authorizer/KeycloakRBACAuthorizer.java +++ b/oauth-keycloak-authorizer/src/main/java/io/strimzi/kafka/oauth/server/authorizer/KeycloakRBACAuthorizer.java @@ -550,7 +550,7 @@ private JsonNode fetchAuthorizationGrantsOnce(String token) { try { response = post(configuration.getTokenEndpointUrl(), socketFactory, hostnameVerifier, authorization, - "application/x-www-form-urlencoded", body.toString(), JsonNode.class, configuration.getConnectTimeoutSeconds(), configuration.getReadTimeoutSeconds()); + "application/x-www-form-urlencoded", body.toString(), JsonNode.class, configuration.getConnectTimeoutSeconds(), configuration.getReadTimeoutSeconds(), configuration.includeAcceptHeader()); addGrantsHttpMetricSuccessTime(startTime); } catch (HttpException e) { addGrantsHttpMetricErrorTime(e, startTime); diff --git a/oauth-server-plain/src/main/java/io/strimzi/kafka/oauth/server/plain/JaasServerOauthOverPlainValidatorCallbackHandler.java b/oauth-server-plain/src/main/java/io/strimzi/kafka/oauth/server/plain/JaasServerOauthOverPlainValidatorCallbackHandler.java index fb2964b9..614cacc2 100644 --- a/oauth-server-plain/src/main/java/io/strimzi/kafka/oauth/server/plain/JaasServerOauthOverPlainValidatorCallbackHandler.java +++ b/oauth-server-plain/src/main/java/io/strimzi/kafka/oauth/server/plain/JaasServerOauthOverPlainValidatorCallbackHandler.java @@ -247,7 +247,7 @@ private void authenticate(String username, String password) throws UnsupportedCa checkUsernameMatch = true; } else if (tokenEndpointUri != null) { accessToken = OAuthAuthenticator.loginWithClientSecret(tokenEndpointUri, getSocketFactory(), getVerifier(), - username, password, isJwt(), getPrincipalExtractor(), scope, audience, getConnectTimeout(), getReadTimeout(), authMetrics, getRetries(), getRetryPauseMillis()) + username, password, isJwt(), getPrincipalExtractor(), scope, audience, getConnectTimeout(), getReadTimeout(), authMetrics, getRetries(), getRetryPauseMillis(), includeAcceptHeader()) .token(); } else { throw new ValidationException("Empty password where access token was expected"); diff --git a/oauth-server/src/main/java/io/strimzi/kafka/oauth/server/JaasServerOauthValidatorCallbackHandler.java b/oauth-server/src/main/java/io/strimzi/kafka/oauth/server/JaasServerOauthValidatorCallbackHandler.java index c186fb59..0a42469c 100644 --- a/oauth-server/src/main/java/io/strimzi/kafka/oauth/server/JaasServerOauthValidatorCallbackHandler.java +++ b/oauth-server/src/main/java/io/strimzi/kafka/oauth/server/JaasServerOauthValidatorCallbackHandler.java @@ -214,6 +214,7 @@ public class JaasServerOauthValidatorCallbackHandler implements AuthenticateCall private long retryPauseMillis; private boolean enableMetrics; + private boolean includeAcceptHeader; private OAuthMetrics metrics; protected SensorKeyProducer validationSensorKeyProducer; @@ -256,6 +257,7 @@ public void delegatedConfigure(Map configs, String saslMechanism, Lis boolean checkTokenType = isCheckAccessTokenType(config); boolean checkAudience = config.getValueAsBoolean(ServerConfig.OAUTH_CHECK_AUDIENCE, false); + includeAcceptHeader = config.getValueAsBoolean(Config.OAUTH_INCLUDE_ACCEPT_HEADER, true); String usernameClaim = config.getValue(Config.OAUTH_USERNAME_CLAIM); String fallbackUsernameClaim = config.getValue(Config.OAUTH_FALLBACK_USERNAME_CLAIM); @@ -328,6 +330,7 @@ private String setupIntrospectionValidator(String configId, String validIssuerUr String introspectionEndpoint = config.getValue(ServerConfig.OAUTH_INTROSPECTION_ENDPOINT_URI); String userInfoEndpoint = config.getValue(ServerConfig.OAUTH_USERINFO_ENDPOINT_URI); String validTokenType = config.getValue(ServerConfig.OAUTH_VALID_TOKEN_TYPE); + boolean includeAcceptHeader = config.getValueAsBoolean(Config.OAUTH_INCLUDE_ACCEPT_HEADER, true); ValidatorKey vkey = new ValidatorKey.IntrospectionValidatorKey( validIssuerUri, @@ -375,7 +378,8 @@ private String setupIntrospectionValidator(String configId, String validIssuerUr readTimeout, enableMetrics, retries, - retryPauseMillis); + retryPauseMillis, + includeAcceptHeader); ConfigurationKey confKey = configId != null ? new ConfigurationKey(configId, vkey) : new ConfigurationKey(vkey.getConfigIdHash(), vkey); validator = Services.getInstance().getValidators().get(confKey, factory); @@ -394,7 +398,7 @@ private String setupJWKSValidator(String configId, String jwksUri, String validI int jwksMinPauseSeconds = config.getValueAsInt(ServerConfig.OAUTH_JWKS_REFRESH_MIN_PAUSE_SECONDS, 1); boolean failFast = config.getValueAsBoolean(ServerConfig.OAUTH_FAIL_FAST, true); boolean jwksIgnoreKeyUse = config.getValueAsBoolean(ServerConfig.OAUTH_JWKS_IGNORE_KEY_USE, false); - boolean includeAcceptHeader = config.getValueAsBoolean(ServerConfig.OAUTH_INCLUDE_ACCEPT_HEADER, true); + boolean includeAcceptHeader = config.getValueAsBoolean(Config.OAUTH_INCLUDE_ACCEPT_HEADER, true); ValidatorKey vkey = new ValidatorKey.JwtValidatorKey( validIssuerUri, @@ -745,6 +749,10 @@ protected long getRetryPauseMillis() { return retryPauseMillis; } + protected Boolean includeAcceptHeader() { + return includeAcceptHeader; + } + private void addValidationMetricSuccessTime(long startTimeMs) { if (enableMetrics) { metrics.addTime(validationSensorKeyProducer.successKey(), System.currentTimeMillis() - startTimeMs); diff --git a/oauth-server/src/main/java/io/strimzi/kafka/oauth/server/ServerConfig.java b/oauth-server/src/main/java/io/strimzi/kafka/oauth/server/ServerConfig.java index 28e448e6..7dc9ba1a 100644 --- a/oauth-server/src/main/java/io/strimzi/kafka/oauth/server/ServerConfig.java +++ b/oauth-server/src/main/java/io/strimzi/kafka/oauth/server/ServerConfig.java @@ -83,11 +83,6 @@ public class ServerConfig extends Config { */ public static final String OAUTH_GROUPS_CLAIM_DELIMITER = "oauth.groups.claim.delimiter"; - /** - * "oauth.include.accept.header" - */ - public static final String OAUTH_INCLUDE_ACCEPT_HEADER = "oauth.include.accept.header"; - /** * "oauth.valid.token.type" */ diff --git a/testsuite/hydra-test/src/test/java/io/strimzi/testsuite/oauth/HydraAuthenticationTest.java b/testsuite/hydra-test/src/test/java/io/strimzi/testsuite/oauth/HydraAuthenticationTest.java index e717f471..ab8dca8f 100644 --- a/testsuite/hydra-test/src/test/java/io/strimzi/testsuite/oauth/HydraAuthenticationTest.java +++ b/testsuite/hydra-test/src/test/java/io/strimzi/testsuite/oauth/HydraAuthenticationTest.java @@ -128,7 +128,7 @@ public void opaqueAccessTokenWithIntrospectValidationTest(String topic) throws E // first, request access token using client id and secret TokenInfo info = OAuthAuthenticator.loginWithClientSecret(URI.create(tokenEndpointUri), ConfigUtil.createSSLFactory(new ClientConfig()), - null, clientId, clientSecret, true, null, null); + null, clientId, clientSecret, true, null, null, true); Map oauthConfig = new HashMap<>(); oauthConfig.put(ClientConfig.OAUTH_TOKEN_ENDPOINT_URI, tokenEndpointUri); diff --git a/testsuite/keycloak-auth-tests/src/test/java/io/strimzi/testsuite/oauth/auth/BasicTests.java b/testsuite/keycloak-auth-tests/src/test/java/io/strimzi/testsuite/oauth/auth/BasicTests.java index 6f7c6dd1..d545beb4 100644 --- a/testsuite/keycloak-auth-tests/src/test/java/io/strimzi/testsuite/oauth/auth/BasicTests.java +++ b/testsuite/keycloak-auth-tests/src/test/java/io/strimzi/testsuite/oauth/auth/BasicTests.java @@ -271,7 +271,7 @@ void accessTokenWithIntrospection() throws Exception { final String clientSecret = "kafka-producer-client-secret"; // First, request access token using client id and secret - TokenInfo info = loginWithClientSecret(URI.create(tokenEndpointUri), null, null, clientId, clientSecret, true, null, null); + TokenInfo info = loginWithClientSecret(URI.create(tokenEndpointUri), null, null, clientId, clientSecret, true, null, null, true); Map oauthConfig = new HashMap<>(); oauthConfig.put(ClientConfig.OAUTH_ACCESS_TOKEN, info.token()); diff --git a/testsuite/keycloak-auth-tests/src/test/java/io/strimzi/testsuite/oauth/auth/JwtManipulationTests.java b/testsuite/keycloak-auth-tests/src/test/java/io/strimzi/testsuite/oauth/auth/JwtManipulationTests.java index 7b966e68..732419b9 100644 --- a/testsuite/keycloak-auth-tests/src/test/java/io/strimzi/testsuite/oauth/auth/JwtManipulationTests.java +++ b/testsuite/keycloak-auth-tests/src/test/java/io/strimzi/testsuite/oauth/auth/JwtManipulationTests.java @@ -224,7 +224,7 @@ private String getOriginalToken() throws IOException { // first, request access token using client id and secret TokenInfo info = OAuthAuthenticator.loginWithClientSecret(URI.create(tokenEndpointUri), null, null, - "kafka-producer-client", "kafka-producer-client-secret", true, null, null); + "kafka-producer-client", "kafka-producer-client-secret", true, null, null, true); return info.token(); } diff --git a/testsuite/keycloak-auth-tests/src/test/java/io/strimzi/testsuite/oauth/auth/OAuthOverPlainTests.java b/testsuite/keycloak-auth-tests/src/test/java/io/strimzi/testsuite/oauth/auth/OAuthOverPlainTests.java index 5006d193..eaf3f7c5 100644 --- a/testsuite/keycloak-auth-tests/src/test/java/io/strimzi/testsuite/oauth/auth/OAuthOverPlainTests.java +++ b/testsuite/keycloak-auth-tests/src/test/java/io/strimzi/testsuite/oauth/auth/OAuthOverPlainTests.java @@ -64,7 +64,7 @@ static void accessTokenOverPlainWithClientCredentialsDisabled() throws Exception // first, request access token using client id and secret TokenInfo info = loginWithClientSecret(URI.create(tokenEndpointUri), null, null, - "team-a-client", "team-a-client-secret", true, null, null); + "team-a-client", "team-a-client-secret", true, null, null, true); Map plainConfig = new HashMap<>(); plainConfig.put("username", "service-account-team-a-client"); @@ -218,7 +218,7 @@ static void accessTokenOverPlainWithIntrospection() throws Exception { // first, request access token using client id and secret TokenInfo info = loginWithClientSecret(URI.create(tokenEndpointUri), null, null, - "team-a-client", "team-a-client-secret", true, null, null); + "team-a-client", "team-a-client-secret", true, null, null, true); Map plainConfig = new HashMap<>(); plainConfig.put("username", "service-account-team-a-client"); diff --git a/testsuite/keycloak-authz-tests/src/main/java/io/strimzi/testsuite/oauth/authz/Common.java b/testsuite/keycloak-authz-tests/src/main/java/io/strimzi/testsuite/oauth/authz/Common.java index 12799571..8475dcab 100644 --- a/testsuite/keycloak-authz-tests/src/main/java/io/strimzi/testsuite/oauth/authz/Common.java +++ b/testsuite/keycloak-authz-tests/src/main/java/io/strimzi/testsuite/oauth/authz/Common.java @@ -93,9 +93,9 @@ static void produceToTopic(String topic, Properties config) throws Exception { void authenticateAllActors() throws IOException { tokens.put(TEAM_A_CLIENT, loginWithClientSecret(URI.create(TOKEN_ENDPOINT_URI), null, null, - TEAM_A_CLIENT, TEAM_A_CLIENT + "-secret", true, null, null).token()); + TEAM_A_CLIENT, TEAM_A_CLIENT + "-secret", true, null, null, true).token()); tokens.put(TEAM_B_CLIENT, loginWithClientSecret(URI.create(TOKEN_ENDPOINT_URI), null, null, - TEAM_B_CLIENT, TEAM_B_CLIENT + "-secret", true, null, null).token()); + TEAM_B_CLIENT, TEAM_B_CLIENT + "-secret", true, null, null, true).token()); tokens.put(BOB, loginWithUsernamePassword(URI.create(TOKEN_ENDPOINT_URI), BOB, BOB + "-password", "kafka-cli")); tokens.put(ZERO, loginWithUsernamePassword(URI.create(TOKEN_ENDPOINT_URI), diff --git a/testsuite/keycloak-authz-tests/src/main/java/io/strimzi/testsuite/oauth/authz/FloodTest.java b/testsuite/keycloak-authz-tests/src/main/java/io/strimzi/testsuite/oauth/authz/FloodTest.java index a345ddae..efb7b6f6 100644 --- a/testsuite/keycloak-authz-tests/src/main/java/io/strimzi/testsuite/oauth/authz/FloodTest.java +++ b/testsuite/keycloak-authz-tests/src/main/java/io/strimzi/testsuite/oauth/authz/FloodTest.java @@ -159,7 +159,7 @@ private void obtainAndStoreToken(String producerPrefix, HashMap String secret = clientId + "-secret"; tokens.put(clientId, loginWithClientSecret(URI.create(TOKEN_ENDPOINT_URI), null, null, - clientId, secret, true, null, null).token()); + clientId, secret, true, null, null, true).token()); } diff --git a/testsuite/keycloak-errors-tests/src/test/java/io/strimzi/testsuite/oauth/auth/ErrorReportingTests.java b/testsuite/keycloak-errors-tests/src/test/java/io/strimzi/testsuite/oauth/auth/ErrorReportingTests.java index 9807c37b..63c47b7d 100644 --- a/testsuite/keycloak-errors-tests/src/test/java/io/strimzi/testsuite/oauth/auth/ErrorReportingTests.java +++ b/testsuite/keycloak-errors-tests/src/test/java/io/strimzi/testsuite/oauth/auth/ErrorReportingTests.java @@ -170,7 +170,7 @@ private void forgedJwtSig() throws Exception { final String clientSecret = "kafka-producer-client-secret"; // first, request access token using client id and secret - TokenInfo info = loginWithClientSecret(URI.create(tokenEndpointUri), null, null, clientId, clientSecret, true, null, null); + TokenInfo info = loginWithClientSecret(URI.create(tokenEndpointUri), null, null, clientId, clientSecret, true, null, null, true); Map oauthConfig = new HashMap<>(); String tokenWithBrokenSig = info.token().substring(0, info.token().length() - 6) + "ffffff"; @@ -211,7 +211,7 @@ private void forgedJwtSigIntrospect() throws Exception { final String clientSecret = "kafka-producer-client-secret"; // first, request access token using client id and secret - TokenInfo info = loginWithClientSecret(URI.create(tokenEndpointUri), null, null, clientId, clientSecret, true, null, null); + TokenInfo info = loginWithClientSecret(URI.create(tokenEndpointUri), null, null, clientId, clientSecret, true, null, null, true); Map oauthConfig = new HashMap<>(); String tokenWithBrokenSig = info.token().substring(0, info.token().length() - 6) + "ffffff"; @@ -252,7 +252,7 @@ private void expiredJwtToken() throws Exception { final String clientSecret = "kafka-producer-client-secret"; // first, request access token using client id and secret - TokenInfo info = loginWithClientSecret(URI.create(tokenEndpointUri), null, null, clientId, clientSecret, true, null, null); + TokenInfo info = loginWithClientSecret(URI.create(tokenEndpointUri), null, null, clientId, clientSecret, true, null, null, true); Map oauthConfig = new HashMap<>(); oauthConfig.put(ClientConfig.OAUTH_ACCESS_TOKEN, info.token()); diff --git a/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/mockoauth/Common.java b/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/mockoauth/Common.java index 4801ba8d..892c09d1 100644 --- a/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/mockoauth/Common.java +++ b/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/mockoauth/Common.java @@ -98,7 +98,8 @@ static String loginWithClientSecret(String tokenEndpoint, String clientId, Strin true, new PrincipalExtractor(), "all", - null); + null, + true); return tokenInfo.token(); } diff --git a/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/mockoauth/JWKSKeyUseTest.java b/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/mockoauth/JWKSKeyUseTest.java index d92c8ed1..06bb8edd 100644 --- a/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/mockoauth/JWKSKeyUseTest.java +++ b/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/mockoauth/JWKSKeyUseTest.java @@ -53,7 +53,8 @@ public void doTest() throws Exception { true, null, null, - null); + null, + true); TokenIntrospection.debugLogJWT(log, tokenInfo.token()); diff --git a/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/mockoauth/KeycloakAuthorizerTest.java b/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/mockoauth/KeycloakAuthorizerTest.java index d38b9bdf..31d0ee52 100644 --- a/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/mockoauth/KeycloakAuthorizerTest.java +++ b/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/mockoauth/KeycloakAuthorizerTest.java @@ -931,7 +931,8 @@ private TokenInfo login(String tokenEndpoint, String user, String userPass, int 60, 60, retries, - 0); + 0, + true); } private void logStart(String msg) { diff --git a/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/mockoauth/PasswordAuthTest.java b/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/mockoauth/PasswordAuthTest.java index d4096c58..d43c784a 100644 --- a/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/mockoauth/PasswordAuthTest.java +++ b/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/mockoauth/PasswordAuthTest.java @@ -66,7 +66,8 @@ public void doTest() throws Exception { true, null, null, - null); + null, + true); String token = tokenInfo.token(); Assert.assertNotNull(token); @@ -121,7 +122,8 @@ public void doTest() throws Exception { true, null, null, - null); + null, + true); token = tokenInfo.token(); Assert.assertNotNull(token); From aa19ad1fd985526b52ae041625de868f5e8d158f Mon Sep 17 00:00:00 2001 From: Hunter Madison Date: Mon, 31 Jul 2023 09:54:49 -0400 Subject: [PATCH 6/8] Address pull request feedback. Signed-off-by: Hunter Madison --- .../kafka-oauth-single-authz-service-accounts.yaml | 8 ++++---- .../oauth/client/JaasClientOauthLoginCallbackHandler.java | 3 ++- .../kafka/oauth/server/authorizer/AuthzConfig.java | 2 +- 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/examples/kubernetes/kafka-oauth-single-authz-service-accounts.yaml b/examples/kubernetes/kafka-oauth-single-authz-service-accounts.yaml index 6a8c6871..3906e4f9 100644 --- a/examples/kubernetes/kafka-oauth-single-authz-service-accounts.yaml +++ b/examples/kubernetes/kafka-oauth-single-authz-service-accounts.yaml @@ -5,8 +5,8 @@ metadata: name: oidc-reader annotations: kubernetes.io/description: | - A cluster role which allows access to the OpenID Connect endpoints on - the API Server. + A cluster role which allows access to the OpenID Connect endpoints on + the API Server. rules: - nonResourceURLs: ["/.well-known/openid-configuration", "/openid/v1/jwks"] verbs: ["get", "post"] @@ -17,8 +17,8 @@ metadata: name: oidc-reader-binding annotations: kubernetes.io/description: | - A role binding which allows for anonymous access to those endpoints - from consuming applications. + A role binding which allows for anonymous access to those endpoints + from consuming applications. roleRef: apiGroup: rbac.authorization.k8s.io kind: ClusterRole diff --git a/oauth-client/src/main/java/io/strimzi/kafka/oauth/client/JaasClientOauthLoginCallbackHandler.java b/oauth-client/src/main/java/io/strimzi/kafka/oauth/client/JaasClientOauthLoginCallbackHandler.java index 90c82c6b..2abef7f4 100644 --- a/oauth-client/src/main/java/io/strimzi/kafka/oauth/client/JaasClientOauthLoginCallbackHandler.java +++ b/oauth-client/src/main/java/io/strimzi/kafka/oauth/client/JaasClientOauthLoginCallbackHandler.java @@ -166,7 +166,8 @@ public void configure(Map configs, String saslMechanism, ListAccept header to the upstream server. */ public static final String STRIMZI_OAUTH_INCLUDE_ACCEPT_HEADER = "strimzi.oauth.include.accept.header"; From b7f488da1348b04f6c8c596e7915a506cd77a31d Mon Sep 17 00:00:00 2001 From: Hunter Madison Date: Tue, 1 Aug 2023 05:49:34 -0400 Subject: [PATCH 7/8] Back out service account example. Signed-off-by: Hunter Madison --- examples/kubernetes/README.md | 7 -- ...a-oauth-single-authz-service-accounts.yaml | 78 ------------------- 2 files changed, 85 deletions(-) delete mode 100644 examples/kubernetes/kafka-oauth-single-authz-service-accounts.yaml diff --git a/examples/kubernetes/README.md b/examples/kubernetes/README.md index 2818375f..937b4634 100644 --- a/examples/kubernetes/README.md +++ b/examples/kubernetes/README.md @@ -35,13 +35,6 @@ They assume Keycloak is used as an authorization server, with properly configure A single node Kafka cluster with OAuth 2 authentication with OAuth metrics enabled. See [README-metrics.md]() for how to setup this example. -* `kafka-oauth-single-authz-service-accounts.yaml` - - A single node Kafka cluster using [service account tokens](https://kubernetes.io/docs/reference/access-authn-authz/authentication/#service-account-tokens) for authentication and the `simple` authorizer. - It requires that the `kube-root-ca.crt` be copied from its ConfigMap to a Secret: - - kubectl get configmap/kube-root-ca.crt -o=json | jq -r '.data."ca.crt"' | kubectl create secret generic kube-root-ca --from-file=ca.crt=/dev/stdin - ### Deploying Keycloak and accessing the Keycloak Admin Console Before deploying any of the Kafka cluster definitions, you need to deploy a Keycloak instance, and configure the realms with the necessary client definitions. diff --git a/examples/kubernetes/kafka-oauth-single-authz-service-accounts.yaml b/examples/kubernetes/kafka-oauth-single-authz-service-accounts.yaml deleted file mode 100644 index 3906e4f9..00000000 --- a/examples/kubernetes/kafka-oauth-single-authz-service-accounts.yaml +++ /dev/null @@ -1,78 +0,0 @@ ---- -apiVersion: rbac.authorization.k8s.io/v1 -kind: ClusterRole -metadata: - name: oidc-reader - annotations: - kubernetes.io/description: | - A cluster role which allows access to the OpenID Connect endpoints on - the API Server. -rules: - - nonResourceURLs: ["/.well-known/openid-configuration", "/openid/v1/jwks"] - verbs: ["get", "post"] ---- -apiVersion: rbac.authorization.k8s.io/v1 -kind: ClusterRoleBinding -metadata: - name: oidc-reader-binding - annotations: - kubernetes.io/description: | - A role binding which allows for anonymous access to those endpoints - from consuming applications. -roleRef: - apiGroup: rbac.authorization.k8s.io - kind: ClusterRole - name: oidc-reader -subjects: - - apiGroup: rbac.authorization.k8s.io - kind: User - name: "system:anonymous" ---- -apiVersion: kafka.strimzi.io/v1beta2 -kind: Kafka -metadata: - name: my-cluster -spec: - kafka: - version: 3.5.0 - replicas: 1 - listeners: - - name: plain - port: 9092 - type: internal - tls: false - authentication: - type: oauth - enablePlain: true - checkAccessTokenType: false - validIssuerUri: https://kubernetes.default.svc.cluster.local - jwksEndpointUri: https://kubernetes.default.svc.cluster.local/openid/v1/jwks - userNameClaim: sub - tlsTrustedCertificates: - - secretName: kube-root-ca - certificate: ca.crt - authorization: - type: simple - superUsers: - - system:serviceaccount:default:default - config: - offsets.topic.replication.factor: 1 - transaction.state.log.replication.factor: 1 - transaction.state.log.min.isr: 1 - jvmOptions: - javaSystemProperties: - - name: "oauth.include.accept.header" - value: "false" - logging: - type: inline - loggers: - log4j.logger.io.strimzi: DEBUG - storage: - type: ephemeral - zookeeper: - replicas: 1 - storage: - type: ephemeral - entityOperator: - topicOperator: {} - userOperator: {} From 61b47ad3d9209728d75dba98b956cd51a91a1d44 Mon Sep 17 00:00:00 2001 From: Marko Strukelj Date: Sat, 5 Aug 2023 14:10:04 +0200 Subject: [PATCH 8/8] Added configuration tests to the testsuite + fixed some issues Signed-off-by: Marko Strukelj --- .../kafka/oauth/services/ValidatorKey.java | 28 +++- .../validator/JWTSignatureValidator.java | 2 +- .../OAuthIntrospectionValidator.java | 2 +- .../oauth/validator/ConfigIdHashTest.java | 9 +- .../oauth/server/authorizer/AuthzConfig.java | 2 +- .../server/authorizer/Configuration.java | 8 +- .../authorizer/KeycloakRBACAuthorizer.java | 3 +- ...asServerOauthValidatorCallbackHandler.java | 17 +- .../testsuite/oauth/MockOAuthTests.java | 4 + .../testsuite/oauth/mockoauth/Common.java | 13 ++ .../oauth/mockoauth/JaasClientConfigTest.java | 99 +++++++++++- .../oauth/mockoauth/JaasServerConfigTest.java | 148 ++++++++++++++++++ .../mockoauth/KeycloakAuthorizerTest.java | 120 ++++++++------ 13 files changed, 374 insertions(+), 81 deletions(-) create mode 100644 testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/mockoauth/JaasServerConfigTest.java diff --git a/oauth-common/src/main/java/io/strimzi/kafka/oauth/services/ValidatorKey.java b/oauth-common/src/main/java/io/strimzi/kafka/oauth/services/ValidatorKey.java index 8bc0fdf0..a9138f4d 100644 --- a/oauth-common/src/main/java/io/strimzi/kafka/oauth/services/ValidatorKey.java +++ b/oauth-common/src/main/java/io/strimzi/kafka/oauth/services/ValidatorKey.java @@ -34,6 +34,7 @@ public class ValidatorKey { private final int connectTimeout; private final int readTimeout; private final boolean enableMetrics; + private final boolean includeAcceptHeader; private final String configIdHash; @@ -53,7 +54,8 @@ public class ValidatorKey { boolean hasHostnameVerifier, int connectTimeout, int readTimeout, - boolean enableMetrics) { + boolean enableMetrics, + boolean includeAcceptHeader) { this.validIssuerUri = validIssuerUri; this.audience = audience; @@ -71,6 +73,7 @@ public class ValidatorKey { this.connectTimeout = connectTimeout; this.readTimeout = readTimeout; this.enableMetrics = enableMetrics; + this.includeAcceptHeader = includeAcceptHeader; this.configIdHash = IOUtil.hashForObjects(validIssuerUri, audience, @@ -87,7 +90,8 @@ public class ValidatorKey { hasHostnameVerifier, connectTimeout, readTimeout, - enableMetrics); + enableMetrics, + includeAcceptHeader); } @Override @@ -110,7 +114,8 @@ public boolean equals(Object o) { Objects.equals(sslRandom, that.sslRandom) && Objects.equals(connectTimeout, that.connectTimeout) && Objects.equals(readTimeout, that.readTimeout) && - Objects.equals(enableMetrics, that.enableMetrics); + Objects.equals(enableMetrics, that.enableMetrics) && + Objects.equals(includeAcceptHeader, that.includeAcceptHeader); } @Override @@ -130,7 +135,8 @@ public int hashCode() { hasHostnameVerifier, connectTimeout, readTimeout, - enableMetrics); + enableMetrics, + includeAcceptHeader); } /** @@ -183,6 +189,7 @@ public static class JwtValidatorKey extends ValidatorKey { * @param readTimeout readTimeout * @param enableMetrics enableMetrics * @param failFast failFast + * @param includeAcceptHeader includeAcceptHeader */ @SuppressWarnings("checkstyle:parameternumber") public JwtValidatorKey(String validIssuerUri, @@ -208,7 +215,8 @@ public JwtValidatorKey(String validIssuerUri, int connectTimeout, int readTimeout, boolean enableMetrics, - boolean failFast) { + boolean failFast, + boolean includeAcceptHeader) { super(validIssuerUri, audience, @@ -225,7 +233,8 @@ public JwtValidatorKey(String validIssuerUri, hasHostnameVerifier, connectTimeout, readTimeout, - enableMetrics); + enableMetrics, + includeAcceptHeader); this.jwksEndpointUri = jwksEndpointUri; this.jwksRefreshSeconds = jwksRefreshSeconds; this.jwksExpirySeconds = jwksExpirySeconds; @@ -317,6 +326,7 @@ public static class IntrospectionValidatorKey extends ValidatorKey { * @param enableMetrics enableMetrics * @param retries retries * @param retryPauseMillis retryPauseMillis + * @param includeAcceptHeader includeAcceptHeader */ @SuppressWarnings("checkstyle:parameternumber") public IntrospectionValidatorKey(String validIssuerUri, @@ -342,7 +352,8 @@ public IntrospectionValidatorKey(String validIssuerUri, int readTimeout, boolean enableMetrics, int retries, - long retryPauseMillis) { + long retryPauseMillis, + boolean includeAcceptHeader) { super(validIssuerUri, audience, @@ -359,7 +370,8 @@ public IntrospectionValidatorKey(String validIssuerUri, hasHostnameVerifier, connectTimeout, readTimeout, - enableMetrics); + enableMetrics, + includeAcceptHeader); this.introspectionEndpoint = introspectionEndpoint; this.userInfoEndpoint = userInfoEndpoint; this.validTokenType = validTokenType; diff --git a/oauth-common/src/main/java/io/strimzi/kafka/oauth/validator/JWTSignatureValidator.java b/oauth-common/src/main/java/io/strimzi/kafka/oauth/validator/JWTSignatureValidator.java index 248e48bc..402fa0ba 100644 --- a/oauth-common/src/main/java/io/strimzi/kafka/oauth/validator/JWTSignatureValidator.java +++ b/oauth-common/src/main/java/io/strimzi/kafka/oauth/validator/JWTSignatureValidator.java @@ -116,7 +116,7 @@ public class JWTSignatureValidator implements TokenValidator { * @param readTimeoutSeconds The maximum time to wait for response from authorization server after connection has been established and request sent (in seconds) * @param enableMetrics The switch that enables metrics collection * @param failFast Should exception be thrown during initialisation if unable to retrieve JWKS keys - * @param includeAcceptHeader Should we skip sending the Accept header when making outbound http requests + * @param includeAcceptHeader Should we send the Accept header when making outbound http requests */ @SuppressWarnings("checkstyle:ParameterNumber") public JWTSignatureValidator(String validatorId, diff --git a/oauth-common/src/main/java/io/strimzi/kafka/oauth/validator/OAuthIntrospectionValidator.java b/oauth-common/src/main/java/io/strimzi/kafka/oauth/validator/OAuthIntrospectionValidator.java index 03ade42e..f09770bc 100644 --- a/oauth-common/src/main/java/io/strimzi/kafka/oauth/validator/OAuthIntrospectionValidator.java +++ b/oauth-common/src/main/java/io/strimzi/kafka/oauth/validator/OAuthIntrospectionValidator.java @@ -102,7 +102,7 @@ public class OAuthIntrospectionValidator implements TokenValidator { * @param enableMetrics The switch that enables metrics collection * @param retries Maximum number of retries if request to the authorization server fails (0 means no retries) * @param retryPauseMillis Time to pause before retrying the request to the authorization server - * @param includeAcceptHeader Should we skip sending the Accept header when making outbound http requests + * @param includeAcceptHeader Should we send the Accept header when making outbound http requests */ @SuppressWarnings("checkstyle:ParameterNumber") public OAuthIntrospectionValidator(String id, diff --git a/oauth-common/src/test/java/io/strimzi/kafka/oauth/validator/ConfigIdHashTest.java b/oauth-common/src/test/java/io/strimzi/kafka/oauth/validator/ConfigIdHashTest.java index df841daa..5de0880a 100644 --- a/oauth-common/src/test/java/io/strimzi/kafka/oauth/validator/ConfigIdHashTest.java +++ b/oauth-common/src/test/java/io/strimzi/kafka/oauth/validator/ConfigIdHashTest.java @@ -16,14 +16,14 @@ public void testValidatorKey() { ValidatorKey vkey = getKey(null, null); ValidatorKey vkey2 = getKey(null, null); - Assert.assertEquals("Config id hash mismatch", "13092992", vkey.getConfigIdHash()); + Assert.assertEquals("Config id hash mismatch", "1ed03b31", vkey.getConfigIdHash()); Assert.assertEquals("Config id hash should be the same", vkey.getConfigIdHash(), vkey2.getConfigIdHash()); ValidatorKey key3 = getKey("group", null); ValidatorKey key4 = getKey(null, "group"); - Assert.assertEquals("Config id hash mismatch", "9639c91e", key3.getConfigIdHash()); - Assert.assertEquals("Config id hash mismatch", "b46a6291", key4.getConfigIdHash()); + Assert.assertEquals("Config id hash mismatch", "1afa0b66", key3.getConfigIdHash()); + Assert.assertEquals("Config id hash mismatch", "0d8122fb", key4.getConfigIdHash()); } ValidatorKey getKey(String groupQuery, String groupDelimiter) { @@ -50,6 +50,7 @@ ValidatorKey getKey(String groupQuery, String groupDelimiter) { 60, true, 0, - 0); + 0, + false); } } diff --git a/oauth-keycloak-authorizer/src/main/java/io/strimzi/kafka/oauth/server/authorizer/AuthzConfig.java b/oauth-keycloak-authorizer/src/main/java/io/strimzi/kafka/oauth/server/authorizer/AuthzConfig.java index 49ab1ac7..b77c319b 100644 --- a/oauth-keycloak-authorizer/src/main/java/io/strimzi/kafka/oauth/server/authorizer/AuthzConfig.java +++ b/oauth-keycloak-authorizer/src/main/java/io/strimzi/kafka/oauth/server/authorizer/AuthzConfig.java @@ -112,7 +112,7 @@ public class AuthzConfig extends Config { /** * Disable sending the Accept header to the upstream server. */ - public static final String STRIMZI_OAUTH_INCLUDE_ACCEPT_HEADER = "strimzi.oauth.include.accept.header"; + public static final String STRIMZI_AUTHORIZATION_INCLUDE_ACCEPT_HEADER = "strimzi.authorization.include.accept.header"; /** diff --git a/oauth-keycloak-authorizer/src/main/java/io/strimzi/kafka/oauth/server/authorizer/Configuration.java b/oauth-keycloak-authorizer/src/main/java/io/strimzi/kafka/oauth/server/authorizer/Configuration.java index 476af7cc..5cf98a0e 100644 --- a/oauth-keycloak-authorizer/src/main/java/io/strimzi/kafka/oauth/server/authorizer/Configuration.java +++ b/oauth-keycloak-authorizer/src/main/java/io/strimzi/kafka/oauth/server/authorizer/Configuration.java @@ -53,7 +53,7 @@ import static io.strimzi.kafka.oauth.server.authorizer.AuthzConfig.STRIMZI_AUTHORIZATION_SSL_TRUSTSTORE_PASSWORD; import static io.strimzi.kafka.oauth.server.authorizer.AuthzConfig.STRIMZI_AUTHORIZATION_SSL_TRUSTSTORE_TYPE; import static io.strimzi.kafka.oauth.server.authorizer.AuthzConfig.STRIMZI_AUTHORIZATION_TOKEN_ENDPOINT_URI; -import static io.strimzi.kafka.oauth.server.authorizer.AuthzConfig.STRIMZI_OAUTH_INCLUDE_ACCEPT_HEADER; +import static io.strimzi.kafka.oauth.server.authorizer.AuthzConfig.STRIMZI_AUTHORIZATION_INCLUDE_ACCEPT_HEADER; /** * The classes used to parse and store Authorizer configuration. @@ -153,7 +153,7 @@ public class Configuration { reuseGrants = authzConfig.getValueAsBoolean(STRIMZI_AUTHORIZATION_REUSE_GRANTS, true); - includeAcceptHeader = ConfigUtil.getDefaultBooleanConfigWithFallbackLookup(authzConfig, STRIMZI_OAUTH_INCLUDE_ACCEPT_HEADER, Config.OAUTH_INCLUDE_ACCEPT_HEADER, true); + includeAcceptHeader = ConfigUtil.getDefaultBooleanConfigWithFallbackLookup(authzConfig, STRIMZI_AUTHORIZATION_INCLUDE_ACCEPT_HEADER, Config.OAUTH_INCLUDE_ACCEPT_HEADER, true); configureHttpRetries(authzConfig); configureMetrics(authzConfig); @@ -309,7 +309,7 @@ static AuthzConfig convertToCommonConfig(Map configs) { OAUTH_READ_TIMEOUT_SECONDS, STRIMZI_AUTHORIZATION_ENABLE_METRICS, OAUTH_ENABLE_METRICS, - STRIMZI_OAUTH_INCLUDE_ACCEPT_HEADER, + STRIMZI_AUTHORIZATION_INCLUDE_ACCEPT_HEADER, Config.OAUTH_INCLUDE_ACCEPT_HEADER }; @@ -425,7 +425,7 @@ int getReadTimeoutSeconds() { return configMap; } - boolean includeAcceptHeader() { + boolean getIncludeAcceptHeader() { return includeAcceptHeader; } diff --git a/oauth-keycloak-authorizer/src/main/java/io/strimzi/kafka/oauth/server/authorizer/KeycloakRBACAuthorizer.java b/oauth-keycloak-authorizer/src/main/java/io/strimzi/kafka/oauth/server/authorizer/KeycloakRBACAuthorizer.java index 93ae45d1..4311ea98 100644 --- a/oauth-keycloak-authorizer/src/main/java/io/strimzi/kafka/oauth/server/authorizer/KeycloakRBACAuthorizer.java +++ b/oauth-keycloak-authorizer/src/main/java/io/strimzi/kafka/oauth/server/authorizer/KeycloakRBACAuthorizer.java @@ -234,6 +234,7 @@ public void configure(Map configs) { + "\n readTimeoutSeconds: " + configuration.getReadTimeoutSeconds() + "\n enableMetrics: " + configuration.isEnableMetrics() + "\n gcPeriodSeconds: " + configuration.getGcPeriodSeconds() + + "\n includeAcceptHeader: " + configuration.getIncludeAcceptHeader() ); } } @@ -550,7 +551,7 @@ private JsonNode fetchAuthorizationGrantsOnce(String token) { try { response = post(configuration.getTokenEndpointUrl(), socketFactory, hostnameVerifier, authorization, - "application/x-www-form-urlencoded", body.toString(), JsonNode.class, configuration.getConnectTimeoutSeconds(), configuration.getReadTimeoutSeconds(), configuration.includeAcceptHeader()); + "application/x-www-form-urlencoded", body.toString(), JsonNode.class, configuration.getConnectTimeoutSeconds(), configuration.getReadTimeoutSeconds(), configuration.getIncludeAcceptHeader()); addGrantsHttpMetricSuccessTime(startTime); } catch (HttpException e) { addGrantsHttpMetricErrorTime(e, startTime); diff --git a/oauth-server/src/main/java/io/strimzi/kafka/oauth/server/JaasServerOauthValidatorCallbackHandler.java b/oauth-server/src/main/java/io/strimzi/kafka/oauth/server/JaasServerOauthValidatorCallbackHandler.java index 0a42469c..6aabf63d 100644 --- a/oauth-server/src/main/java/io/strimzi/kafka/oauth/server/JaasServerOauthValidatorCallbackHandler.java +++ b/oauth-server/src/main/java/io/strimzi/kafka/oauth/server/JaasServerOauthValidatorCallbackHandler.java @@ -165,6 +165,7 @@ * For example, when configuring fast local token validation the JWKS endpoint is contacted during startup to retrieve the signing keys. * If that fails, the Kafka broker shuts down with an error. By setting this to false the Kafka broker will not shutdown, and the validator will keep trying to fetch the JWKS keys. Default value is true. *
  • oauth.enable.metrics Enable the OAuth metrics. Default value is false.
  • + *
  • oauth.include.accept.header Configure whether 'Accept: application/json' header is included in the requests to the authorization server. Default value is true.
  • * *

    * TLS sasl.jaas.config configuration for TLS connectivity with the authorization server: @@ -297,14 +298,14 @@ public void delegatedConfigure(Map configs, String saslMechanism, Lis String effectiveConfigId = setupJWKSValidator(configId, jwksUri, validIssuerUri, checkTokenType, usernameClaim, fallbackUsernameClaim, fallbackUsernamePrefix, groupQuery, groupDelimiter, audience, customClaimCheck, - sslTruststore, sslPassword, sslType, sslRnd); + sslTruststore, sslPassword, sslType, sslRnd, includeAcceptHeader); URI jwksEndpointUri = config.getValueAsURI(ServerConfig.OAUTH_JWKS_ENDPOINT_URI); validationSensorKeyProducer = new JwksValidationSensorKeyProducer(effectiveConfigId, saslMechanism, jwksEndpointUri); } else { String effectiveConfigId = setupIntrospectionValidator(configId, validIssuerUri, usernameClaim, fallbackUsernameClaim, fallbackUsernamePrefix, groupQuery, groupDelimiter, clientId, clientSecret, audience, customClaimCheck, - sslTruststore, sslPassword, sslType, sslRnd); + sslTruststore, sslPassword, sslType, sslRnd, includeAcceptHeader); URI introspectionUri = config.getValueAsURI(ServerConfig.OAUTH_INTROSPECTION_ENDPOINT_URI); validationSensorKeyProducer = new IntrospectValidationSensorKeyProducer(effectiveConfigId, saslMechanism, introspectionUri); @@ -325,12 +326,11 @@ private void configureMetrics(Map configs) { @SuppressWarnings("checkstyle:ParameterNumber") private String setupIntrospectionValidator(String configId, String validIssuerUri, String usernameClaim, String fallbackUsernameClaim, String fallbackUsernamePrefix, String groupQuery, String groupDelimiter, String clientId, String clientSecret, String audience, String customClaimCheck, - String sslTruststore, String sslPassword, String sslType, String sslRnd) { + String sslTruststore, String sslPassword, String sslType, String sslRnd, boolean includeAcceptHeader) { String introspectionEndpoint = config.getValue(ServerConfig.OAUTH_INTROSPECTION_ENDPOINT_URI); String userInfoEndpoint = config.getValue(ServerConfig.OAUTH_USERINFO_ENDPOINT_URI); String validTokenType = config.getValue(ServerConfig.OAUTH_VALID_TOKEN_TYPE); - boolean includeAcceptHeader = config.getValueAsBoolean(Config.OAUTH_INCLUDE_ACCEPT_HEADER, true); ValidatorKey vkey = new ValidatorKey.IntrospectionValidatorKey( validIssuerUri, @@ -355,7 +355,8 @@ private String setupIntrospectionValidator(String configId, String validIssuerUr readTimeout, enableMetrics, retries, - retryPauseMillis); + retryPauseMillis, + includeAcceptHeader); String effectiveConfigId = configId != null ? configId : vkey.getConfigIdHash(); @@ -391,14 +392,13 @@ private String setupIntrospectionValidator(String configId, String validIssuerUr private String setupJWKSValidator(String configId, String jwksUri, String validIssuerUri, boolean checkTokenType, String usernameClaim, String fallbackUsernameClaim, String fallbackUsernamePrefix, String groupQuery, String groupDelimiter, String audience, String customClaimCheck, - String sslTruststore, String sslPassword, String sslType, String sslRnd) { + String sslTruststore, String sslPassword, String sslType, String sslRnd, boolean includeAcceptHeader) { int jwksRefreshSeconds = config.getValueAsInt(ServerConfig.OAUTH_JWKS_REFRESH_SECONDS, 300); int jwksExpirySeconds = config.getValueAsInt(ServerConfig.OAUTH_JWKS_EXPIRY_SECONDS, 360); int jwksMinPauseSeconds = config.getValueAsInt(ServerConfig.OAUTH_JWKS_REFRESH_MIN_PAUSE_SECONDS, 1); boolean failFast = config.getValueAsBoolean(ServerConfig.OAUTH_FAIL_FAST, true); boolean jwksIgnoreKeyUse = config.getValueAsBoolean(ServerConfig.OAUTH_JWKS_IGNORE_KEY_USE, false); - boolean includeAcceptHeader = config.getValueAsBoolean(Config.OAUTH_INCLUDE_ACCEPT_HEADER, true); ValidatorKey vkey = new ValidatorKey.JwtValidatorKey( validIssuerUri, @@ -423,7 +423,8 @@ private String setupJWKSValidator(String configId, String jwksUri, String validI connectTimeout, readTimeout, enableMetrics, - failFast + failFast, + includeAcceptHeader ); String effectiveConfigId = configId != null ? configId : vkey.getConfigIdHash(); diff --git a/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/MockOAuthTests.java b/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/MockOAuthTests.java index 591bd948..487b559a 100644 --- a/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/MockOAuthTests.java +++ b/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/MockOAuthTests.java @@ -6,6 +6,7 @@ import io.strimzi.testsuite.oauth.common.TestContainersLogCollector; import io.strimzi.testsuite.oauth.common.TestContainersWatcher; +import io.strimzi.testsuite.oauth.mockoauth.JaasServerConfigTest; import io.strimzi.testsuite.oauth.mockoauth.metrics.MetricsTest; import io.strimzi.testsuite.oauth.mockoauth.ConnectTimeoutTests; import io.strimzi.testsuite.oauth.mockoauth.JWKSKeyUseTest; @@ -68,6 +69,9 @@ public void runTests() throws Exception { logStart("JaasClientConfigTest :: Client Configuration Tests"); new JaasClientConfigTest().doTest(); + logStart("JaasServerConfigTest :: Server Configuration Tests"); + new JaasServerConfigTest().doTest(); + logStart("PasswordAuthTest :: Password Grant Tests"); new PasswordAuthTest().doTest(); diff --git a/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/mockoauth/Common.java b/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/mockoauth/Common.java index 892c09d1..25b63076 100644 --- a/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/mockoauth/Common.java +++ b/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/mockoauth/Common.java @@ -13,6 +13,7 @@ import io.strimzi.kafka.oauth.common.SSLUtil; import io.strimzi.kafka.oauth.common.TimeUtil; import io.strimzi.kafka.oauth.common.TokenInfo; +import io.strimzi.testsuite.oauth.common.TestUtil; import io.strimzi.testsuite.oauth.mockoauth.metrics.Metrics; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; @@ -26,6 +27,7 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.util.LinkedHashMap; +import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Objects; @@ -219,6 +221,17 @@ public static String getProjectRoot() { return cwd; } + static void checkLog(LogLineReader logReader, String... args) throws IOException { + if (args.length % 2 != 0) { + throw new IllegalArgumentException("Args should be in pairs but there is an odd number of them."); + } + List lines = logReader.readNext(); + + for (int i = 0; i < args.length; i += 2) { + Assert.assertEquals(args[i] + " =~ " + args[i + 1], 1, TestUtil.countLogForRegex(lines, args[i] + ":.*" + args[i + 1])); + } + } + static class MockBearerTokenWithPayload implements BearerTokenWithPayload { private final TokenInfo ti; diff --git a/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/mockoauth/JaasClientConfigTest.java b/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/mockoauth/JaasClientConfigTest.java index ccedba0c..71d67e77 100644 --- a/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/mockoauth/JaasClientConfigTest.java +++ b/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/mockoauth/JaasClientConfigTest.java @@ -5,14 +5,19 @@ package io.strimzi.testsuite.oauth.mockoauth; import io.strimzi.kafka.oauth.client.ClientConfig; +import io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler; import io.strimzi.kafka.oauth.common.ConfigException; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.KafkaException; +import org.jetbrains.annotations.NotNull; import org.junit.Assert; +import javax.security.auth.login.AppConfigurationEntry; import javax.security.auth.login.LoginException; +import java.io.IOException; +import java.util.Arrays; import java.util.HashMap; import java.util.Map; import java.util.Properties; @@ -37,6 +42,88 @@ public void doTest() throws Exception { testNoClientId(); + testMissingClientSecret(); + + testMissingPassword(); + + testMissingTrustStore(); + + testAllConfigOptions(); + } + + private void testAllConfigOptions() throws IOException { + + JaasClientOauthLoginCallbackHandler loginHandler = new JaasClientOauthLoginCallbackHandler(); + + Map attrs = new HashMap<>(); + attrs.put(ClientConfig.OAUTH_CONFIG_ID, "config-id"); + attrs.put(ClientConfig.OAUTH_REFRESH_TOKEN, "refresh-token"); + attrs.put(ClientConfig.OAUTH_TOKEN_ENDPOINT_URI, "https://sso/token"); + attrs.put(ClientConfig.OAUTH_CLIENT_ID, "client-id"); + attrs.put(ClientConfig.OAUTH_CLIENT_SECRET, "client-secret"); + attrs.put(ClientConfig.OAUTH_PASSWORD_GRANT_USERNAME, "username"); + attrs.put(ClientConfig.OAUTH_PASSWORD_GRANT_PASSWORD, "password"); + attrs.put(ClientConfig.OAUTH_USERNAME_CLAIM, "username-claim"); + attrs.put(ClientConfig.OAUTH_FALLBACK_USERNAME_CLAIM, "fallback-username-claim"); + attrs.put(ClientConfig.OAUTH_FALLBACK_USERNAME_PREFIX, "username-prefix"); + attrs.put(ClientConfig.OAUTH_SCOPE, "scope"); + attrs.put(ClientConfig.OAUTH_AUDIENCE, "audience"); + attrs.put(ClientConfig.OAUTH_ACCESS_TOKEN_IS_JWT, "false"); + attrs.put(ClientConfig.OAUTH_MAX_TOKEN_EXPIRY_SECONDS, "300"); + attrs.put(ClientConfig.OAUTH_CONNECT_TIMEOUT_SECONDS, "20"); + attrs.put(ClientConfig.OAUTH_READ_TIMEOUT_SECONDS, "25"); + attrs.put(ClientConfig.OAUTH_HTTP_RETRIES, "3"); + attrs.put(ClientConfig.OAUTH_HTTP_RETRY_PAUSE_MILLIS, "500"); + attrs.put(ClientConfig.OAUTH_ENABLE_METRICS, "true"); + attrs.put(ClientConfig.OAUTH_INCLUDE_ACCEPT_HEADER, "false"); + + + AppConfigurationEntry jaasConfig = new AppConfigurationEntry("org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule", AppConfigurationEntry.LoginModuleControlFlag.REQUIRED, attrs); + + + Map clientProps = new HashMap<>(); + clientProps.put("security.protocol", "SASL_PLAINTEXT"); + clientProps.put("sasl.mechanism", "OAUTHBEARER"); + + LogLineReader logReader = new LogLineReader(Common.LOG_PATH); + logReader.readNext(); + + loginHandler.configure(clientProps, "OAUTHBEARER", Arrays.asList(jaasConfig)); + + Common.checkLog(logReader, "configId", "config-id", + "refreshToken", "r\\*\\*", + "tokenEndpointUri", "https://sso/token", + "clientId", "client-id", + "clientSecret", "c\\*\\*", + "username", "username", + "password", "p\\*\\*", + "scope", "scope", + "audience", "audience", + "isJwt", "false", + "maxTokenExpirySeconds", "300", + "connectTimeout", "20", + "readTimeout", "25", + "retries", "3", + "retryPauseMillis", "500", + "enableMetrics", "true", + "includeAcceptHeader", "false"); + + // TODO: add usernameClaim, fallbackUserNameClaim and fallbackUserNamePrefix check when fixed + //"principalExtractor", "PrincipalExtractor {usernameClaim: io.strimzi.kafka.oauth.common.PrincipalExtractor$Extractor@28486680, fallbackUsernameClaim: io.strimzi.kafka.oauth.common.PrincipalExtractor$Extractor@4d7e7435, fallbackUsernamePrefix: null}", + + + // we could not check tokenEndpointUri and token in the same run + + attrs.put(ClientConfig.OAUTH_ACCESS_TOKEN, "access-token"); + jaasConfig = new AppConfigurationEntry("org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule", AppConfigurationEntry.LoginModuleControlFlag.REQUIRED, attrs); + loginHandler = new JaasClientOauthLoginCallbackHandler(); + loginHandler.configure(clientProps, "OAUTHBEARER", Arrays.asList(jaasConfig)); + + Common.checkLog(logReader, "token", "a\\*\\*"); + } + + @NotNull + private void testMissingClientSecret() throws Exception { Map oauthConfig = new HashMap<>(); oauthConfig.put(ClientConfig.OAUTH_TOKEN_ENDPOINT_URI, TOKEN_ENDPOINT_URI); oauthConfig.put(ClientConfig.OAUTH_CLIENT_ID, KAFKA_CLI); @@ -47,7 +134,12 @@ public void doTest() throws Exception { } catch (KafkaException e) { assertConfigException(e, "client credentials"); } + } + private void testMissingPassword() throws Exception { + Map oauthConfig = new HashMap<>(); + oauthConfig.put(ClientConfig.OAUTH_TOKEN_ENDPOINT_URI, TOKEN_ENDPOINT_URI); + oauthConfig.put(ClientConfig.OAUTH_CLIENT_ID, KAFKA_CLI); oauthConfig.put(ClientConfig.OAUTH_PASSWORD_GRANT_USERNAME, KAFKA_USER); try { initJaas(oauthConfig); @@ -56,8 +148,13 @@ public void doTest() throws Exception { } catch (KafkaException e) { assertConfigException(e, "no password specified"); } + } - // Fix it, now it should try to authenticate with mockoauth server + private void testMissingTrustStore() throws Exception { + Map oauthConfig = new HashMap<>(); + oauthConfig.put(ClientConfig.OAUTH_TOKEN_ENDPOINT_URI, TOKEN_ENDPOINT_URI); + oauthConfig.put(ClientConfig.OAUTH_CLIENT_ID, KAFKA_CLI); + oauthConfig.put(ClientConfig.OAUTH_PASSWORD_GRANT_USERNAME, KAFKA_USER); oauthConfig.put(ClientConfig.OAUTH_PASSWORD_GRANT_PASSWORD, KAFKA_USER_PASSWORD); try { initJaas(oauthConfig); diff --git a/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/mockoauth/JaasServerConfigTest.java b/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/mockoauth/JaasServerConfigTest.java new file mode 100644 index 00000000..2276f765 --- /dev/null +++ b/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/mockoauth/JaasServerConfigTest.java @@ -0,0 +1,148 @@ +/* + * Copyright 2017-2023, Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.testsuite.oauth.mockoauth; + +import io.strimzi.kafka.oauth.metrics.GlobalConfig; +import io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler; +import io.strimzi.kafka.oauth.server.ServerConfig; + +import javax.security.auth.login.AppConfigurationEntry; +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +public class JaasServerConfigTest { + + public void doTest() throws Exception { + testAllConfigOptions(); + } + + private void testAllConfigOptions() throws IOException { + + // Fast local JWT check + JaasServerOauthValidatorCallbackHandler handler = new JaasServerOauthValidatorCallbackHandler(); + + Map attrs = new HashMap<>(); + + // Fast local JWT check + attrs.put(ServerConfig.OAUTH_CONFIG_ID, "config-id"); + attrs.put(ServerConfig.OAUTH_JWKS_ENDPOINT_URI, "https://sso/jwks"); + attrs.put(ServerConfig.OAUTH_FAIL_FAST, "false"); + attrs.put(ServerConfig.OAUTH_USERNAME_CLAIM, "username-claim"); + attrs.put(ServerConfig.OAUTH_GROUPS_CLAIM, "$.groups"); + attrs.put(ServerConfig.OAUTH_GROUPS_CLAIM_DELIMITER, ","); + attrs.put(ServerConfig.OAUTH_CLIENT_ID, "client-id"); + attrs.put(ServerConfig.OAUTH_CHECK_AUDIENCE, "true"); + attrs.put(ServerConfig.OAUTH_CUSTOM_CLAIM_CHECK, "@.aud anyof ['kafka', 'something']"); + attrs.put(ServerConfig.OAUTH_JWKS_REFRESH_SECONDS, "10"); + attrs.put(ServerConfig.OAUTH_JWKS_REFRESH_MIN_PAUSE_SECONDS, "2"); + attrs.put(ServerConfig.OAUTH_JWKS_EXPIRY_SECONDS, "900"); + attrs.put(ServerConfig.OAUTH_JWKS_IGNORE_KEY_USE, "true"); + attrs.put(ServerConfig.OAUTH_VALID_ISSUER_URI, "https://sso"); + attrs.put(ServerConfig.OAUTH_CONNECT_TIMEOUT_SECONDS, "10"); + attrs.put(ServerConfig.OAUTH_READ_TIMEOUT_SECONDS, "10"); + attrs.put(ServerConfig.OAUTH_CHECK_ACCESS_TOKEN_TYPE, "false"); + attrs.put(ServerConfig.OAUTH_ENABLE_METRICS, "true"); + attrs.put(GlobalConfig.STRIMZI_OAUTH_METRIC_REPORTERS, "io.strimzi.testsuite.oauth.common.metrics.TestMetricsReporter"); + attrs.put(ServerConfig.OAUTH_SSL_TRUSTSTORE_LOCATION, "../docker/target/kafka/certs/ca-truststore.p12"); + attrs.put(ServerConfig.OAUTH_SSL_TRUSTSTORE_PASSWORD, "changeit"); + attrs.put(ServerConfig.OAUTH_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM, ""); + attrs.put(ServerConfig.OAUTH_SSL_TRUSTSTORE_TYPE, "pkcs12"); + attrs.put(ServerConfig.OAUTH_INCLUDE_ACCEPT_HEADER, "false"); + + AppConfigurationEntry jaasConfig = new AppConfigurationEntry("org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule", AppConfigurationEntry.LoginModuleControlFlag.REQUIRED, attrs); + + Map serverProps = new HashMap<>(); + serverProps.put("security.protocol", "SASL_PLAINTEXT"); + serverProps.put("sasl.mechanism", "OAUTHBEARER"); + + LogLineReader logReader = new LogLineReader(Common.LOG_PATH); + logReader.readNext(); + + handler.configure(serverProps, "OAUTHBEARER", Arrays.asList(jaasConfig)); + + Common.checkLog(logReader, "JWTSignatureValidator", "", + "validatorId", "config-id", + "keysEndpointUri", "https://sso/jwks", + "groupsClaimQuery", "\\$\\.groups", + "groupsClaimDelimiter", ",", + "validIssuerUri", "https://sso", + "hostnameVerifier", "SSLUtil", + "sslSocketFactory", "SSLSocketFactoryImpl", + "certsRefreshSeconds", "10", + "certsRefreshMinPauseSeconds", "2", + "certsExpirySeconds", "900", + "certsIgnoreKeyUse", "true", + "checkAccessTokenType", "false", + "audience", "client-id", + "customClaimCheck", "@\\.aud anyof \\['kafka', 'something'\\]", + "connectTimeoutSeconds", "10", + "readTimeoutSeconds", "10", + "enableMetrics", "true", + "failFast", "false", + "includeAcceptHeader", "false" + ); + + // principalExtractor: PrincipalExtractor {usernameClaim: io.strimzi.kafka.oauth.common.PrincipalExtractor$Extractor@1e5f4170, fallbackUsernameClaim: null, fallbackUsernamePrefix: null} + + + // Introspect endpoint + attrs = new HashMap<>(); + attrs.put(ServerConfig.OAUTH_CONFIG_ID, "config-id2"); + attrs.put(ServerConfig.OAUTH_INTROSPECTION_ENDPOINT_URI, "https://sso/introspect"); + attrs.put(ServerConfig.OAUTH_USERINFO_ENDPOINT_URI, "https://sso/userinfo"); + attrs.put(ServerConfig.OAUTH_USERNAME_CLAIM, "username-claim"); + attrs.put(ServerConfig.OAUTH_FALLBACK_USERNAME_CLAIM, "fallback-username-claim"); + attrs.put(ServerConfig.OAUTH_FALLBACK_USERNAME_PREFIX, "fallback-username-prefix"); + attrs.put(ServerConfig.OAUTH_GROUPS_CLAIM, "$.groups"); + attrs.put(ServerConfig.OAUTH_GROUPS_CLAIM_DELIMITER, ","); + attrs.put(ServerConfig.OAUTH_CHECK_AUDIENCE, "true"); + attrs.put(ServerConfig.OAUTH_CUSTOM_CLAIM_CHECK, "@.aud anyof ['kafka', 'something']"); + attrs.put(ServerConfig.OAUTH_CLIENT_ID, "client-id"); + attrs.put(ServerConfig.OAUTH_CLIENT_SECRET, "client-secret"); + attrs.put(ServerConfig.OAUTH_VALID_ISSUER_URI, "https://sso"); + attrs.put(ServerConfig.OAUTH_VALID_TOKEN_TYPE, "jwt"); + attrs.put(ServerConfig.OAUTH_HTTP_RETRIES, "3"); + attrs.put(ServerConfig.OAUTH_HTTP_RETRY_PAUSE_MILLIS, "500"); + attrs.put(ServerConfig.OAUTH_CONNECT_TIMEOUT_SECONDS, "10"); + attrs.put(ServerConfig.OAUTH_READ_TIMEOUT_SECONDS, "10"); + attrs.put(ServerConfig.OAUTH_ENABLE_METRICS, "true"); + attrs.put(GlobalConfig.STRIMZI_OAUTH_METRIC_REPORTERS, "io.strimzi.testsuite.oauth.common.metrics.TestMetricsReporter"); + attrs.put(ServerConfig.OAUTH_SSL_TRUSTSTORE_LOCATION, "../docker/target/kafka/certs/ca-truststore.p12"); + attrs.put(ServerConfig.OAUTH_SSL_TRUSTSTORE_PASSWORD, "changeit"); + attrs.put(ServerConfig.OAUTH_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM, ""); + attrs.put(ServerConfig.OAUTH_SSL_TRUSTSTORE_TYPE, "pkcs12"); + attrs.put(ServerConfig.OAUTH_INCLUDE_ACCEPT_HEADER, "false"); + + jaasConfig = new AppConfigurationEntry("org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule", AppConfigurationEntry.LoginModuleControlFlag.REQUIRED, attrs); + handler = new JaasServerOauthValidatorCallbackHandler(); + handler.configure(serverProps, "OAUTHBEARER", Arrays.asList(jaasConfig)); + + Common.checkLog(logReader, "OAuthIntrospectionValidator", "", + "id", "config-id2", + "introspectionEndpointUri", "https://sso/introspect", + "groupsClaimQuery", "\\$\\.groups", + "groupsClaimDelimiter", ",", + "validIssuerUri", "https://sso", + "userInfoUri", "https://sso/userinfo", + "hostnameVerifier", "SSLUtil", + "sslSocketFactory", "SSLSocketFactoryImpl", + "validTokenType", "jwt", + "clientId", "client-id", + "clientSecret", "c\\*\\*", + "audience", "client-id", + "customClaimCheck", "@\\.aud anyof \\['kafka', 'something'\\]", + "connectTimeoutSeconds", "10", + "readTimeoutSeconds", "10", + "enableMetrics", "true", + "retries", "3", + "retryPauseMillis", "500", + "includeAcceptHeader", "false" + ); + + //principalExtractor: PrincipalExtractor {usernameClaim: io.strimzi.kafka.oauth.common.PrincipalExtractor$Extractor@3bde62ff, fallbackUsernameClaim: io.strimzi.kafka.oauth.common.PrincipalExtractor$Extractor@523424b5, fallbackUsernamePrefix: fallback-username-prefix} + } +} diff --git a/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/mockoauth/KeycloakAuthorizerTest.java b/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/mockoauth/KeycloakAuthorizerTest.java index 31d0ee52..10de052a 100644 --- a/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/mockoauth/KeycloakAuthorizerTest.java +++ b/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/mockoauth/KeycloakAuthorizerTest.java @@ -5,6 +5,7 @@ package io.strimzi.testsuite.oauth.mockoauth; import io.strimzi.kafka.oauth.common.BearerTokenWithPayload; +import io.strimzi.kafka.oauth.common.Config; import io.strimzi.kafka.oauth.common.ConfigException; import io.strimzi.kafka.oauth.common.OAuthAuthenticator; import io.strimzi.kafka.oauth.common.PrincipalExtractor; @@ -59,6 +60,7 @@ import static io.strimzi.testsuite.oauth.common.TestUtil.checkLogForRegex; import static io.strimzi.testsuite.oauth.mockoauth.Common.addGrantsForToken; import static io.strimzi.testsuite.oauth.mockoauth.Common.changeAuthServerMode; +import static io.strimzi.testsuite.oauth.mockoauth.Common.checkLog; import static io.strimzi.testsuite.oauth.mockoauth.Common.createOAuthClient; import static io.strimzi.testsuite.oauth.mockoauth.Common.createOAuthUser; import static org.mockito.Mockito.mock; @@ -435,28 +437,28 @@ void doConfigTests() throws IOException { authorizer.configure(config); } - List lines = logReader.readNext(); - // Check the defaults - Assert.assertEquals("tokenEndpointUri: https://mockoauth:8090/grants", 1, TestUtil.countLogForRegex(lines, "tokenEndpointUri: https://mockoauth:8090/grants")); - Assert.assertEquals("clientId: kafka", 1, TestUtil.countLogForRegex(lines, "clientId: kafka")); - Assert.assertEquals("sslSocketFactory: null", 1, TestUtil.countLogForRegex(lines, "sslSocketFactory: null")); - Assert.assertEquals("hostnameVerifier: null", 1, TestUtil.countLogForRegex(lines, "hostnameVerifier: null")); - Assert.assertEquals("clusterName: kafka-cluster", 1, TestUtil.countLogForRegex(lines, "clusterName: kafka-cluster")); - Assert.assertEquals("delegateToKafkaACL: false", 1, TestUtil.countLogForRegex(lines, "delegateToKafkaACL: false")); - Assert.assertEquals("superUsers: []", 1, TestUtil.countLogForRegex(lines, "superUsers: \\[\\]")); - Assert.assertEquals("grantsRefreshPeriodSeconds: 60", 1, TestUtil.countLogForRegex(lines, "grantsRefreshPeriodSeconds: 60")); - Assert.assertEquals("grantsRefreshPoolSize: 5", 1, TestUtil.countLogForRegex(lines, "grantsRefreshPoolSize: 5")); - Assert.assertEquals("grantsMaxIdleTimeSeconds: 300", 1, TestUtil.countLogForRegex(lines, "grantsMaxIdleTimeSeconds: 300")); - Assert.assertEquals("httpRetries: 0", 1, TestUtil.countLogForRegex(lines, "httpRetries: 0")); - Assert.assertEquals("reuseGrants: true", 1, TestUtil.countLogForRegex(lines, "reuseGrants: true")); - Assert.assertEquals("connectTimeoutSeconds: 60", 1, TestUtil.countLogForRegex(lines, "connectTimeoutSeconds: 60")); - Assert.assertEquals("readTimeoutSeconds: 60", 1, TestUtil.countLogForRegex(lines, "readTimeoutSeconds: 60")); - Assert.assertEquals("enableMetrics: false", 1, TestUtil.countLogForRegex(lines, "enableMetrics: false")); - Assert.assertEquals("gcPeriodSeconds: 300", 1, TestUtil.countLogForRegex(lines, "gcPeriodSeconds: 300")); + checkLog(logReader, "tokenEndpointUri", "https://mockoauth:8090/grants", + "clientId", "kafka", + "sslSocketFactory", "null", + "hostnameVerifier", "null", + "clusterName", "kafka-cluster", + "delegateToKafkaACL", "false", + "superUsers", "\\[\\]", + "grantsRefreshPeriodSeconds", "60", + "grantsRefreshPoolSize", "5", + "grantsMaxIdleTimeSeconds", "300", + "httpRetries", "0", + "reuseGrants", "true", + "connectTimeoutSeconds", "60", + "readTimeoutSeconds", "60", + "enableMetrics", "false", + "gcPeriodSeconds", "300", + "includeAcceptHeader", "true" + ); - // Custom config + // Custom config config.put(AuthzConfig.STRIMZI_AUTHORIZATION_KAFKA_CLUSTER_NAME, "cluster1"); config.put("super.users", "User:admin;User:service-account-kafka"); config.put(AuthzConfig.STRIMZI_AUTHORIZATION_GRANTS_REFRESH_PERIOD_SECONDS, "180"); @@ -468,6 +470,7 @@ void doConfigTests() throws IOException { config.put(AuthzConfig.STRIMZI_AUTHORIZATION_READ_TIMEOUT_SECONDS, "15"); config.put(AuthzConfig.STRIMZI_AUTHORIZATION_ENABLE_METRICS, "true"); config.put(AuthzConfig.STRIMZI_AUTHORIZATION_GRANTS_GC_PERIOD_SECONDS, "60"); + config.put(AuthzConfig.STRIMZI_AUTHORIZATION_INCLUDE_ACCEPT_HEADER, "false"); try (KeycloakAuthorizer authorizer = new KeycloakAuthorizer()) { try { @@ -483,19 +486,35 @@ void doConfigTests() throws IOException { authorizer.configure(config); } - lines = logReader.readNext(); + checkLog(logReader, "clusterName", "cluster1", + "superUsers", "\\['User:admin', 'User:service-account-kafka'\\]", + "grantsRefreshPeriodSeconds", "180", + "grantsRefreshPoolSize", "3", + "grantsMaxIdleTimeSeconds", "30", + "httpRetries", "2", + "reuseGrants", "false", + "connectTimeoutSeconds", "15", + "readTimeoutSeconds", "15", + "enableMetrics", "true", + "gcPeriodSeconds", "60", + "includeAcceptHeader", "false" + ); + + + // test OAUTH_INCLUDE_ACCEPT_HEADER fallback + config.remove(AuthzConfig.STRIMZI_AUTHORIZATION_INCLUDE_ACCEPT_HEADER); + System.setProperty(Config.OAUTH_INCLUDE_ACCEPT_HEADER, "false"); - Assert.assertEquals("clusterName: cluster1", 1, TestUtil.countLogForRegex(lines, "clusterName: cluster1")); - Assert.assertEquals("superUsers: ['User:admin', 'User:service-account-kafka']", 1, TestUtil.countLogForRegex(lines, "superUsers: \\['User:admin', 'User:service-account-kafka'\\]")); - Assert.assertEquals("grantsRefreshPeriodSeconds: 180", 1, TestUtil.countLogForRegex(lines, "grantsRefreshPeriodSeconds: 180")); - Assert.assertEquals("grantsRefreshPoolSize: 3", 1, TestUtil.countLogForRegex(lines, "grantsRefreshPoolSize: 3")); - Assert.assertEquals("grantsMaxIdleTimeSeconds: 30", 1, TestUtil.countLogForRegex(lines, "grantsMaxIdleTimeSeconds: 30")); - Assert.assertEquals("httpRetries: 2", 1, TestUtil.countLogForRegex(lines, "httpRetries: 2")); - Assert.assertEquals("reuseGrants: false", 1, TestUtil.countLogForRegex(lines, "reuseGrants: false")); - Assert.assertEquals("connectTimeoutSeconds: 15", 1, TestUtil.countLogForRegex(lines, "connectTimeoutSeconds: 15")); - Assert.assertEquals("readTimeoutSeconds: 15", 1, TestUtil.countLogForRegex(lines, "readTimeoutSeconds: 15")); - Assert.assertEquals("enableMetrics: true", 1, TestUtil.countLogForRegex(lines, "enableMetrics: true")); - Assert.assertEquals("gcPeriodSeconds: 60", 1, TestUtil.countLogForRegex(lines, "gcPeriodSeconds: 60")); + TestAuthzUtil.clearKeycloakAuthorizerService(); + try (KeycloakAuthorizer authorizer = new KeycloakAuthorizer()) { + authorizer.configure(config); + } + + checkLog(logReader, "clusterName", "cluster1", + "includeAcceptHeader", "false" + ); + + System.clearProperty(Config.OAUTH_INCLUDE_ACCEPT_HEADER); // test gcPeriodSeconds set to 0 @@ -506,10 +525,9 @@ void doConfigTests() throws IOException { authorizer.configure(config); } - lines = logReader.readNext(); - - Assert.assertEquals("gcPeriodSeconds invalid value: 0", 1, TestUtil.countLogForRegex(lines, "'strimzi.authorization.grants.gc.period.seconds' set to invalid value: 0, using the default value: 300 seconds")); - Assert.assertEquals("gcPeriodSeconds: 300", 1, TestUtil.countLogForRegex(lines, "gcPeriodSeconds: 300")); + checkLog(logReader, + "'strimzi.authorization.grants.gc.period.seconds' set to invalid value", "0, using the default value: 300 seconds", + "gcPeriodSeconds", "300"); TestAuthzUtil.clearKeycloakAuthorizerService(); } @@ -881,26 +899,24 @@ private static Future> submitAuthorizationCall(Keycloa } private HashMap configureAuthorizer() { - return configureAuthorizer(CLIENT_SRV, CLIENT_SRV_SECRET, TRUSTSTORE_PATH, TRUSTSTORE_PASS); + return configureAuthorizer(CLIENT_SRV, TRUSTSTORE_PATH, TRUSTSTORE_PASS); } - static HashMap configureAuthorizer(String clientSrv, String clientSrvSecret, String trustStorePath, String trustsStorePass) { + static HashMap configureAuthorizer(String clientSrv, String trustStorePath, String trustsStorePass) { HashMap props = new HashMap<>(); - props.put("strimzi.authorization.ssl.truststore.location", trustStorePath); - props.put("strimzi.authorization.ssl.truststore.password", trustsStorePass); - props.put("strimzi.authorization.ssl.truststore.type", "pkcs12"); - - props.put("strimzi.authorization.enable.metrics", "true"); - props.put("strimzi.authorization.token.endpoint.uri", "https://mockoauth:8090/grants"); - props.put("strimzi.authorization.client.id", clientSrv); - props.put("strimzi.authorization.client.secret", clientSrvSecret); - props.put("strimzi.authorization.kafka.cluster.name", "my-cluster"); - props.put("strimzi.authorization.delegate.to.kafka.acl", "false"); - props.put("strimzi.authorization.read.timeout.seconds", "45"); - props.put("strimzi.authorization.connect.timeout.seconds", "10"); - props.put("strimzi.authorization.grants.refresh.pool.size", "2"); - props.put("strimzi.authorization.grants.refresh.period.seconds", "60"); - props.put("strimzi.authorization.http.retries", "1"); + props.put(AuthzConfig.STRIMZI_AUTHORIZATION_SSL_TRUSTSTORE_LOCATION, trustStorePath); + props.put(AuthzConfig.STRIMZI_AUTHORIZATION_SSL_TRUSTSTORE_PASSWORD, trustsStorePass); + props.put(AuthzConfig.STRIMZI_AUTHORIZATION_SSL_TRUSTSTORE_TYPE, "pkcs12"); + props.put(AuthzConfig.STRIMZI_AUTHORIZATION_ENABLE_METRICS, "true"); + props.put(AuthzConfig.STRIMZI_AUTHORIZATION_TOKEN_ENDPOINT_URI, "https://mockoauth:8090/grants"); + props.put(AuthzConfig.STRIMZI_AUTHORIZATION_CLIENT_ID, clientSrv); + props.put(AuthzConfig.STRIMZI_AUTHORIZATION_KAFKA_CLUSTER_NAME, "my-cluster"); + props.put(AuthzConfig.STRIMZI_AUTHORIZATION_DELEGATE_TO_KAFKA_ACL, "false"); + props.put(AuthzConfig.STRIMZI_AUTHORIZATION_READ_TIMEOUT_SECONDS, "45"); + props.put(AuthzConfig.STRIMZI_AUTHORIZATION_CONNECT_TIMEOUT_SECONDS, "10"); + props.put(AuthzConfig.STRIMZI_AUTHORIZATION_GRANTS_REFRESH_POOL_SIZE, "2"); + props.put(AuthzConfig.STRIMZI_AUTHORIZATION_GRANTS_REFRESH_PERIOD_SECONDS, "60"); + props.put(AuthzConfig.STRIMZI_AUTHORIZATION_HTTP_RETRIES, "1"); props.put("super.users", "User:admin;User:service-account-kafka"); props.put("principal.builder.class", "io.strimzi.kafka.oauth.server.OAuthKafkaPrincipalBuilder"); return props;