Skip to content

Commit

Permalink
Allow KafkaContainer to register additional listeners (#7333)
Browse files Browse the repository at this point in the history
Allow to register additional listeners. E.g. when using along with
Toxiproxy, Schema Registry, Kafka Connect, KCat.

Listeners's host will register as a network aliases.

---------

Co-authored-by: Kevin Wittek <[email protected]>
  • Loading branch information
eddumelendez and kiview authored Aug 17, 2023
1 parent ea0b163 commit d48bab7
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 24 deletions.
23 changes: 16 additions & 7 deletions docs/modules/kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,18 +42,27 @@ KRaft mode was declared production ready in 3.3.1 (confluentinc/cp-kafka:7.3.x)"

See the [versions interoperability matrix](https://docs.confluent.io/platform/current/installation/versions-interoperability.html) for more details.

## Multi-container usage
## Register listeners

If your test needs to run some other Docker container which needs access to Kafka, do the following:
There are scenarios where additional listeners are needed because the consumer/producer can be in another
container in the same network or a different process where the port to connect differs from the default
exposed port `9093`. E.g [Toxiproxy](../../docs/modules/toxiproxy.md).

* Run your other container on the same network as Kafka container, e.g.:
<!--codeinclude-->
[Network](../../modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java) inside_block:withKafkaNetwork
[Register additional listener](../../modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java) inside_block:registerListener
<!--/codeinclude-->
* Use `kafka.getNetworkAliases().get(0)+":9092"` as bootstrap server location.
Or just give your Kafka container a network alias of your liking.

You will need to explicitly create a network and set it on the Kafka container as well as on your other containers that need to communicate with Kafka.
Container defined in the same network:

<!--codeinclude-->
[Create kcat container](../../modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java) inside_block:createKCatContainer
<!--/codeinclude-->

Client using the new registered listener:

<!--codeinclude-->
[Produce/Consume via new listener](../../modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java) inside_block:produceConsumeMessage
<!--/codeinclude-->

## Adding this module to your project dependencies

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,12 @@
import org.testcontainers.utility.ComparableVersion;
import org.testcontainers.utility.DockerImageName;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.function.Supplier;

/**
* Testcontainers implementation for Apache Kafka.
Expand Down Expand Up @@ -43,6 +48,10 @@ public class KafkaContainer extends GenericContainer<KafkaContainer> {

private String clusterId = DEFAULT_CLUSTER_ID;

private static final String PROTOCOL_PREFIX = "TC";

private final Set<Supplier<String>> listeners = new HashSet<>();

/**
* @deprecated use {@link #KafkaContainer(DockerImageName)} instead
*/
Expand All @@ -63,10 +72,6 @@ public KafkaContainer(final DockerImageName dockerImageName) {
super(dockerImageName);
dockerImageName.assertCompatibleWith(DEFAULT_IMAGE_NAME);

// Use two listeners with different names, it will force Kafka to communicate with itself via internal
// listener when KAFKA_INTER_BROKER_LISTENER_NAME is set, otherwise Kafka will try to use the advertised listener
withEnv("KAFKA_LISTENERS", "PLAINTEXT://0.0.0.0:" + KAFKA_PORT + ",BROKER://0.0.0.0:9092");
withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT");
withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER");

withEnv("KAFKA_BROKER_ID", "1");
Expand Down Expand Up @@ -140,6 +145,37 @@ public String getBootstrapServers() {

@Override
protected void configure() {
// Use two listeners with different names, it will force Kafka to communicate with itself via internal
// listener when KAFKA_INTER_BROKER_LISTENER_NAME is set, otherwise Kafka will try to use the advertised listener
Set<String> listeners = new HashSet<>();
listeners.add("PLAINTEXT://0.0.0.0:" + KAFKA_PORT);
listeners.add("BROKER://0.0.0.0:9092");

Set<String> listenerSecurityProtocolMap = new HashSet<>();
listenerSecurityProtocolMap.add("BROKER:PLAINTEXT");
listenerSecurityProtocolMap.add("PLAINTEXT:PLAINTEXT");

List<Supplier<String>> listenersToTransform = new ArrayList<>(this.listeners);
for (int i = 0; i < listenersToTransform.size(); i++) {
Supplier<String> listenerSupplier = listenersToTransform.get(i);
String protocol = String.format("%s-%d", PROTOCOL_PREFIX, i);
String listener = listenerSupplier.get();
String listenerPort = listener.split(":")[1];
String listenerProtocol = String.format("%s://0.0.0.0:%s", protocol, listenerPort);
String protocolMap = String.format("%s:PLAINTEXT", protocol);
listeners.add(listenerProtocol);
listenerSecurityProtocolMap.add(protocolMap);

String host = listener.split(":")[0];
withNetworkAliases(host);
}

String kafkaListeners = String.join(",", listeners);
String kafkaListenerSecurityProtocolMap = String.join(",", listenerSecurityProtocolMap);

withEnv("KAFKA_LISTENERS", kafkaListeners);
withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", kafkaListenerSecurityProtocolMap);

if (this.kraftEnabled) {
waitingFor(Wait.forLogMessage(".*Transitioning from RECOVERY to RUNNING.*", 1));
configureKraft();
Expand Down Expand Up @@ -187,14 +223,24 @@ protected void configureZookeeper() {
protected void containerIsStarting(InspectContainerResponse containerInfo) {
super.containerIsStarting(containerInfo);

List<String> advertisedListeners = new ArrayList<>();
advertisedListeners.add(getBootstrapServers());
advertisedListeners.add(brokerAdvertisedListener(containerInfo));

List<Supplier<String>> listenersToTransform = new ArrayList<>(this.listeners);
for (int i = 0; i < listenersToTransform.size(); i++) {
Supplier<String> listenerSupplier = listenersToTransform.get(i);
String protocol = String.format("%s-%d", PROTOCOL_PREFIX, i);
String listener = listenerSupplier.get();
String listenerProtocol = String.format("%s://%s", protocol, listener);
advertisedListeners.add(listenerProtocol);
}

String kafkaAdvertisedListeners = String.join(",", advertisedListeners);

String command = "#!/bin/bash\n";
// exporting KAFKA_ADVERTISED_LISTENERS with the container hostname
command +=
String.format(
"export KAFKA_ADVERTISED_LISTENERS=%s,%s\n",
getBootstrapServers(),
brokerAdvertisedListener(containerInfo)
);
command += String.format("export KAFKA_ADVERTISED_LISTENERS=%s\n", kafkaAdvertisedListeners);

if (this.kraftEnabled && isLessThanCP740()) {
// Optimization: skip the checks
Expand Down Expand Up @@ -230,6 +276,31 @@ protected String commandZookeeper() {
return command;
}

/**
* Add a {@link Supplier} that will provide a listener with format {@code host:port}.
* Host will be added as a network alias.
* <p>
* The listener will be added to the list of default listeners.
* <p>
* Default listeners:
* <ul>
* <li>0.0.0.0:9092</li>
* <li>0.0.0.0:9093</li>
* </ul>
* <p>
* Default advertised listeners:
* <ul>
* <li>{@code container.getHost():container.getMappedPort(9093)}</li>
* <li>{@code container.getConfig().getHostName():9092}</li>
* </ul>
* @param listenerSupplier a supplier that will provide a listener
* @return this {@link KafkaContainer} instance
*/
public KafkaContainer withListener(Supplier<String> listenerSupplier) {
this.listeners.add(listenerSupplier);
return this;
}

protected String brokerAdvertisedListener(InspectContainerResponse containerInfo) {
return String.format("BROKER://%s:%s", containerInfo.getConfig().getHostName(), "9092");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.junit.Test;
import org.rnorth.ducttape.unreliables.Unreliables;
import org.testcontainers.Testcontainers;
import org.testcontainers.images.builder.Transferable;
import org.testcontainers.utility.DockerImageName;

import java.time.Duration;
Expand Down Expand Up @@ -83,16 +84,9 @@ public void testExternalZookeeperWithExternalNetwork() throws Exception {
.withNetwork(network)
.withNetworkAliases("zookeeper")
.withEnv("ZOOKEEPER_CLIENT_PORT", "2181");
// withKafkaNetwork {
GenericContainer<?> application = new GenericContainer<>(DockerImageName.parse("alpine"))
.withNetwork(network)
// }
.withNetworkAliases("dummy")
.withCommand("sleep 10000")
) {
zookeeper.start();
kafka.start();
application.start();

testKafkaFunctionality(kafka.getBootstrapServers());
}
Expand Down Expand Up @@ -195,6 +189,37 @@ public void testKraftPrecedenceOverEmbeddedZookeeper() throws Exception {
}
}

@Test
public void testUsageWithListener() throws Exception {
try (
Network network = Network.newNetwork();
// registerListener {
KafkaContainer kafka = new KafkaContainer(KAFKA_KRAFT_TEST_IMAGE)
.withListener(() -> "kafka:19092")
.withNetwork(network);
// }
// createKCatContainer {
GenericContainer<?> kcat = new GenericContainer<>("confluentinc/cp-kcat:7.4.1")
.withCreateContainerCmdModifier(cmd -> {
cmd.withEntrypoint("sh");
})
.withCopyToContainer(Transferable.of("Message produced by kcat"), "/data/msgs.txt")
.withNetwork(network)
.withCommand("-c", "tail -f /dev/null")
// }
) {
kafka.start();
kcat.start();
// produceConsumeMessage {
kcat.execInContainer("kcat", "-b", "kafka:19092", "-t", "msgs", "-P", "-l", "/data/msgs.txt");
String stdout = kcat
.execInContainer("kcat", "-b", "kafka:19092", "-C", "-t", "msgs", "-c", "1")
.getStdout();
// }
assertThat(stdout).contains("Message produced by kcat");
}
}

protected void testKafkaFunctionality(String bootstrapServers) throws Exception {
testKafkaFunctionality(bootstrapServers, 1, 1);
}
Expand Down

0 comments on commit d48bab7

Please sign in to comment.