Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Internal Kafka clients should do TLS hostname verification except Nodeports #7526

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -419,15 +419,13 @@ final protected void configureScramSha(SecurityProtocol securityProtocol) {

this.setAdditionalConfig(this.getAdditionalConfig() +
// scram-sha
"ssl.endpoint.identification.algorithm=\n" +
"sasl.mechanism=SCRAM-SHA-512\n" +
"security.protocol=" + securityProtocol + "\n" +
"sasl.jaas.config=" + saslJaasConfigDecrypted);
}

final protected void configureTls() {
this.setAdditionalConfig(this.getAdditionalConfig() +
"ssl.endpoint.identification.algorithm=\n" +
"sasl.mechanism=GSSAPI\n" +
"security.protocol=" + SecurityProtocol.SSL + "\n");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,21 @@ public static SystemTestCertAndKey generateIntermediateCaCertAndKey(SystemTestCe
.withSubjectDn(STRIMZI_INTERMEDIATE_CA)
.build();
}

public static SystemTestCertAndKey generateStrimziCaCertAndKey(SystemTestCertAndKey rootCert, String subjectDn) {
return strimziCaCertBuilder(rootCert)
.withSubjectDn(subjectDn)
.build();
}

public static SystemTestCertAndKey generateEndEntityCertAndKey(final SystemTestCertAndKey intermediateCert,
final ASN1Encodable[] sansNames) {
return endEntityCertBuilder(intermediateCert)
.withSubjectDn(STRIMZI_END_SUBJECT)
.withSanDnsNames(sansNames)
.build();
}

public static SystemTestCertAndKey generateEndEntityCertAndKey(SystemTestCertAndKey intermediateCert) {
return endEntityCertBuilder(intermediateCert)
.withSubjectDn(STRIMZI_END_SUBJECT)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,9 @@ private void testWeirdUsername(ExtensionContext extensionContext, String weirdUs
.withTopicName(topicName)
.withMessageCount(MESSAGE_COUNT)
.withUserName(weirdUserName)
// we disable ssl.endpoint.identification.algorithm for external listener (i.e., NodePort),
// because TLS hostname verification is not supported on such listener type.
.withAdditionalConfig("ssl.endpoint.identification.algorithm=\n")
.build();

if (auth.getType().equals(Constants.TLS_LISTENER_DEFAULT_NAME)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.bouncycastle.asn1.ASN1Encodable;
import org.bouncycastle.asn1.x509.GeneralName;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.extension.ExtensionContext;
Expand Down Expand Up @@ -680,7 +682,13 @@ void testCustomSoloCertificatesForNodePort(ExtensionContext extensionContext) {
final TestStorage testStorage = new TestStorage(extensionContext);
final String clusterCustomCertServer1 = testStorage.getClusterName() + "-" + customCertServer1;

SecretUtils.createCustomSecret(clusterCustomCertServer1, testStorage.getClusterName(), testStorage.getNamespaceName(), STRIMZI_CERT_AND_KEY_1);
final SystemTestCertAndKey root1 = generateRootCaCertAndKey();
final SystemTestCertAndKey intermediate1 = generateIntermediateCaCertAndKey(root1);
final SystemTestCertAndKey strimzi1 = generateEndEntityCertAndKey(intermediate1, this.retrieveKafkaBrokerSANs(testStorage));

final CertAndKeyFiles strimziCertAndKey1 = exportToPemFiles(strimzi1);

SecretUtils.createCustomSecret(clusterCustomCertServer1, testStorage.getClusterName(), testStorage.getNamespaceName(), strimziCertAndKey1);

resourceManager.createResource(extensionContext, KafkaTemplates.kafkaEphemeral(testStorage.getClusterName(), 3, 3)
.editSpec()
Expand Down Expand Up @@ -765,8 +773,15 @@ void testCustomChainCertificatesForNodePort(ExtensionContext extensionContext) {
final String clusterCustomCertChain1 = testStorage.getClusterName() + "-" + customCertChain1;
final String clusterCustomRootCA1 = testStorage.getClusterName() + "-" + customRootCA1;

SecretUtils.createCustomSecret(clusterCustomCertChain1, testStorage.getClusterName(), testStorage.getNamespaceName(), CHAIN_CERT_AND_KEY_1);
SecretUtils.createCustomSecret(clusterCustomRootCA1, testStorage.getClusterName(), testStorage.getNamespaceName(), ROOT_CA_CERT_AND_KEY_1);
final SystemTestCertAndKey root1 = generateRootCaCertAndKey();
final SystemTestCertAndKey intermediate1 = generateIntermediateCaCertAndKey(root1);
final SystemTestCertAndKey strimzi1 = generateEndEntityCertAndKey(intermediate1, this.retrieveKafkaBrokerSANs(testStorage));

final CertAndKeyFiles rootCertAndKey1 = exportToPemFiles(root1);
final CertAndKeyFiles chainCertAndKey1 = exportToPemFiles(strimzi1, intermediate1, root1);

SecretUtils.createCustomSecret(clusterCustomCertChain1, testStorage.getClusterName(), testStorage.getNamespaceName(), chainCertAndKey1);
SecretUtils.createCustomSecret(clusterCustomRootCA1, testStorage.getClusterName(), testStorage.getNamespaceName(), rootCertAndKey1);

resourceManager.createResource(extensionContext, KafkaTemplates.kafkaEphemeral(testStorage.getClusterName(), 1, 1)
.editSpec()
Expand Down Expand Up @@ -1451,8 +1466,20 @@ void testCustomCertNodePortAndTlsRollingUpdate(ExtensionContext extensionContext
final String clusterCustomCertServer1 = testStorage.getClusterName() + "-" + customCertServer1;
final String clusterCustomCertServer2 = testStorage.getClusterName() + "-" + customCertServer2;

SecretUtils.createCustomSecret(clusterCustomCertServer1, testStorage.getClusterName(), testStorage.getNamespaceName(), STRIMZI_CERT_AND_KEY_1);
SecretUtils.createCustomSecret(clusterCustomCertServer2, testStorage.getClusterName(), testStorage.getNamespaceName(), STRIMZI_CERT_AND_KEY_2);
final SystemTestCertAndKey root1 = generateRootCaCertAndKey();
final SystemTestCertAndKey intermediate1 = generateIntermediateCaCertAndKey(root1);
final SystemTestCertAndKey strimzi1 = generateEndEntityCertAndKey(intermediate1, this.retrieveKafkaBrokerSANs(testStorage));

final CertAndKeyFiles strimziCertAndKey1 = exportToPemFiles(strimzi1);

final SystemTestCertAndKey root2 = generateRootCaCertAndKey();
final SystemTestCertAndKey intermediate2 = generateIntermediateCaCertAndKey(root2);
final SystemTestCertAndKey strimzi2 = generateEndEntityCertAndKey(intermediate2, this.retrieveKafkaBrokerSANs(testStorage));

final CertAndKeyFiles strimziCertAndKey2 = exportToPemFiles(strimzi2);

SecretUtils.createCustomSecret(clusterCustomCertServer1, testStorage.getClusterName(), testStorage.getNamespaceName(), strimziCertAndKey1);
SecretUtils.createCustomSecret(clusterCustomCertServer2, testStorage.getClusterName(), testStorage.getNamespaceName(), strimziCertAndKey2);

resourceManager.createResource(extensionContext, KafkaTemplates.kafkaPersistent(testStorage.getClusterName(), 3)
.editSpec()
Expand Down Expand Up @@ -1586,8 +1613,8 @@ void testCustomCertNodePortAndTlsRollingUpdate(ExtensionContext extensionContext
resourceManager.createResource(extensionContext, kafkaClients.consumerTlsStrimzi(testStorage.getClusterName()));
ClientUtils.waitForClientSuccess(testStorage.getConsumerName(), testStorage.getNamespaceName(), testStorage.getMessageCount() * 3);

SecretUtils.createCustomSecret(clusterCustomCertServer1, testStorage.getClusterName(), testStorage.getNamespaceName(), STRIMZI_CERT_AND_KEY_2);
SecretUtils.createCustomSecret(clusterCustomCertServer2, testStorage.getClusterName(), testStorage.getNamespaceName(), STRIMZI_CERT_AND_KEY_1);
SecretUtils.createCustomSecret(clusterCustomCertServer1, testStorage.getClusterName(), testStorage.getNamespaceName(), strimziCertAndKey2);
SecretUtils.createCustomSecret(clusterCustomCertServer2, testStorage.getClusterName(), testStorage.getNamespaceName(), strimziCertAndKey1);

kafkaSnapshot = RollingUpdateUtils.waitTillComponentHasRolled(testStorage.getNamespaceName(), testStorage.getKafkaSelector(), 3, kafkaSnapshot);

Expand Down Expand Up @@ -2242,4 +2269,14 @@ void afterEach(ExtensionContext extensionContext) {
final String namespaceName = StUtils.getNamespaceBasedOnRbac(clusterOperator.getDeploymentNamespace(), extensionContext);
kubeClient(namespaceName).getClient().persistentVolumeClaims().inNamespace(namespaceName).delete();
}

private ASN1Encodable[] retrieveKafkaBrokerSANs(final TestStorage testStorage) {
return new ASN1Encodable[] {
new GeneralName(GeneralName.dNSName, "*.127.0.0.1.nip.io"),
new GeneralName(GeneralName.dNSName, "*." + testStorage.getClusterName() + "-kafka-brokers"),
new GeneralName(GeneralName.dNSName, "*." + testStorage.getClusterName() + "-kafka-brokers." + testStorage.getNamespaceName() + ".svc"),
new GeneralName(GeneralName.dNSName, testStorage.getClusterName() + "-kafka-bootstrap"),
new GeneralName(GeneralName.dNSName, testStorage.getClusterName() + "-kafka-bootstrap." + testStorage.getNamespaceName() + ".svc")
};
}
}