Skip to content

Commit

Permalink
Fix configuration issue for strimzi oauth
Browse files Browse the repository at this point in the history
  • Loading branch information
carlesarnal committed Feb 17, 2025
1 parent 0cfadda commit 3d16024
Show file tree
Hide file tree
Showing 12 changed files with 76 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public static boolean configureKafkaSQLOauth(ApicurioRegistry3 primary, Map<Stri

// spotless:off
var clientSecret = new SecretKeyRefTool(getKafkaSqlAuthSpec(primary)
.map(KafkaSqlAuthSpec::getClientSecret)
.map(KafkaSqlAuthSpec::getClientSecretRef)
.orElse(null), "client-secret");

if (clientSecret.isValid()) {
Expand All @@ -36,7 +36,7 @@ public static boolean configureKafkaSQLOauth(ApicurioRegistry3 primary, Map<Stri
addEnvVar(env, APICURIO_KAFKASQL_SECURITY_SASL_ENABLED, kafkaSqlAuthSpec.getEnabled().toString());
addEnvVar(env, APICURIO_KAFKASQL_SECURITY_SASL_MECHANISM, kafkaSqlAuthSpec.getMechanism());
addEnvVar(env, APICURIO_KAFKA_SECURITY_SASL_CLIENT_ID, kafkaSqlAuthSpec.getClientId());
addEnvVar(env, APICURIO_KAFKASQL_SECURITY_SASL_CLIENT_SECRET, new SecretKeyRefTool(kafkaSqlAuthSpec.getClientSecret(), "client-secret").getSecretVolumeKeyPath());
addEnvVar(env, APICURIO_KAFKASQL_SECURITY_SASL_CLIENT_SECRET, new SecretKeyRefTool(kafkaSqlAuthSpec.getClientSecretRef(), "client-secret").getSecretVolumeKeyPath());
addEnvVar(env, APICURIO_KAFAKSQL_SECURITY_SASL_TOKEN_ENDPOINT, kafkaSqlAuthSpec.getTokenEndpoint());
addEnvVar(env, APICURIO_KAFAKSQL_SECURITY_SASL_LOGIN_CALLBACK_HANDLER_CLASS, kafkaSqlAuthSpec.getLoginHandlerClass());
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import static org.assertj.core.api.Assertions.assertThat;

@QuarkusTest
public class AuthITTest extends BaseAuthTest {
public class AuthITTest extends BaseAuthITTest {

/**
* In this test, Keycloak is deployed using a self-signed certificate with the hostname set to the ingress
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import static org.assertj.core.api.Assertions.assertThat;

@QuarkusTest
public class AuthTLSITTest extends BaseAuthTest {
public class AuthTLSITTest extends BaseAuthITTest {

private static final Logger log = LoggerFactory.getLogger(AuthTLSITTest.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import static org.assertj.core.api.Assertions.assertThat;

@QuarkusTest
public class AuthzITTest extends BaseAuthTest {
public class AuthzITTest extends BaseAuthITTest {

/**
* In this test, Keycloak is deployed using a self-signed certificate with the hostname set to the ingress
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,27 @@
import io.apicurio.registry.operator.api.v1.ApicurioRegistry3;
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.client.utils.Serialization;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.BeforeAll;

import java.time.Duration;
import java.util.List;

import static io.apicurio.registry.operator.resource.ResourceFactory.deserialize;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

public abstract class BaseAuthTest extends ITBase {
public abstract class BaseAuthITTest extends ITBase {

@BeforeAll
public static void init() {
Awaitility.setDefaultTimeout(Duration.ofSeconds(60));
protected static ApicurioRegistry3 prepareInfra(String keycloakResource, String apicurioResource) {
installKeycloak(keycloakResource);

// Deploy Registry
var registry = deserialize(apicurioResource, ApicurioRegistry3.class);

registry.getMetadata().setNamespace(namespace);

return registry;
}

protected static ApicurioRegistry3 prepareInfra(String keycloakResource, String apicurioResource) {
protected static void installKeycloak(String keycloakResource) {
List<HasMetadata> resources = Serialization
.unmarshal(AuthITTest.class.getResourceAsStream(keycloakResource));

Expand All @@ -34,11 +37,5 @@ protected static ApicurioRegistry3 prepareInfra(String keycloakResource, String
createKeycloakDNSResolution("simple-keycloak.apps.cluster.example",
"keycloak." + namespace + ".svc.cluster.local");

// Deploy Registry
var registry = deserialize(apicurioResource, ApicurioRegistry3.class);

registry.getMetadata().setNamespace(namespace);

return registry;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.kubernetes.api.model.networking.v1.NetworkPolicy;
import io.fabric8.kubernetes.api.model.policy.v1.PodDisruptionBudget;
import io.fabric8.kubernetes.api.model.rbac.ClusterRoleBinding;
import io.fabric8.kubernetes.api.model.rbac.RoleBinding;
import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.ConfigBuilder;
import io.fabric8.kubernetes.client.KubernetesClient;
Expand All @@ -33,6 +35,7 @@
import org.slf4j.LoggerFactory;

import java.io.*;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
Expand Down Expand Up @@ -71,6 +74,8 @@ public enum OperatorDeployment {
protected static String deploymentTarget;
protected static String namespace;
protected static boolean cleanup;

protected static boolean strimziInstalled = false;
private static Operator operator;

@BeforeAll
Expand Down Expand Up @@ -307,6 +312,28 @@ static void createKeycloakDNSResolution(String ingressHostname, String keycloakS
client.apps().deployments().inNamespace(systemNamespace).withName("coredns").rolling().restart();
}

static void applyStrimziResources() throws IOException {
try (BufferedInputStream in = new BufferedInputStream(
new URL("https://strimzi.io/install/latest").openStream())) {
List<HasMetadata> resources = Serialization.unmarshal(in);
resources.forEach(r -> {
if (r.getKind().equals("ClusterRoleBinding") && r instanceof ClusterRoleBinding) {
var crb = (ClusterRoleBinding) r;
crb.getSubjects().forEach(s -> s.setNamespace(namespace));
} else if (r.getKind().equals("RoleBinding") && r instanceof RoleBinding) {
var crb = (RoleBinding) r;
crb.getSubjects().forEach(s -> s.setNamespace(namespace));
}
log.info("Creating Strimzi resource kind {} in namespace {}", r.getKind(), namespace);
client.resource(r).inNamespace(namespace).createOrReplace();
await().atMost(Duration.ofMinutes(2)).ignoreExceptions().until(() -> {
assertThat(client.resource(r).inNamespace(namespace).get()).isNotNull();
return true;
});
});
}
}

static void createNamespace(KubernetesClient client, String namespace) {
log.info("Creating Namespace {}", namespace);
client.resource(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,13 @@
package io.apicurio.registry.operator.it;

import io.apicurio.registry.operator.api.v1.ApicurioRegistry3;
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.PodCondition;
import io.fabric8.kubernetes.api.model.rbac.ClusterRoleBinding;
import io.fabric8.kubernetes.api.model.rbac.RoleBinding;
import io.fabric8.kubernetes.client.utils.Serialization;
import io.quarkus.test.junit.QuarkusTest;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedInputStream;
import java.io.IOException;
import java.net.URL;
import java.util.List;

import static io.apicurio.registry.operator.resource.ResourceFactory.deserialize;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
Expand All @@ -28,7 +19,9 @@ public class KafkaSqlITTest extends ITBase {

@BeforeAll
public static void beforeAll() throws Exception {
applyStrimziResources();
if (!strimziInstalled) {
applyStrimziResources();
}
}

@Test
Expand Down Expand Up @@ -67,26 +60,4 @@ void testKafkaSQLPlain() {
return true;
});
}

static void applyStrimziResources() throws IOException {
try (BufferedInputStream in = new BufferedInputStream(
new URL("https://strimzi.io/install/latest").openStream())) {
List<HasMetadata> resources = Serialization.unmarshal(in);
resources.forEach(r -> {
if (r.getKind().equals("ClusterRoleBinding") && r instanceof ClusterRoleBinding) {
var crb = (ClusterRoleBinding) r;
crb.getSubjects().forEach(s -> s.setNamespace(namespace));
} else if (r.getKind().equals("RoleBinding") && r instanceof RoleBinding) {
var crb = (RoleBinding) r;
crb.getSubjects().forEach(s -> s.setNamespace(namespace));
}
log.info("Creating Strimzi resource kind {} in namespace {}", r.getKind(), namespace);
client.resource(r).inNamespace(namespace).createOrReplace();
await().ignoreExceptions().until(() -> {
assertThat(client.resource(r).inNamespace(namespace).get()).isNotNull();
return true;
});
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,38 +8,41 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static io.apicurio.registry.operator.it.KafkaSqlITTest.applyStrimziResources;
import static io.apicurio.registry.operator.resource.ResourceFactory.deserialize;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

@QuarkusTest
public class KafkaSqlOAuthITTest extends ITBase {
public class KafkaSqlOAuthITTest extends BaseAuthITTest {

private static final Logger log = LoggerFactory.getLogger(KafkaSqlOAuthITTest.class);

@BeforeAll
public static void beforeAll() throws Exception {
applyStrimziResources();
if (!strimziInstalled) {
applyStrimziResources();
}
}

@Test
void testKafkaSQLTLS() {
installKeycloak("/k8s/examples/auth/keycloak.yaml");

client.load(getClass().getResourceAsStream("/k8s/examples/kafkasql/oauth/oauth-example-cluster.yaml"))
.create();
final var clusterName = "oauth-example-cluster";

await().ignoreExceptions().untilAsserted(() ->
// Strimzi uses StrimziPodSet instead of ReplicaSet, so we have to check pods
assertThat(client.pods().inNamespace(namespace).withName(clusterName + "-kafka-0").get().getStatus()
.getConditions()).filteredOn(c -> "Ready".equals(c.getType())).map(PodCondition::getStatus)
.containsOnly("True"));
// Strimzi uses StrimziPodSet instead of ReplicaSet, so we have to check pods
assertThat(client.pods().inNamespace(namespace).withName(clusterName + "-kafka-0").get().getStatus()
.getConditions()).filteredOn(c -> "Ready".equals(c.getType())).map(PodCondition::getStatus)
.containsOnly("True"));

// We're guessing the value here to avoid using Strimzi Java model, and relying on retries below.
var bootstrapServers = clusterName + "-kafka-bootstrap." + namespace + ".svc:9093";

var registry = deserialize(
"k8s/examples/kafkasql/oauth/oauth-example-kafkasql-tls.apicurioregistry3.yaml.yaml",
"k8s/examples/kafkasql/oauth/oauth-example-kafkasql-tls.apicurioregistry3.yaml",
ApicurioRegistry3.class);
registry.getMetadata().setNamespace(namespace);
registry.getSpec().getApp().getStorage().getKafkasql().setBootstrapServers(bootstrapServers);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static io.apicurio.registry.operator.it.KafkaSqlITTest.applyStrimziResources;
import static io.apicurio.registry.operator.resource.ResourceFactory.deserialize;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
Expand All @@ -20,7 +19,9 @@ public class KafkaSqlTLSITTest extends ITBase {

@BeforeAll
public static void beforeAll() throws Exception {
applyStrimziResources();
if (!strimziInstalled) {
applyStrimziResources();
}
}

@Test
Expand All @@ -30,10 +31,10 @@ void testKafkaSQLTLS() {
final var clusterName = "example-cluster";

await().ignoreExceptions().untilAsserted(() ->
// Strimzi uses StrimziPodSet instead of ReplicaSet, so we have to check pods
assertThat(client.pods().inNamespace(namespace).withName(clusterName + "-kafka-0").get().getStatus()
.getConditions()).filteredOn(c -> "Ready".equals(c.getType())).map(PodCondition::getStatus)
.containsOnly("True"));
// Strimzi uses StrimziPodSet instead of ReplicaSet, so we have to check pods
assertThat(client.pods().inNamespace(namespace).withName(clusterName + "-kafka-0").get().getStatus()
.getConditions()).filteredOn(c -> "Ready".equals(c.getType())).map(PodCondition::getStatus)
.containsOnly("True"));

client.load(getClass().getResourceAsStream("/k8s/examples/kafkasql/tls/apicurio.kafkauser.yaml"))
.inNamespace(namespace).create();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
apiVersion: v1
kind: Secret
metadata:
name: client-credentials
data:
client-secret: dGVzdDE=
---
# IMPORTANT: This resource should only be used for development or testing purposes.
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
Expand All @@ -16,9 +23,8 @@ spec:
type: oauth
clientId: admin-client
clientSecret:
clientSecret:
key: client-secret
secretName: client-credentials
key: client-secret
secretName: client-credentials
validIssuerUri: https://simple-keycloak.apps.cluster.example/realms/registry
jwksEndpointUri: https://simple-keycloak.apps.cluster.example/realms/registry/protocol/openid-connect/certs
userNameClaim: preferred_username
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,3 @@
apiVersion: v1
kind: Secret
metadata:
name: client-credentials
data:
client-secret: dGVzdDE=
---
apiVersion: registry.apicur.io/v1
kind: ApicurioRegistry3
metadata:
Expand All @@ -17,10 +10,6 @@ spec:
bootstrapServers: "<service name>.<namespace>.svc:9092"
# Try using Strimzi/Red Hat AMQ Streams Operator!
tls:
keystoreSecretRef:
name: apicurio
keystorePasswordSecretRef:
name: apicurio
truststoreSecretRef:
name: oauth-example-cluster-cluster-ca-cert
truststorePasswordSecretRef:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

@JsonDeserialize(using = JsonDeserializer.None.class)
@JsonInclude(NON_NULL)
@JsonPropertyOrder({ "enabled", "mechanism", "clientId", "clientSecret", "tokenEndpoint",
@JsonPropertyOrder({ "enabled", "mechanism", "clientId", "clientSecretRef", "tokenEndpoint",
"loginHandlerClass" })
@NoArgsConstructor
@AllArgsConstructor(access = PRIVATE)
Expand Down Expand Up @@ -50,11 +50,11 @@ public class KafkaSqlAuthSpec {
@JsonSetter(nulls = SKIP)
private String clientId;

@JsonProperty("clientSecret")
@JsonProperty("clientSecretRef")
@JsonPropertyDescription("""
The client secret used to authenticate to Kafka.""")
@JsonSetter(nulls = SKIP)
private SecretKeyRef clientSecret;
private SecretKeyRef clientSecretRef;

@JsonProperty("tokenEndpoint")
@JsonPropertyDescription("""
Expand Down

0 comments on commit 3d16024

Please sign in to comment.