Skip to content

Commit

Permalink
Address comments from code review.
Browse files Browse the repository at this point in the history
Signed-off-by: Hunter Madison <[email protected]>
  • Loading branch information
hmadison committed Jul 28, 2023
1 parent d5a099e commit adfbf8e
Show file tree
Hide file tree
Showing 26 changed files with 135 additions and 61 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1485,3 +1485,7 @@ The JWT tokens are signed by the authorization server when they are issued. The
The client may have obtained a new access token, but the Kafka broker has not yet refreshed the public keys from JWKS endpoint resulting in a mismatch. The Kafka Broker will automatically refresh JWT keys if it encounters an unknown `kid`, and the problem will self-correct in this case, you may just need to repeat your request a few times.

It can also happen the other way around. Your existing client may still use the refresh token or the access token issued by the previous authorization server instance while the Kafka broker has already refreshed the keys from JWKS endpoint - resulting in a mismatch between the private key used by authorization server to sign the token, and the published public keys (JWKS endpoint). Since the problem is on the client you may need to configure your client with a newly obtained refresh token, or access token. If you configure your client with clientId and secret, it should autocorrect by itself, you just need to restart it.

### HTTP 406: Not Acceptable errors.

For certain servers setting the `Accept` header on outbound requests to `application/json` can cause the identity provider to reject the request. If that is an issue, you can set `oauth.include.accept.header` to `false` and remove the `Accept` header from outbound requests made by the Kafka server or client.
2 changes: 1 addition & 1 deletion examples/kubernetes/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ They assume Keycloak is used as an authorization server, with properly configure

* `kafka-oauth-single-authz-service-accounts.yaml`

A single node Kafka cluster using [service account tokens](https://kubernetes.io/docs/reference/access-authn-authz/authentication/#service-account-tokens) for authorization and the `simple` authorizer.
A single node Kafka cluster using [service account tokens](https://kubernetes.io/docs/reference/access-authn-authz/authentication/#service-account-tokens) for authentication and the `simple` authorizer.
It requires that the `kube-root-ca.crt` be copied from its ConfigMap to a Secret:

kubectl get configmap/kube-root-ca.crt -o=json | jq -r '.data."ca.crt"' | kubectl create secret generic kube-root-ca --from-file=ca.crt=/dev/stdin
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ public class JaasClientOauthLoginCallbackHandler implements AuthenticateCallback
private SensorKeyProducer tokenSensorKeyProducer;

private final ClientMetricsHandler authenticatorMetrics = new ClientMetricsHandler();
private boolean includeAcceptHeader;

@Override
public void configure(Map<String, ?> configs, String saslMechanism, List<AppConfigurationEntry> jaasConfigEntries) {
Expand Down Expand Up @@ -125,6 +126,7 @@ public void configure(Map<String, ?> configs, String saslMechanism, List<AppConf
readTimeout = getReadTimeout(config);
retries = getHttpRetries(config);
retryPauseMillis = getHttpRetryPauseMillis(config, retries);
includeAcceptHeader = config.getValueAsBoolean(Config.OAUTH_INCLUDE_ACCEPT_HEADER, true);
checkConfiguration();

principalExtractor = new PrincipalExtractor(
Expand Down Expand Up @@ -274,11 +276,11 @@ private void handleCallback(OAuthBearerTokenCallback callback) throws IOExceptio
// we could check if it's a JWT - in that case we could check if it's expired
result = loginWithAccessToken(token, isJwt, principalExtractor);
} else if (refreshToken != null) {
result = loginWithRefreshToken(tokenEndpoint, socketFactory, hostnameVerifier, refreshToken, clientId, clientSecret, isJwt, principalExtractor, scope, audience, connectTimeout, readTimeout, authenticatorMetrics, retries, retryPauseMillis);
result = loginWithRefreshToken(tokenEndpoint, socketFactory, hostnameVerifier, refreshToken, clientId, clientSecret, isJwt, principalExtractor, scope, audience, connectTimeout, readTimeout, authenticatorMetrics, retries, retryPauseMillis, includeAcceptHeader);
} else if (username != null) {
result = loginWithPassword(tokenEndpoint, socketFactory, hostnameVerifier, username, password, clientId, clientSecret, isJwt, principalExtractor, scope, audience, connectTimeout, readTimeout, authenticatorMetrics, retries, retryPauseMillis);
result = loginWithPassword(tokenEndpoint, socketFactory, hostnameVerifier, username, password, clientId, clientSecret, isJwt, principalExtractor, scope, audience, connectTimeout, readTimeout, authenticatorMetrics, retries, retryPauseMillis, includeAcceptHeader);
} else if (clientSecret != null) {
result = loginWithClientSecret(tokenEndpoint, socketFactory, hostnameVerifier, clientId, clientSecret, isJwt, principalExtractor, scope, audience, connectTimeout, readTimeout, authenticatorMetrics, retries, retryPauseMillis);
result = loginWithClientSecret(tokenEndpoint, socketFactory, hostnameVerifier, clientId, clientSecret, isJwt, principalExtractor, scope, audience, connectTimeout, readTimeout, authenticatorMetrics, retries, retryPauseMillis, includeAcceptHeader);
} else {
throw new IllegalStateException("Invalid oauth client configuration - no credentials");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ public class Config {
/** The name of 'oauth.enable.metrics' config option */
public static final String OAUTH_ENABLE_METRICS = "oauth.enable.metrics";

/**
* Whether http requests should include "application/json" when being sent to the upstream OIDC server.
*/
public static final String OAUTH_INCLUDE_ACCEPT_HEADER = "oauth.include.accept.header";

/** The name of 'oauth.tokens.not.jwt' config option */
@Deprecated
public static final String OAUTH_TOKENS_NOT_JWT = "oauth.tokens.not.jwt";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,4 +139,22 @@ public static String getConfigWithFallbackLookup(Config c, String key, String fa
}
return result;
}

/**
* Resolve the configuration value for the key as a string.
* If the key is not present, fallback to using a secondary key.
*
* @param c the Config object
* @param key the configuration key
* @param fallbackKey the fallback key
* @param defautValue the default value
* @return Configured value as String
*/
public static Boolean getDefaultBooleanConfigWithFallbackLookup(Config c, String key, String fallbackKey, boolean defautValue) {
String result = c.getValue(key);
if (result == null) {
return c.getValueAsBoolean(fallbackKey, defautValue);
}
return c.getValueAsBoolean(key, defautValue);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -262,13 +262,14 @@ public static <T> T post(URI uri, SSLSocketFactory socketFactory, HostnameVerifi
* @param responseType The type to which to convert the response (String or one of the Jackson Mapper types)
* @param connectTimeout Connect timeout in seconds
* @param readTimeout Read timeout in seconds
* @param includeAcceptHeader TODO
* @return The response as specified by the <code>responseType</code>.
* @param <T> Generic type of the <code>responseType</code>
* @throws IOException A connection, timeout, or network exception that occurs while performing the request
* @throws HttpException A runtime exception when an HTTP response status signals a failed request
*/
public static <T> T post(URI uri, SSLSocketFactory socketFactory, HostnameVerifier verifier, String authorization, String contentType, String body, Class<T> responseType, int connectTimeout, int readTimeout) throws IOException {
return request(uri, "POST", socketFactory, verifier, authorization, contentType, body, responseType, connectTimeout, readTimeout, true);
public static <T> T post(URI uri, SSLSocketFactory socketFactory, HostnameVerifier verifier, String authorization, String contentType, String body, Class<T> responseType, int connectTimeout, int readTimeout, boolean includeAcceptHeader) throws IOException {
return request(uri, "POST", socketFactory, verifier, authorization, contentType, body, responseType, connectTimeout, readTimeout, includeAcceptHeader);
}

/**
Expand Down
Loading

0 comments on commit adfbf8e

Please sign in to comment.