diff --git a/Directory.Packages.props b/Directory.Packages.props
index 1c06818d8..1ec3f79d8 100644
--- a/Directory.Packages.props
+++ b/Directory.Packages.props
@@ -36,8 +36,8 @@
-
-
+
+
diff --git a/src/Testcontainers.Kafka/KafkaBuilder.cs b/src/Testcontainers.Kafka/KafkaBuilder.cs
index e489469d5..efa81c439 100644
--- a/src/Testcontainers.Kafka/KafkaBuilder.cs
+++ b/src/Testcontainers.Kafka/KafkaBuilder.cs
@@ -72,20 +72,21 @@ public KafkaBuilder WithListener(string kafka)
var listenerSecurityProtocolMap = $"{protocol}:PLAINTEXT";
var listeners = new[] { listener };
+ var listenersSecurityProtocolMap = new[] { listenerSecurityProtocolMap };
var host = kafka.Split(':')[0];
- var currentListeners = DockerResourceConfiguration.Environments["KAFKA_LISTENERS"]
- .Split([','], StringSplitOptions.RemoveEmptyEntries)
- .Concat([listener]);
+ var updatedListeners = DockerResourceConfiguration.Environments["KAFKA_LISTENERS"]
+ .Split(',')
+ .Concat(listeners);
- var currentListenersSecurityProtocolMap = DockerResourceConfiguration.Environments["KAFKA_LISTENER_SECURITY_PROTOCOL_MAP"]
- .Split([','], StringSplitOptions.RemoveEmptyEntries)
- .Concat([listenerSecurityProtocolMap]);
+ var updatedListenersSecurityProtocolMap = DockerResourceConfiguration.Environments["KAFKA_LISTENER_SECURITY_PROTOCOL_MAP"]
+ .Split(',')
+ .Concat(listenersSecurityProtocolMap);
return Merge(DockerResourceConfiguration, new KafkaConfiguration(listeners, listeners))
- .WithEnvironment("KAFKA_LISTENERS", string.Join(",", currentListeners))
- .WithEnvironment("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", string.Join(",", currentListenersSecurityProtocolMap))
+ .WithEnvironment("KAFKA_LISTENERS", string.Join(",", updatedListeners))
+ .WithEnvironment("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", string.Join(",", updatedListenersSecurityProtocolMap))
.WithNetworkAliases(host);
}
diff --git a/tests/Testcontainers.Kafka.Tests/KafkaContainerNetworkTest.cs b/tests/Testcontainers.Kafka.Tests/KafkaContainerNetworkTest.cs
index 8ddc2ccaf..d5d9ba26d 100644
--- a/tests/Testcontainers.Kafka.Tests/KafkaContainerNetworkTest.cs
+++ b/tests/Testcontainers.Kafka.Tests/KafkaContainerNetworkTest.cs
@@ -1,63 +1,68 @@
-using System.Collections.Generic;
-using System.Text;
-using DotNet.Testcontainers.Builders;
-using DotNet.Testcontainers.Containers;
-using DotNet.Testcontainers.Networks;
-
namespace Testcontainers.Kafka;
public sealed class KafkaContainerNetworkTest : IAsyncLifetime
{
- private INetwork _network;
- private KafkaContainer _kafkaContainer;
+ private const string Message = "Message produced by kafkacat";
- private IContainer _kCatContainer;
- public async Task InitializeAsync()
+ private const string Listener = "kafka:19092";
+
+ private const string DataFilePath = "/data/msgs.txt";
+
+ private readonly INetwork _network;
+
+ private readonly IContainer _kafkaContainer;
+
+ private readonly IContainer _kCatContainer;
+
+ public KafkaContainerNetworkTest()
{
- _network = new NetworkBuilder().Build();
+ _network = new NetworkBuilder()
+ .Build();
+
_kafkaContainer = new KafkaBuilder()
- .WithImage("confluentinc/cp-kafka")
+ .WithImage("confluentinc/cp-kafka:6.1.9")
.WithNetwork(_network)
- .WithListener("kafka:19092")
+ .WithListener(Listener)
.Build();
_kCatContainer = new ContainerBuilder()
- .WithImage("confluentinc/cp-kcat")
+ .WithImage("confluentinc/cp-kafkacat:6.1.9")
.WithNetwork(_network)
- .WithCommand("-c", "tail -f /dev/null")
- .WithEntrypoint("sh")
- .WithResourceMapping(Encoding.Default.GetBytes("Message produced by kcat"), "/data/msgs.txt")
+ .WithEntrypoint(CommonCommands.SleepInfinity)
+ .WithResourceMapping(Encoding.Default.GetBytes(Message), DataFilePath)
.Build();
-
- await _kCatContainer.StartAsync();
- await _kafkaContainer.StartAsync();
}
- public Task DisposeAsync()
+ public async Task InitializeAsync()
{
- return Task.WhenAll(
- _kafkaContainer.DisposeAsync().AsTask(),
- _kCatContainer.DisposeAsync().AsTask()
- );
+ await _kafkaContainer.StartAsync()
+ .ConfigureAwait(false);
+
+ await _kCatContainer.StartAsync()
+ .ConfigureAwait(false);
}
-
+
+ public async Task DisposeAsync()
+ {
+ await _kafkaContainer.StartAsync()
+ .ConfigureAwait(false);
+
+ await _kCatContainer.StartAsync()
+ .ConfigureAwait(false);
+
+ await _network.DisposeAsync()
+ .ConfigureAwait(false);
+ }
+
[Fact]
- public async Task TestUsageWithListener()
+ public async Task ConsumesProducedKafkaMessage()
{
- // kcat producer
- await _kCatContainer.ExecAsync(new List()
- {
- "kcat", "-b", "kafka:19092", "-t", "msgs", "-P", "-l", "/data/msgs.txt"
- });
-
-
- // kcat consumer
- var kCatResult = await _kCatContainer.ExecAsync(new List()
- {
- "kcat", "-b", "kafka:19092", "-C", "-t", "msgs", "-c", "1"
- });
-
- Assert.Contains("Message produced by kcat", kCatResult.Stdout);
+ _ = await _kCatContainer.ExecAsync(new[] { "kafkacat", "-b", Listener, "-t", "msgs", "-P", "-l", DataFilePath })
+ .ConfigureAwait(true);
+
+ var execResult = await _kCatContainer.ExecAsync(new[] { "kafkacat", "-b", Listener, "-C", "-t", "msgs", "-c", "1" })
+ .ConfigureAwait(true);
+
+ Assert.Equal(Message, execResult.Stdout.Trim());
}
-
}
\ No newline at end of file
diff --git a/tests/Testcontainers.Kafka.Tests/KafkaContainerRegistryTest.cs b/tests/Testcontainers.Kafka.Tests/KafkaContainerRegistryTest.cs
new file mode 100644
index 000000000..9220c202b
--- /dev/null
+++ b/tests/Testcontainers.Kafka.Tests/KafkaContainerRegistryTest.cs
@@ -0,0 +1,129 @@
+namespace Testcontainers.Kafka;
+
+public sealed class KafkaContainerRegistryTest : IAsyncLifetime
+{
+ private const string Schema = @"
+ {
+ ""$schema"": ""http://json-schema.org/draft-04/schema#"",
+ ""title"": ""User"",
+ ""type"": ""object"",
+ ""additionalProperties"": false,
+ ""properties"": {
+ ""FirstName"": {
+ ""type"": [""null"", ""string""]
+ },
+ ""LastName"": {
+ ""type"": [""null"", ""string""]
+ }
+ }
+ }";
+
+ private const ushort RestPort = 8085;
+
+ private const string SchemaRegistryNetworkAlias = "schema-registry";
+
+ private const string Listener = "kafka:19092";
+
+ private readonly INetwork _network;
+
+ private readonly KafkaContainer _kafkaContainer;
+
+ private readonly IContainer _schemaRegistryContainer;
+
+ public KafkaContainerRegistryTest()
+ {
+ _network = new NetworkBuilder()
+ .Build();
+
+ _kafkaContainer = new KafkaBuilder()
+ .WithImage("confluentinc/cp-kafka:6.1.9")
+ .WithNetwork(_network)
+ .WithListener(Listener)
+ .Build();
+
+ _schemaRegistryContainer = new ContainerBuilder()
+ .WithImage("confluentinc/cp-schema-registry:6.1.9")
+ .WithPortBinding(RestPort, true)
+ .WithNetwork(_network)
+ .WithNetworkAliases(SchemaRegistryNetworkAlias)
+ .WithEnvironment("SCHEMA_REGISTRY_LISTENERS", "http://0.0.0.0:" + RestPort)
+ .WithEnvironment("SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL", "PLAINTEXT")
+ .WithEnvironment("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", "PLAINTEXT://" + Listener)
+ .WithEnvironment("SCHEMA_REGISTRY_HOST_NAME", SchemaRegistryNetworkAlias)
+ .WithWaitStrategy(Wait.ForUnixContainer().UntilHttpRequestIsSucceeded(request =>
+ request.ForPort(RestPort).ForPath("/subjects")))
+ .Build();
+ }
+
+ public async Task InitializeAsync()
+ {
+ await _kafkaContainer.StartAsync()
+ .ConfigureAwait(false);
+
+ await _schemaRegistryContainer.StartAsync()
+ .ConfigureAwait(false);
+ }
+
+ public async Task DisposeAsync()
+ {
+ await _kafkaContainer.StartAsync()
+ .ConfigureAwait(false);
+
+ await _schemaRegistryContainer.StartAsync()
+ .ConfigureAwait(false);
+
+ await _network.DisposeAsync()
+ .ConfigureAwait(false);
+ }
+
+ [Fact]
+ public async Task ConsumerReturnsProducerMessage()
+ {
+ // Given
+ const string topic = "user";
+
+ var subject = SubjectNameStrategy.Topic.ConstructValueSubjectName(topic);
+
+ var bootstrapServer = _kafkaContainer.GetBootstrapAddress();
+
+ var producerConfig = new ProducerConfig();
+ producerConfig.BootstrapServers = bootstrapServer;
+
+ var consumerConfig = new ConsumerConfig();
+ consumerConfig.BootstrapServers = bootstrapServer;
+ consumerConfig.GroupId = "sample-consumer";
+ consumerConfig.AutoOffsetReset = AutoOffsetReset.Earliest;
+
+ var message = new Message();
+ message.Value = new User("John", "Doe");
+
+ var schemaRegistryConfig = new SchemaRegistryConfig();
+ schemaRegistryConfig.Url = new UriBuilder(Uri.UriSchemeHttp, _schemaRegistryContainer.Hostname, _schemaRegistryContainer.GetMappedPublicPort(RestPort)).ToString();
+
+ // When
+ using var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig);
+ _ = await schemaRegistry.RegisterSchemaAsync(subject, new Schema(Schema, SchemaType.Json))
+ .ConfigureAwait(true);
+
+ using var producer = new ProducerBuilder(producerConfig)
+ .SetValueSerializer(new JsonSerializer(schemaRegistry))
+ .Build();
+
+ _ = await producer.ProduceAsync(topic, message)
+ .ConfigureAwait(true);
+
+ using var consumer = new ConsumerBuilder(consumerConfig)
+ .SetValueDeserializer(new JsonDeserializer().AsSyncOverAsync())
+ .Build();
+
+ consumer.Subscribe(topic);
+
+ var result = consumer.Consume(TimeSpan.FromSeconds(15));
+
+ // Then
+ Assert.NotNull(result);
+ Assert.Equal(message.Value, result.Message.Value);
+ }
+
+ private record User(string FirstName, string LastName);
+}
\ No newline at end of file
diff --git a/tests/Testcontainers.Kafka.Tests/KafkaContainerWithRegistryTest.cs b/tests/Testcontainers.Kafka.Tests/KafkaContainerWithRegistryTest.cs
deleted file mode 100644
index 3ad1a11d5..000000000
--- a/tests/Testcontainers.Kafka.Tests/KafkaContainerWithRegistryTest.cs
+++ /dev/null
@@ -1,132 +0,0 @@
-using System.Diagnostics;
-using System.Threading;
-using Confluent.SchemaRegistry;
-using Confluent.SchemaRegistry.Serdes;
-using DotNet.Testcontainers.Builders;
-using DotNet.Testcontainers.Containers;
-using DotNet.Testcontainers.Networks;
-
-namespace Testcontainers.Kafka;
-
-public class KafkaContainerWithRegistryTest : IAsyncLifetime
-{
- private INetwork _network;
- private KafkaContainer _kafkaContainer;
- private IContainer _kafkaSchemaRegistry;
-
- const string schema = @"
- {
- ""$schema"": ""http://json-schema.org/draft-07/schema#"",
- ""$id"": ""http://example.com/product.schema.json"",
- ""title"": ""User"",
- ""description"": ""A User"",
- ""type"": ""object"",
- ""properties"": {
- ""age"": {
- ""description"": ""The age of the user"",
- ""type"": ""integer""
- },
- ""lastname"": {
- ""description"": ""Last name of the user"",
- ""type"": ""string""
- },
- ""firstname"": {
- ""description"": ""First name of the user"",
- ""type"": ""string""
- }
- },
- ""required"": [""firstname"", ""lastname""]
- }";
-
- public async Task InitializeAsync()
- {
- _network = new NetworkBuilder().Build();
- _kafkaContainer = new KafkaBuilder()
- .WithImage("confluentinc/cp-kafka")
- .WithNetwork(_network)
- .WithListener("kafka:19092")
- .Build();
-
- _kafkaSchemaRegistry = new ContainerBuilder()
- .WithImage("confluentinc/cp-schema-registry:7.8.0")
- .DependsOn(_kafkaContainer)
- .WithPortBinding(8085, true)
- .WithNetworkAliases("schemaregistry")
- .WithNetwork(_network)
- .WithEnvironment("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", "PLAINTEXT://kafka:19092")
- .WithEnvironment("SCHEMA_REGISTRY_LISTENERS", "http://0.0.0.0:8085")
- .WithEnvironment("SCHEMA_REGISTRY_HOST_NAME", "schemaregistry")
- .WithEnvironment("SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL", "PLAINTEXT")
- .WithWaitStrategy(
- Wait.ForUnixContainer()
- .UntilHttpRequestIsSucceeded(request => request.ForPath("/subjects")
- .ForPort(8085))
-
- )
- .Build();
-
- await _kafkaContainer.StartAsync();
- await _kafkaSchemaRegistry.StartAsync();
- }
-
- public async Task DisposeAsync()
- {
- await Task.WhenAll(
- _kafkaContainer.DisposeAsync().AsTask(),
- _kafkaSchemaRegistry.DisposeAsync().AsTask()
- );
- }
-
- ///
- /// Test the usage of the Kafka container with the schema registry.
- ///
- [Fact]
- public async Task TestUsageWithSchemaRegistry()
- {
- const string topicName = "user-topic";
- var subject = SubjectNameStrategy.Topic.ConstructValueSubjectName(topicName, null);
-
- var bootstrapServers = this._kafkaContainer.GetBootstrapAddress()
- .Replace("PLAINTEXT://", "", StringComparison.OrdinalIgnoreCase);
-
- var jsonSerializerConfig = new JsonSerializerConfig
- {
- BufferBytes = 100,
- };
-
- var schemaRegistryUrl = $"http://localhost:{_kafkaSchemaRegistry.GetMappedPublicPort(8085)}";
-
- var schemaRegistryConfig = new SchemaRegistryConfig
- {
- Url = schemaRegistryUrl,
- };
- // Init Kafka producer to send a message
- var producerConfig = new ProducerConfig
- {
- BootstrapServers = bootstrapServers,
- ClientId = $"test-client-{DateTime.Now.Ticks}",
- };
- using var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig);
-
- var schemaId = await schemaRegistry.RegisterSchemaAsync(subject, new Schema(schema, SchemaType.Json));
-
- using var producer = new ProducerBuilder(producerConfig)
- .SetValueSerializer(new JsonSerializer(schemaRegistry, jsonSerializerConfig))
- .Build();
-
- await Assert.ThrowsAsync(async () =>
- {
- try
- {
- var user = new User { Name = "value", Age = 30 };
- await producer.ProduceAsync(topicName, new Message { Value = user });
- }
- catch (Exception e)
- {
- Assert.True(e is ProduceException);
- Debug.Assert(e.InnerException != null, "e.InnerException != null");
- throw e.InnerException;
- }
- });
- }
-}
diff --git a/tests/Testcontainers.Kafka.Tests/User.cs b/tests/Testcontainers.Kafka.Tests/User.cs
deleted file mode 100644
index 30759b7dc..000000000
--- a/tests/Testcontainers.Kafka.Tests/User.cs
+++ /dev/null
@@ -1,7 +0,0 @@
-namespace Testcontainers.Kafka;
-
-public class User
-{
- public string Name { get; set; }
- public int Age { get; set; }
-}
\ No newline at end of file
diff --git a/tests/Testcontainers.Kafka.Tests/Usings.cs b/tests/Testcontainers.Kafka.Tests/Usings.cs
index a7fcce194..3e69bfb4f 100644
--- a/tests/Testcontainers.Kafka.Tests/Usings.cs
+++ b/tests/Testcontainers.Kafka.Tests/Usings.cs
@@ -1,5 +1,15 @@
global using System;
+global using System.Collections.Generic;
+global using System.Diagnostics;
+global using System.Text;
+global using System.Threading;
global using System.Threading.Tasks;
global using Confluent.Kafka;
+global using Confluent.Kafka.SyncOverAsync;
+global using Confluent.SchemaRegistry;
+global using Confluent.SchemaRegistry.Serdes;
+global using DotNet.Testcontainers.Builders;
global using DotNet.Testcontainers.Commons;
+global using DotNet.Testcontainers.Containers;
+global using DotNet.Testcontainers.Networks;
global using Xunit;
\ No newline at end of file