configs) {
+
+ Object sslPrincipalMappingRules = configs.get(BrokerSecurityConfigs.SSL_PRINCIPAL_MAPPING_RULES_CONFIG);
+ Object sslPrincipalMapper;
+
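+ // Kafka's SslPrincipalMapper.fromRules() signature differs across Kafka versions -
+ // newer brokers take a List of rules, older ones a single String - so try the
+ // List variant first and fall back to the String variant.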
+ try {
+ Class<?> clazz = Class.forName("org.apache.kafka.common.security.ssl.SslPrincipalMapper");
+ try {
+ Method m = clazz.getMethod("fromRules", List.class);
+ if (sslPrincipalMappingRules == null) {
+ sslPrincipalMappingRules = Collections.singletonList("DEFAULT");
+ }
+ sslPrincipalMapper = m.invoke(null, sslPrincipalMappingRules);
+
+ } catch (NoSuchMethodException ex) {
+ Method m = clazz.getMethod("fromRules", String.class);
+ if (sslPrincipalMappingRules == null) {
+ sslPrincipalMappingRules = "DEFAULT";
+ }
+ sslPrincipalMapper = m.invoke(null, sslPrincipalMappingRules);
+ }
+
+ // Hack: set sslPrincipalMapper on the DefaultKafkaPrincipalBuilder superclass via reflection.
+ // An alternative would be to copy-paste the complete DefaultKafkaPrincipalBuilder implementation
+ // into this class and extend it.
+
+ SET_PRINCIPAL_MAPPER.invoke(this, sslPrincipalMapper);
+
+ } catch (RuntimeException e) {
+ throw new RuntimeException("Failed to initialize JwtKafkaPrincioalBuilder", e);
+
+ } catch (ClassNotFoundException
+ | NoSuchMethodException
+ | IllegalAccessException
+ | InvocationTargetException e) {
+ throw new RuntimeException("Failed to initialize JwtKafkaPrincioalBuilder", e);
+ }
+ }
+
+ @Override
+ public KafkaPrincipal build(AuthenticationContext context) {
+ if (context instanceof SaslAuthenticationContext
+ && ((SaslAuthenticationContext) context).server() instanceof OAuthBearerSaslServer) {
+ OAuthBearerSaslServer server = (OAuthBearerSaslServer) ((SaslAuthenticationContext) context).server();
+ if (OAuthBearerLoginModule.OAUTHBEARER_MECHANISM.equals(server.getMechanismName())) {
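+ // Wrap the principal as JwtKafkaPrincipal so that the parsed token (and any
+ // payload the authorizer later attaches to it) travels with the session principal.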
+ return new JwtKafkaPrincipal(KafkaPrincipal.USER_TYPE,
+ server.getAuthorizationID(),
+ (BearerTokenWithPayload) server.getNegotiatedProperty("OAUTHBEARER.token"));
+ }
+ }
+
+ return super.build(context);
+ }
+}
diff --git a/oauth-keycloak-authorizer/src/main/java/io/strimzi/kafka/oauth/server/authorizer/KeycloakRBACAuthorizer.java b/oauth-keycloak-authorizer/src/main/java/io/strimzi/kafka/oauth/server/authorizer/KeycloakRBACAuthorizer.java
new file mode 100644
index 00000000..1a7c307b
--- /dev/null
+++ b/oauth-keycloak-authorizer/src/main/java/io/strimzi/kafka/oauth/server/authorizer/KeycloakRBACAuthorizer.java
@@ -0,0 +1,470 @@
+/*
+ * Copyright 2017-2019, Strimzi authors.
+ * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
+ */
+package io.strimzi.kafka.oauth.server.authorizer;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import io.strimzi.kafka.oauth.client.ClientConfig;
+import io.strimzi.kafka.oauth.common.Config;
+import io.strimzi.kafka.oauth.common.ConfigUtil;
+import io.strimzi.kafka.oauth.common.HttpException;
+import io.strimzi.kafka.oauth.common.JSONUtil;
+import io.strimzi.kafka.oauth.common.BearerTokenWithPayload;
+import io.strimzi.kafka.oauth.common.SSLUtil;
+import kafka.network.RequestChannel;
+import kafka.security.auth.Acl;
+import kafka.security.auth.Operation;
+import kafka.security.auth.Resource;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.immutable.Set;
+
+import javax.net.ssl.HostnameVerifier;
+import javax.net.ssl.SSLSocketFactory;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+import static io.strimzi.kafka.oauth.common.HttpUtil.post;
+import static io.strimzi.kafka.oauth.common.OAuthAuthenticator.urlencode;
+
+/**
+ * An authorizer that grants access based on security policies managed in Keycloak Authorization Services.
+ * It works in conjunction with JaasServerOauthValidatorCallbackHandler, and requires
+ * {@link io.strimzi.kafka.oauth.server.authorizer.JwtKafkaPrincipalBuilder} to be configured as
+ * 'principal.builder.class' in the 'server.properties' file.
+ *
+ * To install this authorizer in Kafka, specify the following in your 'server.properties':
+ *
+ *
+ * authorizer.class.name=io.strimzi.kafka.oauth.server.authorizer.KeycloakRBACAuthorizer
+ * principal.builder.class=io.strimzi.kafka.oauth.server.authorizer.JwtKafkaPrincipalBuilder
+ *
+ *
+ * There is additional configuration that needs to be specified in order for this authorizer to work.
+ *
+ *
+ * Note: The following configuration keys can be specified as properties in the Kafka `server.properties` file, or as
+ * ENV vars, in which case an all-uppercase key name is also attempted with '.' replaced by '_' (e.g. STRIMZI_AUTHORIZATION_TOKEN_ENDPOINT_URI).
+ * They can also be specified as system properties. Precedence works in reverse: a system property overrides the ENV var, which in turn
+ * overrides `server.properties`.
+ *
+ *
+ * Required configuration:
+ *
+ *
+ * - strimzi.authorization.token.endpoint.uri The URL of Keycloak's token endpoint (e.g. https://keycloak:8443/auth/realms/master/protocol/openid-connect/token).
+ * If not present, oauth.token.endpoint.uri is used as a fallback configuration key to avoid unnecessary duplication when already present for the purpose of client authentication.
+ *
+ * - strimzi.authorization.client.id The client id of the OAuth client definition in Keycloak that has Authorization Services enabled.
+ * Typically it is called 'kafka'.
+ * If not present, oauth.client.id is used as a fallback configuration key to avoid unnecessary duplication when already present for the purpose of client authentication.
+ *
+ *
+ *
+ * Optional configuration:
+ *
+ *
+ * - strimzi.authorization.kafka.cluster.name The name of this cluster, used to target permissions to a specific Kafka cluster, making it possible to manage multiple clusters within the same Keycloak realm.
+ * The default value is kafka-cluster
+ *
+ * - strimzi.authorization.delegate.to.kafka.acl Whether the authorization decision should be delegated to SimpleAclAuthorizer if it is DENIED by Keycloak Authorization Services policies.
+ * The default value is false
+ *
+ *
+ *
+ * TLS configuration:
+ *
+ *
+ * - strimzi.authorization.ssl.truststore.location The location of the truststore file on the filesystem.
+ * If not present, oauth.ssl.truststore.location is used as a fallback configuration key to avoid unnecessary duplication when already present for the purpose of client authentication.
+ *
+ * - strimzi.authorization.ssl.truststore.password The password for the truststore.
+ * If not present, oauth.ssl.truststore.password is used as a fallback configuration key to avoid unnecessary duplication when already present for the purpose of client authentication.
+ *
+ * - strimzi.authorization.ssl.truststore.type The truststore type.
+ * If not present, oauth.ssl.truststore.type is used as a fallback configuration key to avoid unnecessary duplication when already present for the purpose of client authentication.
+ * If not set, the Java KeyStore default type is used.
+ *
+ * - strimzi.authorization.ssl.secure.random.implementation The random number generator implementation. See Java SDK documentation.
+ * If not present, oauth.ssl.secure.random.implementation is used as a fallback configuration key to avoid unnecessary duplication when already present for the purpose of client authentication.
+ * If not set, the Java platform SDK default is used.
+ *
+ * - strimzi.authorization.ssl.endpoint.identification.algorithm Specifies how to perform hostname verification. If set to the empty string, hostname verification is turned off.
+ * If not present, oauth.ssl.endpoint.identification.algorithm is used as a fallback configuration key to avoid unnecessary duplication when already present for the purpose of client authentication.
+ * If not set, the default value is HTTPS which enforces hostname verification for server certificates.
+ *
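+ * For example, a minimal authorization configuration in 'server.properties' might look like
+ * the following (the endpoint URL here is illustrative):
+ *
+ * strimzi.authorization.token.endpoint.uri=https://keycloak:8443/auth/realms/master/protocol/openid-connect/token
+ * strimzi.authorization.client.id=kafka
+ *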
+ * This authorizer honors the super.users configuration. Super users are automatically granted any authorization request.
+ *
+ */
+@SuppressWarnings("deprecation")
+public class KeycloakRBACAuthorizer extends kafka.security.auth.SimpleAclAuthorizer {
+
+ private static final String PRINCIPAL_BUILDER_CLASS = "io.strimzi.kafka.oauth.server.authorizer.JwtKafkaPrincipalBuilder";
+
+ static final Logger log = LoggerFactory.getLogger(KeycloakRBACAuthorizer.class);
+ static final Logger GRANT_LOG = LoggerFactory.getLogger(KeycloakRBACAuthorizer.class.getName() + ".grant");
+ static final Logger DENY_LOG = LoggerFactory.getLogger(KeycloakRBACAuthorizer.class.getName() + ".deny");
+
+ private URI tokenEndpointUrl;
+ private String clientId;
+ private String clusterName;
+ private SSLSocketFactory socketFactory;
+ private HostnameVerifier hostnameVerifier;
+ private List<UserSpec> superUsers = Collections.emptyList();
+ private boolean delegateToKafkaACL = false;
+
+
+ public KeycloakRBACAuthorizer() {
+ super();
+ }
+
+ @Override
+ public void configure(Map<String, ?> configs) {
+ super.configure(configs);
+
+ AuthzConfig config = convertToCommonConfig(configs);
+
+ String pbclass = (String) configs.get("principal.builder.class");
+ if (!PRINCIPAL_BUILDER_CLASS.equals(pbclass)) {
+ throw new RuntimeException("KeycloakRBACAuthorizer requires " + PRINCIPAL_BUILDER_CLASS + " as 'principal.builder.class'");
+ }
+
+ String endpoint = ConfigUtil.getConfigWithFallbackLookup(config, AuthzConfig.STRIMZI_AUTHORIZATION_TOKEN_ENDPOINT_URI,
+ ClientConfig.OAUTH_TOKEN_ENDPOINT_URI);
+ if (endpoint == null) {
+ throw new RuntimeException("OAuth2 Token Endpoint ('strimzi.authorization.token.endpoint.uri') not set.");
+ }
+
+ try {
+ tokenEndpointUrl = new URI(endpoint);
+ } catch (URISyntaxException e) {
+ throw new RuntimeException("Specified token endpoint uri is invalid: " + endpoint);
+ }
+
+ clientId = ConfigUtil.getConfigWithFallbackLookup(config, AuthzConfig.STRIMZI_AUTHORIZATION_CLIENT_ID, ClientConfig.OAUTH_CLIENT_ID);
+ if (clientId == null) {
+ throw new RuntimeException("OAuth2 Client Id ('strimzi.authorization.client.id') not set.");
+ }
+
+ socketFactory = createSSLFactory(config);
+ hostnameVerifier = createHostnameVerifier(config);
+
+ clusterName = config.getValue(AuthzConfig.STRIMZI_AUTHORIZATION_KAFKA_CLUSTER_NAME);
+ if (clusterName == null) {
+ clusterName = "kafka-cluster";
+ }
+
+ delegateToKafkaACL = config.getValueAsBoolean(AuthzConfig.STRIMZI_AUTHORIZATION_DELEGATE_TO_KAFKA_ACL, false);
+
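+ // super.users follows the Kafka convention of ';' separated principal specs,
+ // e.g.: super.users=User:admin;User:alice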
+ String users = (String) configs.get("super.users");
+ if (users != null) {
+ superUsers = Arrays.asList(users.split(";"))
+ .stream()
+ .map(s -> UserSpec.of(s))
+ .collect(Collectors.toList());
+ }
+
+ if (log.isDebugEnabled()) {
+ log.debug("Configured KeycloakRBACAuthorizer:\n tokenEndpointUri: " + tokenEndpointUrl
+ + "\n sslSocketFactory: " + socketFactory
+ + "\n hostnameVerifier: " + hostnameVerifier
+ + "\n clientId: " + clientId
+ + "\n clusterName: " + clusterName
+ + "\n delegateToKafkaACL: " + delegateToKafkaACL
+ + "\n superUsers: " + superUsers.stream().map(u -> u.getType() + ":" + u.getName()).collect(Collectors.toList()));
+ }
+ }
+
+ /**
+ * This method copies the strimzi.authorization.* entries, along with the oauth.* entries used as
+ * fallbacks, from the Kafka configs map into an AuthzConfig instance, so that the existing ConfigUtil
+ * methods can be reused for setting up the certificate truststore and hostname verification.
+ *
+ * All the config keys expected in 'server.properties' for configuring this authorizer are copied over 'as-is'.
+ *
+ * @param configs Kafka configs map
+ * @return Config object
+ */
+ static AuthzConfig convertToCommonConfig(Map<String, ?> configs) {
+ Properties p = new Properties();
+
+ String[] keys = {
+ AuthzConfig.STRIMZI_AUTHORIZATION_DELEGATE_TO_KAFKA_ACL,
+ AuthzConfig.STRIMZI_AUTHORIZATION_KAFKA_CLUSTER_NAME,
+ AuthzConfig.STRIMZI_AUTHORIZATION_CLIENT_ID,
+ AuthzConfig.OAUTH_CLIENT_ID,
+ AuthzConfig.STRIMZI_AUTHORIZATION_TOKEN_ENDPOINT_URI,
+ ClientConfig.OAUTH_TOKEN_ENDPOINT_URI,
+ AuthzConfig.STRIMZI_AUTHORIZATION_SSL_TRUSTSTORE_LOCATION,
+ Config.OAUTH_SSL_TRUSTSTORE_LOCATION,
+ AuthzConfig.STRIMZI_AUTHORIZATION_SSL_TRUSTSTORE_PASSWORD,
+ Config.OAUTH_SSL_TRUSTSTORE_PASSWORD,
+ AuthzConfig.STRIMZI_AUTHORIZATION_SSL_TRUSTSTORE_TYPE,
+ Config.OAUTH_SSL_TRUSTSTORE_TYPE,
+ AuthzConfig.STRIMZI_AUTHORIZATION_SSL_SECURE_RANDOM_IMPLEMENTATION,
+ Config.OAUTH_SSL_SECURE_RANDOM_IMPLEMENTATION,
+ AuthzConfig.STRIMZI_AUTHORIZATION_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM,
+ Config.OAUTH_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM
+ };
+
+ // copy over the keys
+ for (String key: keys) {
+ ConfigUtil.putIfNotNull(p, key, configs.get(key));
+ }
+
+ return new AuthzConfig(p);
+ }
+
+ static SSLSocketFactory createSSLFactory(Config config) {
+ String truststore = ConfigUtil.getConfigWithFallbackLookup(config,
+ AuthzConfig.STRIMZI_AUTHORIZATION_SSL_TRUSTSTORE_LOCATION, Config.OAUTH_SSL_TRUSTSTORE_LOCATION);
+ String password = ConfigUtil.getConfigWithFallbackLookup(config,
+ AuthzConfig.STRIMZI_AUTHORIZATION_SSL_TRUSTSTORE_PASSWORD, Config.OAUTH_SSL_TRUSTSTORE_PASSWORD);
+ String type = ConfigUtil.getConfigWithFallbackLookup(config,
+ AuthzConfig.STRIMZI_AUTHORIZATION_SSL_TRUSTSTORE_TYPE, Config.OAUTH_SSL_TRUSTSTORE_TYPE);
+ String rnd = ConfigUtil.getConfigWithFallbackLookup(config,
+ AuthzConfig.STRIMZI_AUTHORIZATION_SSL_SECURE_RANDOM_IMPLEMENTATION, Config.OAUTH_SSL_SECURE_RANDOM_IMPLEMENTATION);
+
+ return SSLUtil.createSSLFactory(truststore, password, type, rnd);
+ }
+
+ static HostnameVerifier createHostnameVerifier(Config config) {
+ String hostCheck = ConfigUtil.getConfigWithFallbackLookup(config,
+ AuthzConfig.STRIMZI_AUTHORIZATION_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM, Config.OAUTH_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM);
+
+ if (hostCheck == null) {
+ hostCheck = "HTTPS";
+ }
+ // Following Kafka convention for skipping hostname validation (when set to empty string)
+ return "".equals(hostCheck) ? SSLUtil.createAnyHostHostnameVerifier() : null;
+ }
+
+ /**
+ * The method that makes the authorization decision.
+ *
+ * We assume authorize() will not be called concurrently by two threads for the same session.
+ *
+ * Should that assumption not hold, the side effect would be some unnecessary extra calls to the
+ * token endpoint. Other than that, it would not affect the proper functioning of this authorizer.
+ *
+ * @param session Current session
+ * @param operation Operation to authorize
+ * @param resource Resource to authorize
+ * @return true if permission is granted
+ */
+ @Override
+ public boolean authorize(RequestChannel.Session session, Operation operation, Resource resource) {
+
+ KafkaPrincipal principal = session.principal();
+
+ for (UserSpec u: superUsers) {
+ if (principal.getPrincipalType().equals(u.getType())
+ && principal.getName().equals(u.getName())) {
+
+ // it's a super user. super users are granted everything
+ if (GRANT_LOG.isDebugEnabled()) {
+ GRANT_LOG.debug("Authorization GRANTED - user is a superuser: " + session.principal() + ", operation: " + operation + ", resource: " + resource);
+ }
+ return true;
+ }
+ }
+
+ if (!(principal instanceof JwtKafkaPrincipal)) {
+ // if user wasn't authenticated over OAuth, and simple ACL delegation is enabled
+ // we delegate to simple ACL
+ return delegateIfRequested(session, operation, resource, null);
+ }
+
+ //
+ // Check if authorization grants are available
+ // If not, fetch authorization grants and store them in the token
+ //
+
+ JwtKafkaPrincipal jwtPrincipal = (JwtKafkaPrincipal) principal;
+
+ BearerTokenWithPayload token = jwtPrincipal.getJwt();
+ JsonNode authz = (JsonNode) token.getPayload();
+
+ if (authz == null) {
+ // fetch authorization grants
+ try {
+ authz = fetchAuthorizationGrants(token.value());
+ if (authz == null) {
+ authz = new ObjectNode(JSONUtil.MAPPER.getNodeFactory());
+ }
+ } catch (HttpException e) {
+ if (e.getStatus() == 403) {
+ authz = new ObjectNode(JSONUtil.MAPPER.getNodeFactory());
+ } else {
+ log.warn("Unexpected status while fetching authorization data - will retry next time: " + e.getMessage());
+ }
+ }
+ if (authz != null) {
+ // store authz grants in the token so they are available for subsequent requests
+ token.setPayload(authz);
+ }
+ }
+
+ if (log.isDebugEnabled()) {
+ log.debug("authorize(): " + authz);
+ }
+
+ //
+ // Iterate authorization rules and try to find a match
+ //
+
+ if (authz != null) {
+ Iterator<JsonNode> it = authz.iterator();
+ while (it.hasNext()) {
+ JsonNode permission = it.next();
+ String name = permission.get("rsname").asText();
+ ResourceSpec resourceSpec = ResourceSpec.of(name);
+ if (resourceSpec.match(clusterName, resource.resourceType().name(), resource.name())) {
+
+ ScopesSpec grantedScopes = ScopesSpec.of(
+ validateScopes(
+ JSONUtil.asListOfString(permission.get("scopes"))));
+
+ if (grantedScopes.isGranted(operation.name())) {
+ if (GRANT_LOG.isDebugEnabled()) {
+ GRANT_LOG.debug("Authorization GRANTED - cluster: " + clusterName + ",user: " + session.principal() + ", operation: " + operation +
+ ", resource: " + resource + "\nGranted scopes for resource (" + resourceSpec + "): " + grantedScopes);
+ }
+ return true;
+ }
+ }
+ }
+ }
+ return delegateIfRequested(session, operation, resource, authz);
+ }
+
+ static List<ScopesSpec.AuthzScope> validateScopes(List<String> scopes) {
+ List<ScopesSpec.AuthzScope> enumScopes = new ArrayList<>(scopes.size());
+ for (String name: scopes) {
+ try {
+ enumScopes.add(ScopesSpec.AuthzScope.valueOf(name));
+ } catch (Exception e) {
+ log.warn("[IGNORED] Invalid scope detected in authorization scopes list: " + name);
+ }
+ }
+ return enumScopes;
+ }
+
+ boolean delegateIfRequested(RequestChannel.Session session, Operation operation, Resource resource, JsonNode authz) {
+ String nonAuthMessageFragment = session.principal() instanceof JwtKafkaPrincipal ? "" : " non-oauth";
+ if (delegateToKafkaACL) {
+ boolean granted = super.authorize(session, operation, resource);
+
+ boolean grantLogOn = granted && GRANT_LOG.isDebugEnabled();
+ boolean denyLogOn = !granted && DENY_LOG.isDebugEnabled();
+
+ if (grantLogOn || denyLogOn) {
+ String status = granted ? "GRANTED" : "DENIED";
+ String message = "Authorization " + status + " by ACL -" + nonAuthMessageFragment + " user: " + session.principal() + ", operation: " + operation + ", resource: " + resource;
+
+ if (grantLogOn) {
+ GRANT_LOG.debug(message);
+ } else if (denyLogOn) {
+ DENY_LOG.debug(message);
+ }
+ }
+ return granted;
+ }
+
+ if (DENY_LOG.isDebugEnabled()) {
+ DENY_LOG.debug("Authorization DENIED -" + nonAuthMessageFragment + " user: " + session.principal() +
+ " cluster: " + clusterName + ", operation: " + operation + ", resource: " + resource + "\n permissions: " + authz);
+ }
+ return false;
+ }
+
+ JsonNode fetchAuthorizationGrants(String token) {
+
+ String authorization = "Bearer " + token;
+
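+ // Request all of this client's permissions using the UMA grant type, with the
+ // client id as the audience. With response_mode=permissions the response is a
+ // JSON array of permission objects carrying 'rsname' and 'scopes' fields, which
+ // authorize() matches against.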
+ StringBuilder body = new StringBuilder("audience=").append(urlencode(clientId))
+ .append("&grant_type=").append(urlencode("urn:ietf:params:oauth:grant-type:uma-ticket"))
+ .append("&response_mode=permissions");
+
+ JsonNode response;
+
+ try {
+ response = post(tokenEndpointUrl, socketFactory, hostnameVerifier, authorization,
+ "application/x-www-form-urlencoded", body.toString(), JsonNode.class);
+
+ } catch (HttpException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to fetch authorization data from authorization server: ", e);
+ }
+
+ return response;
+ }
+
+ @Override
+ public void addAcls(Set<Acl> acls, Resource resource) {
+ if (!delegateToKafkaACL) {
+ throw new RuntimeException("Simple ACL delegation not enabled");
+ }
+ super.addAcls(acls, resource);
+ }
+
+ @Override
+ public boolean removeAcls(Set<Acl> aclsTobeRemoved, Resource resource) {
+ if (!delegateToKafkaACL) {
+ throw new RuntimeException("Simple ACL delegation not enabled");
+ }
+ return super.removeAcls(aclsTobeRemoved, resource);
+ }
+
+ @Override
+ public boolean removeAcls(Resource resource) {
+ if (!delegateToKafkaACL) {
+ throw new RuntimeException("Simple ACL delegation not enabled");
+ }
+ return super.removeAcls(resource);
+ }
+
+ @Override
+ public Set<Acl> getAcls(Resource resource) {
+ if (!delegateToKafkaACL) {
+ throw new RuntimeException("Simple ACL delegation not enabled");
+ }
+ return super.getAcls(resource);
+ }
+
+ @Override
+ public scala.collection.immutable.Map<Resource, Set<Acl>> getAcls(KafkaPrincipal principal) {
+ if (!delegateToKafkaACL) {
+ throw new RuntimeException("Simple ACL delegation not enabled");
+ }
+ return super.getAcls(principal);
+ }
+
+ @Override
+ public scala.collection.immutable.Map<Resource, Set<Acl>> getAcls() {
+ if (!delegateToKafkaACL) {
+ throw new RuntimeException("Simple ACL delegation not enabled");
+ }
+ return super.getAcls();
+ }
+
+ @Override
+ public void close() {
+ super.close();
+ }
+}
diff --git a/oauth-keycloak-authorizer/src/main/java/io/strimzi/kafka/oauth/server/authorizer/ResourceSpec.java b/oauth-keycloak-authorizer/src/main/java/io/strimzi/kafka/oauth/server/authorizer/ResourceSpec.java
new file mode 100644
index 00000000..04b5f299
--- /dev/null
+++ b/oauth-keycloak-authorizer/src/main/java/io/strimzi/kafka/oauth/server/authorizer/ResourceSpec.java
@@ -0,0 +1,159 @@
+/*
+ * Copyright 2017-2019, Strimzi authors.
+ * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
+ */
+package io.strimzi.kafka.oauth.server.authorizer;
+
+import java.util.Locale;
+
+/**
+ * ResourceSpec is used to parse a resource matching pattern and to match it against a specific resource.
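+ *
+ * For example, the pattern kafka-cluster:my-cluster,Topic:orders-* matches any topic whose
+ * name starts with 'orders-' on the Kafka cluster named 'my-cluster'.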
+ */
+public class ResourceSpec {
+
+ public enum ResourceType {
+ Topic,
+ Group,
+ Cluster,
+ TransactionalId,
+ DelegationToken
+ }
+
+ private String clusterName;
+ private boolean clusterStartsWith;
+
+ private ResourceType resourceType;
+ private String resourceName;
+ private boolean resourceStartsWith;
+
+
+ public String getClusterName() {
+ return clusterName;
+ }
+
+ public boolean isClusterStartsWith() {
+ return clusterStartsWith;
+ }
+
+ public ResourceType getResourceType() {
+ return resourceType;
+ }
+
+ public String getResourceName() {
+ return resourceName;
+ }
+
+ public boolean isResourceStartsWith() {
+ return resourceStartsWith;
+ }
+
+ /**
+ * Match specific resource's cluster, type and name to this ResourceSpec
+ *
+ * If clusterName is set then cluster must match, otherwise cluster match is ignored.
+ * Type and name are always matched.
+ *
+ * @param cluster Kafka cluster name such as: my-kafka
+ * @param type Resource type such as: Topic, Group
+ * @param name Resource name such as: my-topic
+ * @return true if cluster, type and name match this resource spec
+ */
+ public boolean match(String cluster, String type, String name) {
+ if (clusterName != null) {
+ if (cluster == null) {
+ throw new IllegalArgumentException("cluster == null");
+ }
+ if (clusterStartsWith) {
+ if (!cluster.startsWith(clusterName)) {
+ return false;
+ }
+ } else if (!cluster.equals(clusterName)) {
+ return false;
+ }
+ }
+
+ if (type == null) {
+ throw new IllegalArgumentException("type == null");
+ }
+ if (resourceType == null || !type.equals(resourceType.name())) {
+ return false;
+ }
+
+ if (name == null) {
+ throw new IllegalArgumentException("name == null");
+ }
+ if (resourceStartsWith) {
+ if (!name.startsWith(resourceName)) {
+ return false;
+ }
+ } else if (!name.equals(resourceName)) {
+ return false;
+ }
+
+ return true;
+ }
+
+ public static ResourceSpec of(String name) {
+ ResourceSpec spec = new ResourceSpec();
+
+ String[] parts = name.split(",");
+ for (String part: parts) {
+ String[] subSpec = part.split(":");
+ if (subSpec.length != 2) {
+ throw new RuntimeException("Failed to parse Resource: " + name + " - part doesn't follow TYPE:NAME pattern: " + part);
+ }
+
+ String type = subSpec[0].toLowerCase(Locale.US);
+ String pat = subSpec[1];
+ if (type.equals("kafka-cluster")) {
+ if (spec.clusterName != null) {
+ throw new RuntimeException("Failed to parse Resource: " + name + " - cluster part specified multiple times");
+ }
+ if (pat.endsWith("*")) {
+ spec.clusterName = pat.substring(0, pat.length() - 1);
+ spec.clusterStartsWith = true;
+ } else {
+ spec.clusterName = pat;
+ }
+ continue;
+ }
+
+ if (spec.resourceName != null) {
+ throw new RuntimeException("Failed to parse Resource: " + name + " - resource part specified multiple times");
+ }
+
+ if (type.equals("topic")) {
+ spec.resourceType = ResourceType.Topic;
+ } else if (type.equals("group")) {
+ spec.resourceType = ResourceType.Group;
+ } else if (type.equals("cluster")) {
+ spec.resourceType = ResourceType.Cluster;
+ } else if (type.equals("transactionalid")) {
+ spec.resourceType = ResourceType.TransactionalId;
+ } else if (type.equals("delegationtoken")) {
+ spec.resourceType = ResourceType.DelegationToken;
+ } else {
+ throw new RuntimeException("Failed to parse Resource: " + name + " - unsupported segment type: " + subSpec[0]);
+ }
+
+ if (pat.endsWith("*")) {
+ spec.resourceName = pat.substring(0, pat.length() - 1);
+ spec.resourceStartsWith = true;
+ } else {
+ spec.resourceName = pat;
+ }
+ }
+
+ return spec;
+ }
+
+ @Override
+ public String toString() {
+ return (clusterName != null ?
+ ("kafka-cluster:" + clusterName + (clusterStartsWith ? "*" : "") + ",")
+ : "") +
+ (resourceName != null ?
+ (resourceType + ":" + resourceName + (resourceStartsWith ? "*" : ":"))
+ : "");
+ }
+}
diff --git a/oauth-keycloak-authorizer/src/main/java/io/strimzi/kafka/oauth/server/authorizer/ScopesSpec.java b/oauth-keycloak-authorizer/src/main/java/io/strimzi/kafka/oauth/server/authorizer/ScopesSpec.java
new file mode 100644
index 00000000..81e30a2e
--- /dev/null
+++ b/oauth-keycloak-authorizer/src/main/java/io/strimzi/kafka/oauth/server/authorizer/ScopesSpec.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2017-2019, Strimzi authors.
+ * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
+ */
+package io.strimzi.kafka.oauth.server.authorizer;
+
+import java.util.EnumSet;
+import java.util.List;
+
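+/**
+ * ScopesSpec represents the set of operations (scopes) granted on a matched resource.
+ * For example, a permission with scopes ["Read", "Write"] grants the Read and Write
+ * operations and no others.
+ */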
+public class ScopesSpec {
+
+ public enum AuthzScope {
+ Create,
+ Read,
+ Write,
+ Delete,
+ Alter,
+ Describe,
+ AlterConfigs,
+ DescribeConfigs,
+ ClusterAction,
+ IdempotentWrite
+ }
+
+ private EnumSet<AuthzScope> granted;
+
+ private ScopesSpec(EnumSet<AuthzScope> grants) {
+ this.granted = grants;
+ }
+
+
+ static ScopesSpec of(List<AuthzScope> scopes) {
+ // EnumSet.copyOf() requires a non-empty source collection, so guard against an empty list
+ return new ScopesSpec(scopes.isEmpty() ? EnumSet.noneOf(AuthzScope.class) : EnumSet.copyOf(scopes));
+ }
+
+ public boolean isGranted(String operation) {
+ AuthzScope scope = AuthzScope.valueOf(operation);
+ return granted.contains(scope);
+ }
+
+ @Override
+ public String toString() {
+ return String.valueOf(granted);
+ }
+}
diff --git a/oauth-keycloak-authorizer/src/main/java/io/strimzi/kafka/oauth/server/authorizer/UserSpec.java b/oauth-keycloak-authorizer/src/main/java/io/strimzi/kafka/oauth/server/authorizer/UserSpec.java
new file mode 100644
index 00000000..644a985b
--- /dev/null
+++ b/oauth-keycloak-authorizer/src/main/java/io/strimzi/kafka/oauth/server/authorizer/UserSpec.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2017-2019, Strimzi authors.
+ * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
+ */
+package io.strimzi.kafka.oauth.server.authorizer;
+
+public class UserSpec {
+
+ private final String type;
+ private final String name;
+
+ private UserSpec(String type, String name) {
+ this.type = type;
+ this.name = name;
+ }
+
+ public String getType() {
+ return type;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+
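+ /**
+ * Parse a principal specification of the form TYPE:NAME (e.g. 'User:alice', as used
+ * in the 'super.users' configuration) into its type and name parts.
+ *
+ * @param principal A principal specification such as 'User:alice'
+ * @return A new UserSpec instance
+ */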
+ public static UserSpec of(String principal) {
+ int pos = principal.indexOf(':');
+ if (pos <= 0) {
+ throw new IllegalArgumentException("Invalid user specification: " + principal);
+ }
+ return new UserSpec(principal.substring(0, pos), principal.substring(pos + 1));
+ }
+
+ @Override
+ public String toString() {
+ return super.toString() + " " + type + ":" + name;
+ }
+}
diff --git a/oauth-server/src/main/java/io/strimzi/kafka/oauth/server/JaasServerOauthValidatorCallbackHandler.java b/oauth-server/src/main/java/io/strimzi/kafka/oauth/server/JaasServerOauthValidatorCallbackHandler.java
index 236b9bc6..e0b17a5b 100644
--- a/oauth-server/src/main/java/io/strimzi/kafka/oauth/server/JaasServerOauthValidatorCallbackHandler.java
+++ b/oauth-server/src/main/java/io/strimzi/kafka/oauth/server/JaasServerOauthValidatorCallbackHandler.java
@@ -6,6 +6,7 @@
import io.strimzi.kafka.oauth.common.Config;
import io.strimzi.kafka.oauth.common.ConfigUtil;
+import io.strimzi.kafka.oauth.common.BearerTokenWithPayload;
import io.strimzi.kafka.oauth.validator.JWTSignatureValidator;
import io.strimzi.kafka.oauth.validator.OAuthIntrospectionValidator;
import io.strimzi.kafka.oauth.common.TokenInfo;
@@ -14,7 +15,6 @@
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
-import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallback;
import org.keycloak.jose.jws.JWSInput;
import org.keycloak.jose.jws.JWSInputException;
@@ -156,7 +156,19 @@ private void handleCallback(OAuthBearerValidatorCallback callback) {
try {
TokenInfo ti = validateToken(token);
- callback.token(new OAuthBearerToken() {
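+ // BearerTokenWithPayload extends OAuthBearerToken with a mutable 'payload' slot,
+ // allowing the authorizer to attach extra per-session data (e.g. fetched grants)
+ // to the token and reuse it across requests.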
+ callback.token(new BearerTokenWithPayload() {
+
+ private Object payload;
+
+ @Override
+ public Object getPayload() {
+ return payload;
+ }
+
+ @Override
+ public void setPayload(Object value) {
+ payload = value;
+ }
@Override
public String value() {
@@ -189,6 +201,7 @@ public String principalName() {
public Long startTimeMs() {
return ti.issuedAtMs();
}
+
});
} catch (TokenValidationException e) {
diff --git a/pom.xml b/pom.xml
index 1fffcb7a..b1ee1d70 100644
--- a/pom.xml
+++ b/pom.xml
@@ -68,6 +68,7 @@
1.6.3
<kafka.version>2.3.0</kafka.version>
+ <scala.version>2.12.8</scala.version>
2.9.10.2
4.12
1.7.26
@@ -90,6 +91,7 @@
<module>oauth-common</module>
<module>oauth-client</module>
<module>oauth-server</module>
+ <module>oauth-keycloak-authorizer</module>
<module>examples/consumer</module>
<module>examples/producer</module>
@@ -111,6 +113,16 @@
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>