Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bug]: RedPandaContainer does not allow for container to container Kafka communication #6395

Closed
JapuDCret opened this issue Jan 6, 2023 · 5 comments

Comments

@JapuDCret
Copy link
Contributor

JapuDCret commented Jan 6, 2023

Module

Core

Testcontainers version

1.17.6

Using the latest Testcontainers version?

Yes

Host OS

Linux (Ubuntu 22.04.1 LTS)

Host Arch

x86

Docker version

20.10.22

What happened?

Cannot connect a Kafka Connect TestContainer instance to the RedpandaContainer.

It seems that this is due to the unfortunate nature of advertised addresses/listeners in the Kafka architecture.

Quarkus RedPandaKafkaContainer solved this by not having a static advertised addresses/listeners

See https://github.com/quarkusio/quarkus/blob/e47a267e13819cf5ebd4322ad2b2c88b06209a34/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/RedPandaKafkaContainer.java#L86
vs.
https://github.com/testcontainers/testcontainers-java/blob/0bb6925fb7d420/modules/redpanda/src/main/java/org/testcontainers/redpanda/RedpandaContainer.java#L55

My setup:

class KafkaConnectTest {
    companion object {
        private val log = LoggerFactory.getLogger(KafkaConnectTest::class.java)

        val kafka = RedpandaContainer(DockerImageName.parse("docker.redpanda.com/vectorized/redpanda:v22.3.11")).apply {
            withNetwork(Network.SHARED)
            withNetworkAliases("kafka")
        }

        val kafkaConnect = GenericContainer(DockerImageName.parse("confluentinc/cp-kafka-connect:7.3.0")).apply {
            withNetwork(Network.SHARED)
            dependsOn(kafka)

            withNetworkAliases("connect")

            withEnv("CONNECT_GROUP_ID", "test")
            withEnv("CONNECT_CONFIG_STORAGE_TOPIC", "_connect-configs-test")
            withEnv("CONNECT_OFFSET_STORAGE_TOPIC", "_connect-offsets-test")
            withEnv("CONNECT_STATUS_STORAGE_TOPIC", "_connect-status-test")
            withEnv("CONNECT_LISTENERS", "http://0.0.0.0:8083")
            withEnv("CONNECT_REST_HOST_NAME", "0.0.0.0")
            withEnv("CONNECT_REST_PORT", "8083")
            withEnv("CONNECT_REST_ADVERTISED_HOST_NAME", "connect")
            withEnv("CONNECT_KEY_CONVERTER", "org.apache.kafka.connect.storage.StringConverter")
            withEnv("CONNECT_VALUE_CONVERTER", "org.apache.kafka.connect.storage.StringConverter")
            withEnv("CONNECT_BOOTSTRAP_SERVERS", "kafka:29092")
            withExposedPorts(8083)
        }

        @JvmStatic
        @BeforeAll
        fun setup() {
            Stream.of( kafka, kafkaConnect ).parallel().forEach { it.start() }

            val logConsumer = Slf4jLogConsumer(log)
            kafkaConnect.followOutput(logConsumer)
        }
    }

    @Test
    fun `verify Kafka message publish`() {
        println("kafka = $kafka")
        println("localStack = $localStack")
        println("kafkaConnect: $kafkaConnect")
        println("bootstrapServers: $bootstrapServers")

        Thread.sleep(20 * 1000)
    }
}

Relevant log output

2023-01-06 19:10:57 INFO  KafkaConnectTest - STDOUT: ===> User
2023-01-06 19:10:57 INFO  KafkaConnectTest - STDOUT: uid=1000(appuser) gid=1000(appuser) groups=1000(appuser)
2023-01-06 19:10:57 INFO  KafkaConnectTest - STDOUT: ===> Configuring ...
2023-01-06 19:10:57 INFO  KafkaConnectTest - STDOUT: ===> Running preflight checks ... 
2023-01-06 19:10:57 INFO  KafkaConnectTest - STDOUT: ===> Check if Kafka is healthy ...
2023-01-06 19:10:57 INFO  KafkaConnectTest - STDERR: Error while getting broker list.
2023-01-06 19:10:57 INFO  KafkaConnectTest - STDERR: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: listNodes
2023-01-06 19:10:57 INFO  KafkaConnectTest - STDERR: 	at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
2023-01-06 19:10:57 INFO  KafkaConnectTest - STDERR: 	at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
2023-01-06 19:10:57 INFO  KafkaConnectTest - STDERR: 	at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
2023-01-06 19:10:57 INFO  KafkaConnectTest - STDERR: 	at io.confluent.admin.utils.ClusterStatus.isKafkaReady(ClusterStatus.java:147)
2023-01-06 19:10:57 INFO  KafkaConnectTest - STDERR: 	at io.confluent.admin.utils.cli.KafkaReadyCommand.main(KafkaReadyCommand.java:149)
2023-01-06 19:10:57 INFO  KafkaConnectTest - STDERR: Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: listNodes
2023-01-06 19:10:57 INFO  KafkaConnectTest - STDERR: Expected 1 brokers but found only 0. Trying to query Kafka for metadata again ...
2023-01-06 19:10:57 INFO  KafkaConnectTest - STDERR: Expected 1 brokers but found only 0. Brokers found [].
2023-01-06 19:10:57 INFO  KafkaConnectTest - STDOUT: Using log4j config /etc/cp-base-new/log4j.properties

Additional Information

Indeed, after overriding the command from

command += "--advertise-kafka-addr PLAINTEXT://127.0.0.1:29092,OUTSIDE://" + getHost() + ":" + getMappedPort(9092)

to

command += "--advertise-kafka-addr PLAINTEXT://kafka:29092,OUTSIDE://" + getHost() + ":" + getMappedPort(9092)

it works in my desired setup (should be dynamic of course)

@JapuDCret
Copy link
Contributor Author

JapuDCret commented Jan 10, 2023

quick workaround in Kotlin for local setup
(replaces 127.0.0.1 with kafka as the hostname)

class CustomRedpandaContainer(imageName: DockerImageName) : RedpandaContainer(imageName) {

    override fun containerIsStarting(containerInfo: InspectContainerResponse?, reused: Boolean) {
        super.containerIsStarting(containerInfo)

        val hostname = "kafka"

        var command = "#!/bin/bash\n"
        command += "/usr/bin/rpk redpanda start --mode dev-container "
        command += "--kafka-addr PLAINTEXT://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092 "
        command += "--advertise-kafka-addr PLAINTEXT://$hostname:29092,OUTSIDE://${this.host}:${getMappedPort(9092)}"
        this.copyFileToContainer(Transferable.of(command, 511), "/testcontainers_start.sh")
    }
}

usage:

  val kafka = CustomRedpandaContainer(DockerImageName.parse("docker.redpanda.com/vectorized/redpanda:v22.3.11")).apply {
    withNetwork(Network.SHARED)
    withNetworkAliases("kafka")
  }

@emaxerrno
Copy link

i'd recommend testing the latest 22.3.10 as the redpanda version fyi.

@JapuDCret
Copy link
Contributor Author

i'd recommend testing the latest 22.3.10 as the redpanda version fyi.

The issue is still present with redpanda:v22.3.10 and redpanda:v22.3.11.
This makes sense, as it is an underlying networking issue, that cannot be solved by the container itself.

The testcontainer needs to allow for setting a network alias, that is then used in the startup script (to set --advertise-kafka-add)

@maxant
Copy link

maxant commented Jul 11, 2023

While testcontainers are super for testing in the build pipeline, we want to use the same tests and build while developing locally, and that means we sometimes need to debug tests. With Kafka, that unfortunately often means looking at the data, just like you might with mysql console or similar. So spinning up the redpandaconsole makes sense imho.

workaround:

package rocks.twr.core;

import com.github.dockerjava.api.command.InspectContainerResponse;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.MySQLContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.images.builder.Transferable;
import org.testcontainers.lifecycle.Startables;
import org.testcontainers.redpanda.RedpandaContainer;
import org.testcontainers.temp.NetworkImpl;
import org.testcontainers.utility.DockerImageName;

import java.awt.*;
import java.io.IOException;
import java.net.URI;

public class AbstractContainerBaseTest {

    static Network network = new NetworkImpl("twr-core"); // <- see below ...  Network.newNetwork();
    static RedpandaContainer redpanda = new RedpandaContainer("docker.redpanda.com/redpandadata/redpanda:v23.1.12") {
                // https://github.com/testcontainers/testcontainers-java/issues/7302
                @Override
                protected void containerIsStarting(InspectContainerResponse containerInfo) {
                    //super.containerIsStarting(containerInfo);
                    String command = "#!/bin/bash\n";
                    command = command + "/usr/bin/rpk redpanda start --mode dev-container --smp 1 --memory 1G ";
                    command = command + "--kafka-addr PLAINTEXT://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092 ";
                    command = command + "--advertise-kafka-addr PLAINTEXT://redpanda:29092,OUTSIDE://" + this.getHost() + ":" + this.getMappedPort(9092); // <<< fixed on this line with "redpanda" instead of 127.0.0.1 and would probably work with this.getNetworkAliases().get(0)
                    this.copyFileToContainer(Transferable.of(command, 511), "/testcontainers_start.sh");
                }
            }
            .withNetwork(network)
            .withNetworkAliases("redpanda") // <<< matches the advertised address
            .withReuse(true);
    static GenericContainer redpandaconsole = new GenericContainer(DockerImageName.parse("redpandadata/console:v2.2.4"))
            .dependsOn(redpanda)
            .withNetwork(network)
            .withExposedPorts(8080)
            .withNetworkAliases("redpandaconsole")
            .withEnv("KAFKA_BROKERS", "redpanda:29092") // <<< port comes from script above
            .withEnv("KAFKA_TLS_ENABLED", "false")
            .withEnv("KAFKA_SASL_ENABLED", "false")
            .withEnv("CONNECT_ENABLED", "false")
            .withReuse(true);

    static {
        Startables.deepStart(mysql, redpanda, redpandaconsole).join();
        logAndOpenRedpandaConsoleInBrowser();
    }

    private static void logAndOpenRedpandaConsoleInBrowser() {
        String url = ("http://localhost:" + redpandaconsole.getMappedPort(8080));
        try {
            if (Desktop.isDesktopSupported()) {
                Desktop desktop = Desktop.getDesktop();
                if (desktop.isSupported(Desktop.Action.BROWSE)) {
                    desktop.browse(URI.create(url));
                }
            }
        } catch (IOException | InternalError e) {
            e.printStackTrace();
        }
        System.out.println("Redpanda Console running on " + url);
    }

}

package org.testcontainers.temp;

import com.github.dockerjava.api.command.CreateNetworkCmd;
import com.github.dockerjava.api.exception.ConflictException;
import org.junit.rules.ExternalResource;
import org.testcontainers.DockerClientFactory;
import org.testcontainers.containers.Network;
import org.testcontainers.utility.ResourceReaper;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

// https://github.com/testcontainers/testcontainers-java/issues/7261
public class NetworkImpl extends ExternalResource implements Network {

    private final String name;
    private Boolean enableIpv6;
    private String driver;
    private String id;

    private final AtomicBoolean initialized = new AtomicBoolean();

    public NetworkImpl(String name) {
        this.name = name;
    }

    @Override
    public synchronized String getId() {
        if (initialized.compareAndSet(false, true)) {
            boolean success = false;
            try {
                id = create();
                success = true;
            } finally {
                if (!success) {
                    initialized.set(false);
                }
            }
        }
        return id;
    }

    private String create() {
        CreateNetworkCmd createNetworkCmd = DockerClientFactory.instance().client().createNetworkCmd();

        createNetworkCmd.withName(name);
        createNetworkCmd.withCheckDuplicate(true);

        if (enableIpv6 != null) {
            createNetworkCmd.withEnableIpv6(enableIpv6);
        }

        if (driver != null) {
            createNetworkCmd.withDriver(driver);
        }

        Map<String, String> labels = createNetworkCmd.getLabels();
        labels = new HashMap<>(labels != null ? labels : Collections.emptyMap());
        labels.putAll(DockerClientFactory.DEFAULT_LABELS);
        //noinspection deprecation
        labels.putAll(ResourceReaper.instance().getLabels());
        createNetworkCmd.withLabels(labels);

        try {
            return createNetworkCmd.exec().getId();
        } catch (ConflictException e) {
            List<com.github.dockerjava.api.model.Network> networks = DockerClientFactory
                    .instance()
                    .client()
                    .listNetworksCmd()
                    .withNameFilter(name)
                    .exec();
            if(networks.size() >= 1) {
                return networks.get(0).getId();
            }
            throw new IllegalStateException("Unable to create network with name " + name
                    + " due to conflict, but no existing network found with the same name", e);
        }
    }

    @Override
    protected void after() {
        close();
    }

    @Override
    public synchronized void close() {
        if (initialized.getAndSet(false)) {
            ResourceReaper.instance().removeNetworkById(id);
        }
    }
}

@eddumelendez
Copy link
Member

Hi,

There is currently some limitation in order to add new listeners in redpanda and kafka module. However, there is a workaround for this, see custom redpanda implementation in order to add dynamic listeners as you can see here

The same applies to local development. See my setup using spring boot testcontainers integration

https://github.com/eddumelendez/spring-boot-redpanda-testcontainers-reusable-mode/blob/223ba163403a495fde0de04f182d32d5a6372d47/producer/src/test/java/com/example/producer/ProducerApplicationTests.java#L65-L89

this is the redpanda console config

https://github.com/eddumelendez/spring-boot-redpanda-testcontainers-reusable-mode/blob/223ba163403a495fde0de04f182d32d5a6372d47/producer/src/test/resources/redpandaConsole.yml

We are making some improvements to make this possible OOTB. See #7320

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging a pull request may close this issue.

4 participants