Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added Keycloak Authorization configuration #2432

Merged
merged 15 commits into the base branch from the contributor's branch
Jan 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
property = "type")
@JsonSubTypes({
@JsonSubTypes.Type(name = KafkaAuthorizationSimple.TYPE_SIMPLE, value = KafkaAuthorizationSimple.class),
@JsonSubTypes.Type(name = KafkaAuthorizationKeycloak.TYPE_KEYCLOAK, value = KafkaAuthorizationKeycloak.class)
})
@JsonInclude(JsonInclude.Include.NON_NULL)
@EqualsAndHashCode
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Copyright Strimzi authors.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*/
package io.strimzi.api.kafka.model;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import io.strimzi.crdgenerator.annotations.Description;
import io.strimzi.crdgenerator.annotations.Example;
import io.sundr.builder.annotations.Buildable;
import lombok.EqualsAndHashCode;

import java.util.List;

/**
* Configures Keycloak authorization on the brokers
*/
@Buildable(
editableEnabled = false,
generateBuilderPackage = false,
builderPackage = "io.fabric8.kubernetes.api.builder"
)
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonPropertyOrder({"type", "clientId", "tokenEndpointUri",
"tlsTrustedCertificates", "disableTlsHostnameVerification",
"delegateToKafkaAcls", "superUsers"})
@EqualsAndHashCode
public class KafkaAuthorizationKeycloak extends KafkaAuthorization {
    private static final long serialVersionUID = 1L;

    // Discriminator value used by the Jackson @JsonSubTypes mapping on KafkaAuthorization
    public static final String TYPE_KEYCLOAK = "keycloak";

    // Broker plugin classes which the operator wires into the Kafka broker configuration
    // when this authorization type is selected
    public static final String AUTHORIZER_CLASS_NAME = "io.strimzi.kafka.oauth.server.authorizer.KeycloakRBACAuthorizer";
    public static final String PRINCIPAL_BUILDER_CLASS_NAME = "io.strimzi.kafka.oauth.server.authorizer.JwtKafkaPrincipalBuilder";

    private String clientId;
    private String tokenEndpointUri;
    private List<CertSecretSource> tlsTrustedCertificates;
    private boolean disableTlsHostnameVerification = false;
    private boolean delegateToKafkaAcls = false;
    private List<String> superUsers;

    @Description("Must be `" + TYPE_KEYCLOAK + "`")
    @Override
    public String getType() {
        return TYPE_KEYCLOAK;
    }

    @Description("OAuth Client ID which the Kafka client can use to authenticate against the OAuth server and use the token endpoint URI.")
    public String getClientId() {
        return clientId;
    }

    public void setClientId(String clientId) {
        this.clientId = clientId;
    }

    @Description("Authorization server token endpoint URI.")
    public String getTokenEndpointUri() {
        return tokenEndpointUri;
    }

    public void setTokenEndpointUri(String tokenEndpointUri) {
        this.tokenEndpointUri = tokenEndpointUri;
    }

    @Description("Trusted certificates for TLS connection to the OAuth server.")
    @JsonInclude(JsonInclude.Include.NON_NULL)
    public List<CertSecretSource> getTlsTrustedCertificates() {
        return tlsTrustedCertificates;
    }

    public void setTlsTrustedCertificates(List<CertSecretSource> tlsTrustedCertificates) {
        this.tlsTrustedCertificates = tlsTrustedCertificates;
    }

    @Description("Enable or disable TLS hostname verification. " +
            "Default value is `false`.")
    @JsonInclude(JsonInclude.Include.NON_DEFAULT)
    public boolean isDisableTlsHostnameVerification() {
        return disableTlsHostnameVerification;
    }

    public void setDisableTlsHostnameVerification(boolean disableTlsHostnameVerification) {
        this.disableTlsHostnameVerification = disableTlsHostnameVerification;
    }

    // Fixed: the two concatenated description fragments were missing a separating space,
    // which rendered as "policies.Default value" in the generated CRD documentation.
    @Description("Whether authorization decision should be delegated to the 'Simple' authorizer if DENIED by Keycloak Authorization Services policies. " +
            "Default value is `false`.")
    @JsonInclude(JsonInclude.Include.NON_DEFAULT)
    public boolean isDelegateToKafkaAcls() {
        return delegateToKafkaAcls;
    }

    public void setDelegateToKafkaAcls(boolean delegateToKafkaAcls) {
        this.delegateToKafkaAcls = delegateToKafkaAcls;
    }

    @Description("List of super users. Should contain list of user principals which should get unlimited access rights.")
    @Example("- CN=my-user\n" +
            "- CN=my-other-user")
    @JsonInclude(JsonInclude.Include.NON_NULL)
    public List<String> getSuperUsers() {
        return superUsers;
    }

    public void setSuperUsers(List<String> superUsers) {
        this.superUsers = superUsers;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import io.fabric8.kubernetes.api.model.VolumeMount;
import io.strimzi.api.kafka.model.CertAndKeySecretSource;
import io.strimzi.api.kafka.model.KafkaAuthorization;
import io.strimzi.api.kafka.model.KafkaAuthorizationKeycloak;
import io.strimzi.api.kafka.model.KafkaAuthorizationSimple;
import io.strimzi.api.kafka.model.KafkaResources;
import io.strimzi.api.kafka.model.Rack;
Expand Down Expand Up @@ -369,32 +370,62 @@ private String getSecurityProtocol(boolean tls, boolean sasl) {
public KafkaBrokerConfigurationBuilder withAuthorization(String clusterName, KafkaAuthorization authorization) {
    if (authorization != null) {
        List<String> superUsers = new ArrayList<>();

        // Broker super users — internal principals which always need unlimited access
        superUsers.add(String.format("User:CN=%s,O=io.strimzi", KafkaResources.kafkaStatefulSetName(clusterName)));
        superUsers.add(String.format("User:CN=%s-%s,O=io.strimzi", clusterName, "entity-operator"));
        superUsers.add(String.format("User:CN=%s-%s,O=io.strimzi", clusterName, "kafka-exporter"));

        printSectionHeader("Authorization");
        // Type-specific authorizer configuration; also appends any user-configured
        // super users to the superUsers list before it is written out below
        configureAuthorization(clusterName, superUsers, authorization);
        writer.println("super.users=" + String.join(";", superUsers));
        writer.println();
    }

    return this;
}

/**
* Configures authorization for the Kafka brokers. This method is used only internally.
*
* @param clusterName Name of the cluster
* @param superUsers Super users list who have all the rights on the cluster
* @param authorization The authorization configuration from the Kafka CR
*/
private void configureAuthorization(String clusterName, List<String> superUsers, KafkaAuthorization authorization) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we maybe split this into two methods? configureSimpleAuthorization and configureKeycloakAuthorization?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We already have a similar pattern for configuring authentication on which I agree. The configuraAuthentication method is just one and we have the logic for different authentication mechanisms inside. I am not sure about the advantage of having different methods here.

if (KafkaAuthorizationSimple.TYPE_SIMPLE.equals(authorization.getType())) {
KafkaAuthorizationSimple simpleAuthz = (KafkaAuthorizationSimple) authorization;
writer.println("authorizer.class.name=" + KafkaAuthorizationSimple.AUTHORIZER_CLASS_NAME);

// User configured super users
if (simpleAuthz.getSuperUsers() != null && simpleAuthz.getSuperUsers().size() > 0) {
superUsers.addAll(simpleAuthz.getSuperUsers().stream().map(e -> String.format("User:%s", e)).collect(Collectors.toList()));
}
} else if (KafkaAuthorizationKeycloak.TYPE_KEYCLOAK.equals(authorization.getType())) {
KafkaAuthorizationKeycloak keycloakAuthz = (KafkaAuthorizationKeycloak) authorization;
writer.println("authorizer.class.name=" + KafkaAuthorizationKeycloak.AUTHORIZER_CLASS_NAME);
writer.println("principal.builder.class=" + KafkaAuthorizationKeycloak.PRINCIPAL_BUILDER_CLASS_NAME);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you double check what impact does this have on the internal super users? Do they still keey their own names?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They should keep their names, yes.

The JwtKafkaPrincipalBuilder only does something different when user was authenticated over SASL_OAUTHBEARER as seen here. An internal user would be handled by the same logic as before.

writer.println("strimzi.authorization.token.endpoint.uri=" + keycloakAuthz.getTokenEndpointUri());
writer.println("strimzi.authorization.client.id=" + keycloakAuthz.getClientId());
writer.println("strimzi.authorization.delegate.to.kafka.acl=" + keycloakAuthz.isDelegateToKafkaAcls());
writer.println("strimzi.authorization.kafka.cluster.name=" + clusterName);

if (keycloakAuthz.getTlsTrustedCertificates() != null && keycloakAuthz.getTlsTrustedCertificates().size() > 0) {
writer.println("strimzi.authorization.ssl.truststore.location=/tmp/kafka/authz-keycloak.truststore.p12");
writer.println("strimzi.authorization.ssl.truststore.password=${CERTS_STORE_PASSWORD}");
writer.println("strimzi.authorization.ssl.truststore.type=PKCS12");
writer.println("strimzi.authorization.ssl.secure.random.implementation=SHA1PRNG");
String endpointIdentificationAlgorithm = keycloakAuthz.isDisableTlsHostnameVerification() ? "" : "HTTPS";
writer.println("strimzi.authorization.ssl.endpoint.identification.algorithm=" + endpointIdentificationAlgorithm);
}

// User configured super users
if (keycloakAuthz.getSuperUsers() != null && keycloakAuthz.getSuperUsers().size() > 0) {
superUsers.addAll(keycloakAuthz.getSuperUsers().stream().map(e -> String.format("User:%s", e)).collect(Collectors.toList()));
}
}
}

/**
* Configures the configuration options passed by the user in the Kafka CR.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import io.strimzi.api.kafka.model.InlineLogging;
import io.strimzi.api.kafka.model.Kafka;
import io.strimzi.api.kafka.model.KafkaAuthorization;
import io.strimzi.api.kafka.model.KafkaAuthorizationKeycloak;
import io.strimzi.api.kafka.model.KafkaClusterSpec;
import io.strimzi.api.kafka.model.KafkaResources;
import io.strimzi.api.kafka.model.Logging;
Expand Down Expand Up @@ -490,12 +491,14 @@ public static KafkaCluster fromCrd(Kafka kafkaAssembly, KafkaVersion.Lookup vers
KafkaListeners listeners = kafkaClusterSpec.getListeners();
result.setListeners(listeners);

boolean isListenerOAuth = false;
if (listeners != null) {
if (listeners.getPlain() != null) {
if (listeners.getPlain().getAuth() instanceof KafkaListenerAuthenticationTls) {
throw new InvalidResourceException("You cannot configure TLS authentication on a plain listener.");
} else if (listeners.getPlain().getAuth() instanceof KafkaListenerAuthenticationOAuth) {
validateOauth((KafkaListenerAuthenticationOAuth) listeners.getPlain().getAuth(), "Plain listener");
isListenerOAuth = true;
}
}

Expand All @@ -504,11 +507,13 @@ public static KafkaCluster fromCrd(Kafka kafkaAssembly, KafkaVersion.Lookup vers
throw new InvalidResourceException("TLS Client Authentication can be used only with enabled TLS encryption!");
} else if (listeners.getExternal().getAuth() != null && listeners.getExternal().getAuth() instanceof KafkaListenerAuthenticationOAuth) {
validateOauth((KafkaListenerAuthenticationOAuth) listeners.getExternal().getAuth(), "External listener");
isListenerOAuth = true;
}
}

if (listeners.getTls() != null && listeners.getTls().getAuth() != null && listeners.getTls().getAuth() instanceof KafkaListenerAuthenticationOAuth) {
validateOauth((KafkaListenerAuthenticationOAuth) listeners.getTls().getAuth(), "TLS listener");
isListenerOAuth = true;
}

if (listeners.getExternal() != null) {
Expand Down Expand Up @@ -542,6 +547,18 @@ public static KafkaCluster fromCrd(Kafka kafkaAssembly, KafkaVersion.Lookup vers
}
}

if (kafkaClusterSpec.getAuthorization() instanceof KafkaAuthorizationKeycloak) {
if (!isListenerOAuth) {
throw new InvalidResourceException("You cannot configure Keycloak Authorization without any listener with OAuth based authentication");
} else {
KafkaAuthorizationKeycloak authorizationKeycloak = (KafkaAuthorizationKeycloak) kafkaClusterSpec.getAuthorization();
if (authorizationKeycloak.getClientId() == null || authorizationKeycloak.getTokenEndpointUri() == null) {
log.error("Keycloak Authorization: Token Endpoint URI and clientId are both required");
throw new InvalidResourceException("Keycloak Authorization: Token Endpoint URI and clientId are both required");
}
}
}

result.setAuthorization(kafkaClusterSpec.getAuthorization());

if (kafkaClusterSpec.getTemplate() != null) {
Expand Down Expand Up @@ -1415,6 +1432,11 @@ private List<Volume> getVolumes(boolean isOpenShift) {
}
}

if (authorization instanceof KafkaAuthorizationKeycloak) {
KafkaAuthorizationKeycloak keycloakAuthz = (KafkaAuthorizationKeycloak) authorization;
volumeList.addAll(AuthenticationUtils.configureOauthCertificateVolumes("authz-keycloak", keycloakAuthz.getTlsTrustedCertificates(), isOpenShift));
}

return volumeList;
}

Expand Down Expand Up @@ -1475,6 +1497,11 @@ private List<VolumeMount> getVolumeMounts() {
}
}

if (authorization instanceof KafkaAuthorizationKeycloak) {
KafkaAuthorizationKeycloak keycloakAuthz = (KafkaAuthorizationKeycloak) authorization;
volumeMountList.addAll(AuthenticationUtils.configureOauthCertificateVolumeMounts("authz-keycloak", keycloakAuthz.getTlsTrustedCertificates(), OAUTH_TRUSTED_CERTS_BASE_VOLUME_MOUNT + "/authz-keycloak-certs"));
}

return volumeMountList;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import io.strimzi.api.kafka.model.CertSecretSource;
import io.strimzi.api.kafka.model.CertSecretSourceBuilder;
import io.strimzi.api.kafka.model.KafkaAuthorization;
import io.strimzi.api.kafka.model.KafkaAuthorizationKeycloakBuilder;
import io.strimzi.api.kafka.model.KafkaAuthorizationSimpleBuilder;
import io.strimzi.api.kafka.model.Rack;
import io.strimzi.api.kafka.model.listener.IngressListenerBrokerConfiguration;
Expand Down Expand Up @@ -97,7 +98,7 @@ public void testNoAuthorization() {
}

@Test
public void testAuthorizationWithSuperUsers() {
public void testSimpleAuthorizationWithSuperUsers() {
KafkaAuthorization auth = new KafkaAuthorizationSimpleBuilder()
.addToSuperUsers("jakub", "CN=kuba")
.build();
Expand All @@ -111,7 +112,7 @@ public void testAuthorizationWithSuperUsers() {
}

@Test
public void testAuthorizationWithoutSuperUsers() {
public void testSimpleAuthorizationWithoutSuperUsers() {
KafkaAuthorization auth = new KafkaAuthorizationSimpleBuilder()
.build();

Expand All @@ -123,6 +124,40 @@ public void testAuthorizationWithoutSuperUsers() {
"super.users=User:CN=my-cluster-kafka,O=io.strimzi;User:CN=my-cluster-entity-operator,O=io.strimzi;User:CN=my-cluster-kafka-exporter,O=io.strimzi"));
}

@Test
public void testKeycloakAuthorization() {
    // Trusted certificate reference pointing at a secret with the OAuth server CA
    CertSecretSource trustedCert = new CertSecretSourceBuilder()
            .withNewSecretName("my-secret")
            .withNewCertificate("my.crt")
            .build();

    // Keycloak authorization with TLS trust configured and hostname verification disabled
    KafkaAuthorization keycloakAuthorization = new KafkaAuthorizationKeycloakBuilder()
            .withTokenEndpointUri("http://token-endpoint-uri")
            .withClientId("my-client-id")
            .withDelegateToKafkaAcls(false)
            .withTlsTrustedCertificates(trustedCert)
            .withDisableTlsHostnameVerification(true)
            .addToSuperUsers("giada", "CN=paccu")
            .build();

    String brokerConfig = new KafkaBrokerConfigurationBuilder()
            .withAuthorization("my-cluster", keycloakAuthorization)
            .build();

    // The generated configuration must contain the Keycloak authorizer, the principal builder,
    // the strimzi.authorization.* options and both internal and user-configured super users
    assertThat(brokerConfig, isEquivalent("authorizer.class.name=io.strimzi.kafka.oauth.server.authorizer.KeycloakRBACAuthorizer\n" +
            "principal.builder.class=io.strimzi.kafka.oauth.server.authorizer.JwtKafkaPrincipalBuilder\n" +
            "strimzi.authorization.token.endpoint.uri=http://token-endpoint-uri\n" +
            "strimzi.authorization.client.id=my-client-id\n" +
            "strimzi.authorization.delegate.to.kafka.acl=false\n" +
            "strimzi.authorization.kafka.cluster.name=my-cluster\n" +
            "strimzi.authorization.ssl.truststore.location=/tmp/kafka/authz-keycloak.truststore.p12\n" +
            "strimzi.authorization.ssl.truststore.password=${CERTS_STORE_PASSWORD}\n" +
            "strimzi.authorization.ssl.truststore.type=PKCS12\n" +
            "strimzi.authorization.ssl.secure.random.implementation=SHA1PRNG\n" +
            "strimzi.authorization.ssl.endpoint.identification.algorithm=\n" +
            "super.users=User:CN=my-cluster-kafka,O=io.strimzi;User:CN=my-cluster-entity-operator,O=io.strimzi;User:CN=my-cluster-kafka-exporter,O=io.strimzi;User:giada;User:CN=paccu"));
}

@Test
public void testNullUserConfiguration() {
String configuration = new KafkaBrokerConfigurationBuilder()
Expand Down
Loading