diff --git a/docs/modules/redpanda.md b/docs/modules/redpanda.md
index 51178788449..22545ffb026 100644
--- a/docs/modules/redpanda.md
+++ b/docs/modules/redpanda.md
@@ -25,6 +25,44 @@ Redpanda also provides a schema registry implementation. Like the Redpanda broke
[Schema Registry](../../modules/redpanda/src/test/java/org/testcontainers/redpanda/RedpandaContainerTest.java) inside_block:getSchemaRegistryAddress
+It is also possible to enable security capabilities of Redpanda by using:
+
+
+[Enable security](../../modules/redpanda/src/test/java/org/testcontainers/redpanda/RedpandaContainerTest.java) inside_block:security
+
+
+Superusers can be created by using:
+
+
+[Register Superuser](../../modules/redpanda/src/test/java/org/testcontainers/redpanda/RedpandaContainerTest.java) inside_block:createSuperUser
+
+
+Below is an example of how to create the `AdminClient`:
+
+
+[Create Admin Client](../../modules/redpanda/src/test/java/org/testcontainers/redpanda/RedpandaContainerTest.java) inside_block:createAdminClient
+
+
+There are scenarios where additional listeners are needed because the consumer/producer can be another
+container in the same network or a different process where the port to connect differs from the default
+exposed port `9092`. E.g [Toxiproxy](../../docs/modules/toxiproxy.md).
+
+
+[Register additional listener](../../modules/redpanda/src/test/java/org/testcontainers/redpanda/RedpandaContainerTest.java) inside_block:registerListener
+
+
+Container defined in the same network:
+
+
+[Create kcat container](../../modules/redpanda/src/test/java/org/testcontainers/redpanda/RedpandaContainerTest.java) inside_block:createKCatContainer
+
+
+Client using the new registered listener:
+
+
+[Produce/Consume via new listener](../../modules/redpanda/src/test/java/org/testcontainers/redpanda/RedpandaContainerTest.java) inside_block:produceConsumeMessage
+
+
## Adding this module to your project dependencies
Add the following dependency to your `pom.xml`/`build.gradle` file:
diff --git a/modules/redpanda/build.gradle b/modules/redpanda/build.gradle
index 3fbf9b7a8ab..0aa32791a9f 100644
--- a/modules/redpanda/build.gradle
+++ b/modules/redpanda/build.gradle
@@ -2,6 +2,7 @@ description = "Testcontainers :: Redpanda"
dependencies {
api project(':testcontainers')
+ shaded 'org.freemarker:freemarker:2.3.32'
testImplementation 'org.apache.kafka:kafka-clients:3.5.1'
testImplementation 'org.assertj:assertj-core:3.24.2'
diff --git a/modules/redpanda/src/main/java/org/testcontainers/redpanda/RedpandaContainer.java b/modules/redpanda/src/main/java/org/testcontainers/redpanda/RedpandaContainer.java
index 3843c76d6d0..b4f5b94798b 100644
--- a/modules/redpanda/src/main/java/org/testcontainers/redpanda/RedpandaContainer.java
+++ b/modules/redpanda/src/main/java/org/testcontainers/redpanda/RedpandaContainer.java
@@ -1,11 +1,31 @@
package org.testcontainers.redpanda;
import com.github.dockerjava.api.command.InspectContainerResponse;
+import freemarker.template.Configuration;
+import freemarker.template.Template;
+import lombok.AllArgsConstructor;
+import lombok.Cleanup;
+import lombok.Data;
+import lombok.SneakyThrows;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.images.builder.Transferable;
import org.testcontainers.utility.ComparableVersion;
import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.MountableFile;
+
+import java.io.ByteArrayOutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
/**
* Testcontainers implementation for Redpanda.
@@ -14,6 +34,7 @@
*
* - Broker: 9092
* - Schema Registry: 8081
+ * - Proxy: 8082
*
*/
public class RedpandaContainer extends GenericContainer {
@@ -30,9 +51,21 @@ public class RedpandaContainer extends GenericContainer {
private static final int REDPANDA_PORT = 9092;
+ private static final int REDPANDA_ADMIN_PORT = 9644;
+
private static final int SCHEMA_REGISTRY_PORT = 8081;
- private static final String STARTER_SCRIPT = "/testcontainers_start.sh";
+ private static final int REST_PROXY_PORT = 8082;
+
+ private boolean enableAuthorization;
+
+ private String authenticationMethod = "none";
+
+ private String schemaRegistryAuthenticationMethod = "none";
+
+ private final List superusers = new ArrayList<>();
+
+ private final Set> listenersValueSupplier = new HashSet<>();
public RedpandaContainer(String image) {
this(DockerImageName.parse(image));
@@ -47,33 +80,198 @@ public RedpandaContainer(DockerImageName imageName) {
throw new IllegalArgumentException("Redpanda version must be >= v22.2.1");
}
- withExposedPorts(REDPANDA_PORT, SCHEMA_REGISTRY_PORT);
+ withExposedPorts(REDPANDA_PORT, REDPANDA_ADMIN_PORT, SCHEMA_REGISTRY_PORT, REST_PROXY_PORT);
withCreateContainerCmdModifier(cmd -> {
- cmd.withEntrypoint("sh");
+ cmd.withEntrypoint();
+ cmd.withUser("root:root");
});
- waitingFor(Wait.forLogMessage(".*Started Kafka API server.*", 1));
- withCommand("-c", "while [ ! -f " + STARTER_SCRIPT + " ]; do sleep 0.1; done; " + STARTER_SCRIPT);
+ waitingFor(Wait.forLogMessage(".*Successfully started Redpanda!.*", 1));
+ withCopyFileToContainer(
+ MountableFile.forClasspathResource("testcontainers/entrypoint-tc.sh", 0700),
+ "/entrypoint-tc.sh"
+ );
+ withCommand("/entrypoint-tc.sh", "redpanda", "start", "--mode=dev-container", "--smp=1", "--memory=1G");
+ }
+
+ @Override
+ protected void configure() {
+ this.listenersValueSupplier.stream()
+ .map(Supplier::get)
+ .map(Listener::getAddress)
+ .forEach(this::withNetworkAliases);
}
+ @SneakyThrows
@Override
protected void containerIsStarting(InspectContainerResponse containerInfo) {
super.containerIsStarting(containerInfo);
- String command = "#!/bin/bash\n";
-
- command += "/usr/bin/rpk redpanda start --mode dev-container --smp 1 --memory 1G ";
- command += "--kafka-addr PLAINTEXT://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092 ";
- command +=
- "--advertise-kafka-addr PLAINTEXT://127.0.0.1:29092,OUTSIDE://" + getHost() + ":" + getMappedPort(9092);
+ Configuration cfg = new Configuration(Configuration.DEFAULT_INCOMPATIBLE_IMPROVEMENTS);
+ cfg.setClassLoaderForTemplateLoading(getClass().getClassLoader(), "testcontainers");
+ cfg.setDefaultEncoding("UTF-8");
- copyFileToContainer(Transferable.of(command, 0777), STARTER_SCRIPT);
+ copyFileToContainer(getBootstrapFile(cfg), "/etc/redpanda/.bootstrap.yaml");
+ copyFileToContainer(getRedpandaFile(cfg), "/etc/redpanda/redpanda.yaml");
}
+ /**
+ * Returns the bootstrap servers address.
+ * @return the bootstrap servers address
+ */
public String getBootstrapServers() {
return String.format("PLAINTEXT://%s:%s", getHost(), getMappedPort(REDPANDA_PORT));
}
+ /**
+ * Returns the schema registry address.
+ * @return the schema registry address
+ */
public String getSchemaRegistryAddress() {
return String.format("http://%s:%s", getHost(), getMappedPort(SCHEMA_REGISTRY_PORT));
}
+
+ /**
+ * Returns the admin address.
+ * @return the admin address
+ */
+ public String getAdminAddress() {
+ return String.format("http://%s:%s", getHost(), getMappedPort(REDPANDA_ADMIN_PORT));
+ }
+
+ /**
+ * Returns the rest proxy address.
+ * @return the rest proxy address
+ */
+ public String getRestProxyAddress() {
+ return String.format("http://%s:%s", getHost(), getMappedPort(REST_PROXY_PORT));
+ }
+
+ /**
+ * Enables authorization.
+ * @return this {@link RedpandaContainer} instance
+ */
+ public RedpandaContainer enableAuthorization() {
+ this.enableAuthorization = true;
+ return this;
+ }
+
+ /**
+ * Enables SASL.
+ * @return this {@link RedpandaContainer} instance
+ */
+ public RedpandaContainer enableSasl() {
+ this.authenticationMethod = "sasl";
+ return this;
+ }
+
+ /**
+ * Enables Http Basic Auth for Schema Registry.
+ * @return this {@link RedpandaContainer} instance
+ */
+ public RedpandaContainer enableSchemaRegistryHttpBasicAuth() {
+ this.schemaRegistryAuthenticationMethod = "http_basic";
+ return this;
+ }
+
+ /**
+ * Register username as a superuser.
+ * @param username username to register as a superuser
+ * @return this {@link RedpandaContainer} instance
+ */
+ public RedpandaContainer withSuperuser(String username) {
+ this.superusers.add(username);
+ return this;
+ }
+
+ /**
+ * Add a {@link Supplier} that will provide a listener with format {@code host:port}.
+ * Host will be added as a network alias.
+ *
+ * The listener will be added to the default listeners.
+ *
+ * Default listeners:
+ *
+ * - 0.0.0.0:9092
+ * - 0.0.0.0:9093
+ *
+ *
+ * Default advertised listeners:
+ *
+ * - {@code container.getHost():container.getMappedPort(9092)}
+ * - 127.0.0.1:9093
+ *
+ * @param listenerSupplier a supplier that will provide a listener
+ * @return this {@link RedpandaContainer} instance
+ */
+ public RedpandaContainer withListener(Supplier listenerSupplier) {
+ String[] parts = listenerSupplier.get().split(":");
+ this.listenersValueSupplier.add(() -> new Listener(parts[0], Integer.parseInt(parts[1])));
+ return this;
+ }
+
+ private Transferable getBootstrapFile(Configuration cfg) {
+ Map kafkaApi = new HashMap<>();
+ kafkaApi.put("enableAuthorization", this.enableAuthorization);
+ kafkaApi.put("superusers", this.superusers);
+
+ Map root = new HashMap<>();
+ root.put("kafkaApi", kafkaApi);
+
+ String file = resolveTemplate(cfg, "bootstrap.yaml.ftl", root);
+
+ return Transferable.of(file, 0700);
+ }
+
+ private Transferable getRedpandaFile(Configuration cfg) {
+ List