Skip to content

Commit

Permalink
Add Kafka 3.4.1 and 3.5.0 + fix tests (#57)
Browse files Browse the repository at this point in the history
Signed-off-by: see-quick <[email protected]>
  • Loading branch information
see-quick authored Jun 16, 2023
1 parent e14479b commit deb9305
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 7 deletions.
3 changes: 2 additions & 1 deletion src/main/resources/kafka_versions.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"3.1.2": "quay.io/strimzi-test-container/test-container:latest-kafka-3.1.2",
"3.2.3": "quay.io/strimzi-test-container/test-container:latest-kafka-3.2.3",
"3.3.2": "quay.io/strimzi-test-container/test-container:latest-kafka-3.3.2",
"3.4.0": "quay.io/strimzi-test-container/test-container:latest-kafka-3.4.0"
"3.4.1": "quay.io/strimzi-test-container/test-container:latest-kafka-3.4.1",
"3.5.0": "quay.io/strimzi-test-container/test-container:latest-kafka-3.5.0"
}
}
6 changes: 5 additions & 1 deletion src/test/java/io/strimzi/test/container/AbstractIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ protected static Stream<Arguments> retrieveKafkaVersionsFile() throws IOExceptio

for (Iterator<Map.Entry<String, JsonNode>> iter = rootNode.get("kafkaVersions").fields(); iter.hasNext(); ) {
final Map.Entry<String, JsonNode> fields = iter.next();
parameters.add(Arguments.of(fields.getValue().asText()));
parameters.add(Arguments.of(fields.getValue().asText(), fields.getKey()));
}
return parameters.stream();
}
Expand All @@ -46,4 +46,8 @@ protected void assumeDocker() {
protected void supportsKraftMode(final String imageName) {
Assumptions.assumeTrue(!imageName.contains("-kafka-2."));
}

protected boolean isLessThanKafka350(final String kafkaVersion) {
return KafkaVersionService.KafkaVersion.compareVersions(kafkaVersion, "3.5.0") == -1;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public class StrimziKafkaKraftContainerIT extends AbstractIT {

@ParameterizedTest(name = "testStartContainerWithEmptyConfiguration-{0}")
@MethodSource("retrieveKafkaVersionsFile")
void testStartContainerWithEmptyConfiguration(final String imageName) throws ExecutionException, InterruptedException, TimeoutException {
void testStartContainerWithEmptyConfiguration(final String imageName, final String kafkaVersion) throws ExecutionException, InterruptedException, TimeoutException {
assumeDocker();
supportsKraftMode(imageName);

Expand All @@ -57,7 +57,12 @@ void testStartContainerWithEmptyConfiguration(final String imageName) throws Exe
assertThat(systemUnderTest.getClusterId(), notNullValue());

String logsFromKafka = systemUnderTest.getLogs();
assertThat(logsFromKafka, containsString("RaftManager nodeId=1"));
if (isLessThanKafka350(kafkaVersion)) {
assertThat(logsFromKafka, containsString("RaftManager nodeId=1"));
} else {
assertThat(logsFromKafka, containsString("ControllerServer id=1"));
assertThat(logsFromKafka, containsString("SocketServer listenerType=CONTROLLER, nodeId=1"));
}

verify();

Expand All @@ -70,10 +75,9 @@ void testStartContainerWithEmptyConfiguration(final String imageName) throws Exe

@ParameterizedTest(name = "testStartContainerWithSomeConfiguration-{0}")
@MethodSource("retrieveKafkaVersionsFile")
void testStartContainerWithSomeConfiguration(final String imageName) throws ExecutionException, InterruptedException, TimeoutException {
void testStartContainerWithSomeConfiguration(final String imageName, final String kafkaVersion) throws ExecutionException, InterruptedException, TimeoutException {
assumeDocker();
supportsKraftMode(imageName);

try {
Map<String, String> kafkaConfiguration = new HashMap<>();

Expand All @@ -92,7 +96,12 @@ void testStartContainerWithSomeConfiguration(final String imageName) throws Exec

String logsFromKafka = systemUnderTest.getLogs();

assertThat(logsFromKafka, containsString("RaftManager nodeId=1"));
if (isLessThanKafka350(kafkaVersion)) {
assertThat(logsFromKafka, containsString("RaftManager nodeId=1"));
} else {
assertThat(logsFromKafka, containsString("ControllerServer id=1"));
assertThat(logsFromKafka, containsString("SocketServer listenerType=CONTROLLER, nodeId=1"));
}
assertThat(logsFromKafka, containsString("log.cleaner.enable = false"));
assertThat(logsFromKafka, containsString("log.cleaner.backoff.ms = 1000"));
assertThat(logsFromKafka, containsString("ssl.enabled.protocols = [TLSv1]"));
Expand Down

0 comments on commit deb9305

Please sign in to comment.