Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support disabling the Accept header when requesting Json Web Key Sets. #201

Merged
merged 9 commits into from
Aug 9, 2023
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1485,3 +1485,7 @@ The JWT tokens are signed by the authorization server when they are issued. The
The client may have obtained a new access token, but the Kafka broker has not yet refreshed the public keys from JWKS endpoint resulting in a mismatch. The Kafka Broker will automatically refresh JWT keys if it encounters an unknown `kid`, and the problem will self-correct in this case, you may just need to repeat your request a few times.

It can also happen the other way around. Your existing client may still use the refresh token or the access token issued by the previous authorization server instance while the Kafka broker has already refreshed the keys from JWKS endpoint - resulting in a mismatch between the private key used by authorization server to sign the token, and the published public keys (JWKS endpoint). Since the problem is on the client you may need to configure your client with a newly obtained refresh token, or access token. If you configure your client with clientId and secret, it should autocorrect by itself, you just need to restart it.

### HTTP 406: Not Acceptable errors.

For certain servers setting the `Accept` header on outbound requests to `application/json` can cause the identity provider to reject the request. If that is an issue, you can set `oauth.include.accept.header` to `false` and remove the `Accept` header from outbound requests made by the Kafka server or client.
18 changes: 17 additions & 1 deletion examples/docker/strimzi-kafka-image/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ The result is that the most recent Strimzi Kafka OAuth jars and their dependenci
Building
--------

Use `docker build` to build the image:
Run `mvn install` then, use `docker build` to build the image:

docker build -t strimzi/kafka:latest-kafka-3.3.2-oauth .

Expand Down Expand Up @@ -69,10 +69,14 @@ You need to retag the built image before so you can push it to Docker Registry:
Actually, Kubernetes Kind supports an even simpler option how to make an image available to Kubernetes:

kind load docker-image strimzi/kafka:latest-kafka-3.3.2-oauth

If you're using minikube, you'll need to run `minikube docker-env` before building the image.

Deploying
---------

## Via the Strimzi Repository

In order for the operator to use your Kafka image, you have to replace the Kafka image coordinates in `packaging/install/cluster-operator/060-Deployment-strimzi-cluster-operator.yaml` in your `strimzi-kafka-operator` project.

This image builds the kafka-3.3.2 replacement image, so we need to replace all occurrences where kafka-3.3.2 is referred to into the proper coordinates to our image:
Expand All @@ -88,3 +92,15 @@ It's best to check the `060-Deployment-strimzi-cluster-operator.yaml` file manua

You can now deploy Strimzi Kafka Operator following instructions in [HACKING.md](../../../HACKING.md)

## Via Helm

You can also run the operator via its Helm chart and set the `kafka.image.registry` property to your local registry. As an example, if you've built and tagged the image as `local.dev/strimzi/kafka:0.36.0-kafka-3.5.0 `. You can run it using the operator as:

helm repo add strimzi https://strimzi.io/charts/ --force-update
helm upgrade -i -n strimzi strimzi strimzi/strimzi-kafka-operator \
--version 0.36.0 \
--set watchNamespaces="{default}" \
--set generateNetworkPolicy=false \
--set kafka.image.registry="local.dev" \
--wait \
--create-namespace
1 change: 0 additions & 1 deletion examples/kubernetes/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ They assume Keycloak is used as an authorization server, with properly configure
A single node Kafka cluster with OAuth 2 authentication with OAuth metrics enabled.
See [README-metrics.md]() for how to setup this example.


### Deploying Keycloak and accessing the Keycloak Admin Console

Before deploying any of the Kafka cluster definitions, you need to deploy a Keycloak instance, and configure the realms with the necessary client definitions.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ public class JaasClientOauthLoginCallbackHandler implements AuthenticateCallback
private SensorKeyProducer tokenSensorKeyProducer;

private final ClientMetricsHandler authenticatorMetrics = new ClientMetricsHandler();
private boolean includeAcceptHeader;

@Override
public void configure(Map<String, ?> configs, String saslMechanism, List<AppConfigurationEntry> jaasConfigEntries) {
Expand Down Expand Up @@ -125,6 +126,7 @@ public void configure(Map<String, ?> configs, String saslMechanism, List<AppConf
readTimeout = getReadTimeout(config);
retries = getHttpRetries(config);
retryPauseMillis = getHttpRetryPauseMillis(config, retries);
includeAcceptHeader = config.getValueAsBoolean(Config.OAUTH_INCLUDE_ACCEPT_HEADER, true);
checkConfiguration();

principalExtractor = new PrincipalExtractor(
Expand Down Expand Up @@ -164,7 +166,8 @@ public void configure(Map<String, ?> configs, String saslMechanism, List<AppConf
+ "\n readTimeout: " + readTimeout
+ "\n retries: " + retries
+ "\n retryPauseMillis: " + retryPauseMillis
+ "\n enableMetrics: " + enableMetrics);
+ "\n enableMetrics: " + enableMetrics
+ "\n includeAcceptHeader: " + includeAcceptHeader);
}
}

Expand Down Expand Up @@ -274,11 +277,11 @@ private void handleCallback(OAuthBearerTokenCallback callback) throws IOExceptio
// we could check if it's a JWT - in that case we could check if it's expired
result = loginWithAccessToken(token, isJwt, principalExtractor);
} else if (refreshToken != null) {
result = loginWithRefreshToken(tokenEndpoint, socketFactory, hostnameVerifier, refreshToken, clientId, clientSecret, isJwt, principalExtractor, scope, audience, connectTimeout, readTimeout, authenticatorMetrics, retries, retryPauseMillis);
result = loginWithRefreshToken(tokenEndpoint, socketFactory, hostnameVerifier, refreshToken, clientId, clientSecret, isJwt, principalExtractor, scope, audience, connectTimeout, readTimeout, authenticatorMetrics, retries, retryPauseMillis, includeAcceptHeader);
} else if (username != null) {
result = loginWithPassword(tokenEndpoint, socketFactory, hostnameVerifier, username, password, clientId, clientSecret, isJwt, principalExtractor, scope, audience, connectTimeout, readTimeout, authenticatorMetrics, retries, retryPauseMillis);
result = loginWithPassword(tokenEndpoint, socketFactory, hostnameVerifier, username, password, clientId, clientSecret, isJwt, principalExtractor, scope, audience, connectTimeout, readTimeout, authenticatorMetrics, retries, retryPauseMillis, includeAcceptHeader);
} else if (clientSecret != null) {
result = loginWithClientSecret(tokenEndpoint, socketFactory, hostnameVerifier, clientId, clientSecret, isJwt, principalExtractor, scope, audience, connectTimeout, readTimeout, authenticatorMetrics, retries, retryPauseMillis);
result = loginWithClientSecret(tokenEndpoint, socketFactory, hostnameVerifier, clientId, clientSecret, isJwt, principalExtractor, scope, audience, connectTimeout, readTimeout, authenticatorMetrics, retries, retryPauseMillis, includeAcceptHeader);
} else {
throw new IllegalStateException("Invalid oauth client configuration - no credentials");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ public class Config {
/** The name of 'oauth.enable.metrics' config option */
public static final String OAUTH_ENABLE_METRICS = "oauth.enable.metrics";

/**
* Whether http requests should include "application/json" when being sent to the upstream OIDC server.
*/
public static final String OAUTH_INCLUDE_ACCEPT_HEADER = "oauth.include.accept.header";

/** The name of 'oauth.tokens.not.jwt' config option */
@Deprecated
public static final String OAUTH_TOKENS_NOT_JWT = "oauth.tokens.not.jwt";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,4 +139,22 @@ public static String getConfigWithFallbackLookup(Config c, String key, String fa
}
return result;
}

/**
* Resolve the configuration value for the key as a string.
* If the key is not present, fallback to using a secondary key.
*
* @param c the Config object
* @param key the configuration key
* @param fallbackKey the fallback key
* @param defautValue the default value
* @return Configured value as String
*/
public static Boolean getDefaultBooleanConfigWithFallbackLookup(Config c, String key, String fallbackKey, boolean defautValue) {
String result = c.getValue(key);
if (result == null) {
return c.getValueAsBoolean(fallbackKey, defautValue);
}
return c.getValueAsBoolean(key, defautValue);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,27 @@ public static <T> T get(URI uri, SSLSocketFactory socketFactory, HostnameVerifie
* @throws HttpException A runtime exception when an HTTP response status signals a failed request
*/
public static <T> T get(URI uri, SSLSocketFactory socketFactory, HostnameVerifier hostnameVerifier, String authorization, Class<T> responseType, int connectTimeout, int readTimeout) throws IOException {
return request(uri, "GET", socketFactory, hostnameVerifier, authorization, null, null, responseType, connectTimeout, readTimeout);
return request(uri, "GET", socketFactory, hostnameVerifier, authorization, null, null, responseType, connectTimeout, readTimeout, true);
}

/**
* Perform HTTP GET request and return the response in the specified type.
*
* @param uri The target url
* @param socketFactory Socket factory to use with https:// url
* @param hostnameVerifier HostnameVerifier to use with https:// url
* @param authorization The Authorization header value
* @param responseType The type to which to convert the response (String or one of the Jackson Mapper types)
* @param connectTimeout Connect timeout in seconds
* @param readTimeout Read timeout in seconds
* @param includeAcceptHeader Determines if <code>Accept application/json</code> is sent to the remote server.
* @return The response as specified by the <code>responseType</code>.
* @param <T> Generic type of the <code>responseType</code>
* @throws IOException A connection, timeout, or network exception that occurs while performing the request
* @throws HttpException A runtime exception when an HTTP response status signals a failed request
*/
public static <T> T get(URI uri, SSLSocketFactory socketFactory, HostnameVerifier hostnameVerifier, String authorization, Class<T> responseType, int connectTimeout, int readTimeout, boolean includeAcceptHeader) throws IOException {
return request(uri, "GET", socketFactory, hostnameVerifier, authorization, null, null, responseType, connectTimeout, readTimeout, includeAcceptHeader);
}

/**
Expand Down Expand Up @@ -242,13 +262,14 @@ public static <T> T post(URI uri, SSLSocketFactory socketFactory, HostnameVerifi
* @param responseType The type to which to convert the response (String or one of the Jackson Mapper types)
* @param connectTimeout Connect timeout in seconds
* @param readTimeout Read timeout in seconds
* @param includeAcceptHeader TODO
* @return The response as specified by the <code>responseType</code>.
* @param <T> Generic type of the <code>responseType</code>
* @throws IOException A connection, timeout, or network exception that occurs while performing the request
* @throws HttpException A runtime exception when an HTTP response status signals a failed request
*/
public static <T> T post(URI uri, SSLSocketFactory socketFactory, HostnameVerifier verifier, String authorization, String contentType, String body, Class<T> responseType, int connectTimeout, int readTimeout) throws IOException {
return request(uri, "POST", socketFactory, verifier, authorization, contentType, body, responseType, connectTimeout, readTimeout);
public static <T> T post(URI uri, SSLSocketFactory socketFactory, HostnameVerifier verifier, String authorization, String contentType, String body, Class<T> responseType, int connectTimeout, int readTimeout, boolean includeAcceptHeader) throws IOException {
return request(uri, "POST", socketFactory, verifier, authorization, contentType, body, responseType, connectTimeout, readTimeout, includeAcceptHeader);
}

/**
Expand Down Expand Up @@ -311,7 +332,7 @@ public static void put(URI uri, SSLSocketFactory socketFactory, HostnameVerifier
* @throws HttpException A runtime exception when an HTTP response status signals a failed request
*/
public static void put(URI uri, SSLSocketFactory socketFactory, HostnameVerifier verifier, String authorization, String contentType, String body, int connectTimeout, int readTimeout) throws IOException {
request(uri, "PUT", socketFactory, verifier, authorization, contentType, body, null, connectTimeout, readTimeout);
request(uri, "PUT", socketFactory, verifier, authorization, contentType, body, null, connectTimeout, readTimeout, true);
}

/**
Expand All @@ -323,7 +344,7 @@ public static void put(URI uri, SSLSocketFactory socketFactory, HostnameVerifier
* @throws HttpException A runtime exception when an HTTP response status signals a failed request
*/
public static void delete(URI uri, String authorization) throws IOException {
request(uri, "DELETE", null, null, authorization, null, null, null, DEFAULT_CONNECT_TIMEOUT, DEFAULT_READ_TIMEOUT);
request(uri, "DELETE", null, null, authorization, null, null, null, DEFAULT_CONNECT_TIMEOUT, DEFAULT_READ_TIMEOUT, true);
}

/**
Expand All @@ -337,7 +358,7 @@ public static void delete(URI uri, String authorization) throws IOException {
* @throws HttpException A runtime exception when an HTTP response status signals a failed request
*/
public static void delete(URI uri, SSLSocketFactory socketFactory, HostnameVerifier verifier, String authorization) throws IOException {
request(uri, "DELETE", socketFactory, verifier, authorization, null, null, null, DEFAULT_CONNECT_TIMEOUT, DEFAULT_READ_TIMEOUT);
request(uri, "DELETE", socketFactory, verifier, authorization, null, null, null, DEFAULT_CONNECT_TIMEOUT, DEFAULT_READ_TIMEOUT, true);
}

/**
Expand All @@ -353,7 +374,7 @@ public static void delete(URI uri, SSLSocketFactory socketFactory, HostnameVerif
* @throws HttpException A runtime exception when an HTTP response status signals a failed request
*/
public static void delete(URI uri, SSLSocketFactory socketFactory, HostnameVerifier verifier, String authorization, int connectTimeout, int readTimeout) throws IOException {
request(uri, "DELETE", socketFactory, verifier, authorization, null, null, null, connectTimeout, readTimeout);
request(uri, "DELETE", socketFactory, verifier, authorization, null, null, null, connectTimeout, readTimeout, true);
}

/**
Expand All @@ -375,7 +396,7 @@ public static void delete(URI uri, SSLSocketFactory socketFactory, HostnameVerif
* @throws HttpException A runtime exception when an HTTP response status signals a failed request
*/
public static <T> T request(URI uri, SSLSocketFactory socketFactory, HostnameVerifier hostnameVerifier, String authorization, String contentType, String body, Class<T> responseType) throws IOException {
return request(uri, null, socketFactory, hostnameVerifier, authorization, contentType, body, responseType, DEFAULT_CONNECT_TIMEOUT, DEFAULT_READ_TIMEOUT);
return request(uri, null, socketFactory, hostnameVerifier, authorization, contentType, body, responseType, DEFAULT_CONNECT_TIMEOUT, DEFAULT_READ_TIMEOUT, true);
}

/**
Expand All @@ -395,7 +416,7 @@ public static <T> T request(URI uri, SSLSocketFactory socketFactory, HostnameVer
* @throws HttpException A runtime exception when an HTTP response status signals a failed request
*/
public static <T> T request(URI uri, String method, SSLSocketFactory socketFactory, HostnameVerifier hostnameVerifier, String authorization, String contentType, String body, Class<T> responseType) throws IOException {
return request(uri, method, socketFactory, hostnameVerifier, authorization, contentType, body, responseType, DEFAULT_CONNECT_TIMEOUT, DEFAULT_READ_TIMEOUT);
return request(uri, method, socketFactory, hostnameVerifier, authorization, contentType, body, responseType, DEFAULT_CONNECT_TIMEOUT, DEFAULT_READ_TIMEOUT, true);
}

/**
Expand All @@ -413,13 +434,14 @@ public static <T> T request(URI uri, String method, SSLSocketFactory socketFacto
* @param readTimeout Read timeout in seconds
* @return The response as specified by the <code>responseType</code>.
* @param <T> Generic type of the <code>responseType</code>
* @param includeAcceptHeader Determines if <code>Accept application/json</code> is sent to the remote server.
* @throws IOException A connection, timeout, or network exception that occurs while performing the request
* @throws HttpException A runtime exception when an HTTP response status signals a failed request
*/
// Suppressed because of Spotbugs Java 11 bug - https://github.com/spotbugs/spotbugs/issues/756
@SuppressFBWarnings("RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE")
public static <T> T request(URI uri, String method, SSLSocketFactory socketFactory, HostnameVerifier hostnameVerifier, String authorization,
String contentType, String body, Class<T> responseType, int connectTimeout, int readTimeout) throws IOException {
String contentType, String body, Class<T> responseType, int connectTimeout, int readTimeout, boolean includeAcceptHeader) throws IOException {
HttpURLConnection con;
try {
con = (HttpURLConnection) uri.toURL().openConnection();
Expand All @@ -443,7 +465,11 @@ public static <T> T request(URI uri, String method, SSLSocketFactory socketFacto
if (authorization != null) {
con.setRequestProperty("Authorization", authorization);
}
con.setRequestProperty("Accept", "application/json");

if (includeAcceptHeader) {
con.setRequestProperty("Accept", "application/json");
}

if (body != null && body.length() > 0) {
if (contentType == null) {
throw new IllegalArgumentException("contentType must be set when body is not null");
Expand Down
Loading