From 840e3a96cff3de1e1fe499dda3d52c8347b02b26 Mon Sep 17 00:00:00 2001 From: Andre Hofmeister <9199345+HofmeisterAn@users.noreply.github.com> Date: Mon, 13 Jan 2025 20:40:08 +0100 Subject: [PATCH] refactor: Align Kafka tests --- Directory.Packages.props | 4 +- src/Testcontainers.Kafka/KafkaBuilder.cs | 17 +-- .../KafkaContainerNetworkTest.cs | 89 ++++++------ .../KafkaContainerRegistryTest.cs | 129 +++++++++++++++++ .../KafkaContainerWithRegistryTest.cs | 132 ------------------ tests/Testcontainers.Kafka.Tests/User.cs | 7 - tests/Testcontainers.Kafka.Tests/Usings.cs | 10 ++ 7 files changed, 197 insertions(+), 191 deletions(-) create mode 100644 tests/Testcontainers.Kafka.Tests/KafkaContainerRegistryTest.cs delete mode 100644 tests/Testcontainers.Kafka.Tests/KafkaContainerWithRegistryTest.cs delete mode 100644 tests/Testcontainers.Kafka.Tests/User.cs diff --git a/Directory.Packages.props b/Directory.Packages.props index 1c06818d8..1ec3f79d8 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -36,8 +36,8 @@ - - + + diff --git a/src/Testcontainers.Kafka/KafkaBuilder.cs b/src/Testcontainers.Kafka/KafkaBuilder.cs index e489469d5..efa81c439 100644 --- a/src/Testcontainers.Kafka/KafkaBuilder.cs +++ b/src/Testcontainers.Kafka/KafkaBuilder.cs @@ -72,20 +72,21 @@ public KafkaBuilder WithListener(string kafka) var listenerSecurityProtocolMap = $"{protocol}:PLAINTEXT"; var listeners = new[] { listener }; + var listenersSecurityProtocolMap = new[] { listenerSecurityProtocolMap }; var host = kafka.Split(':')[0]; - var currentListeners = DockerResourceConfiguration.Environments["KAFKA_LISTENERS"] - .Split([','], StringSplitOptions.RemoveEmptyEntries) - .Concat([listener]); + var updatedListeners = DockerResourceConfiguration.Environments["KAFKA_LISTENERS"] + .Split(',') + .Concat(listeners); - var currentListenersSecurityProtocolMap = DockerResourceConfiguration.Environments["KAFKA_LISTENER_SECURITY_PROTOCOL_MAP"] - .Split([','], StringSplitOptions.RemoveEmptyEntries) - .Concat([listenerSecurityProtocolMap]); + var updatedListenersSecurityProtocolMap = DockerResourceConfiguration.Environments["KAFKA_LISTENER_SECURITY_PROTOCOL_MAP"] + .Split(',') + .Concat(listenersSecurityProtocolMap); return Merge(DockerResourceConfiguration, new KafkaConfiguration(listeners, listeners)) - .WithEnvironment("KAFKA_LISTENERS", string.Join(",", currentListeners)) - .WithEnvironment("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", string.Join(",", currentListenersSecurityProtocolMap)) + .WithEnvironment("KAFKA_LISTENERS", string.Join(",", updatedListeners)) + .WithEnvironment("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", string.Join(",", updatedListenersSecurityProtocolMap)) .WithNetworkAliases(host); } diff --git a/tests/Testcontainers.Kafka.Tests/KafkaContainerNetworkTest.cs b/tests/Testcontainers.Kafka.Tests/KafkaContainerNetworkTest.cs index 8ddc2ccaf..d5d9ba26d 100644 --- a/tests/Testcontainers.Kafka.Tests/KafkaContainerNetworkTest.cs +++ b/tests/Testcontainers.Kafka.Tests/KafkaContainerNetworkTest.cs @@ -1,63 +1,68 @@ -using System.Collections.Generic; -using System.Text; -using DotNet.Testcontainers.Builders; -using DotNet.Testcontainers.Containers; -using DotNet.Testcontainers.Networks; - namespace Testcontainers.Kafka; public sealed class KafkaContainerNetworkTest : IAsyncLifetime { - private INetwork _network; - private KafkaContainer _kafkaContainer; + private const string Message = "Message produced by kafkacat"; - private IContainer _kCatContainer; - public async Task InitializeAsync() + private const string Listener = "kafka:19092"; + + private const string DataFilePath = "/data/msgs.txt"; + + private readonly INetwork _network; + + private readonly IContainer _kafkaContainer; + + private readonly IContainer _kCatContainer; + + public KafkaContainerNetworkTest() { - _network = new NetworkBuilder().Build(); + _network = new NetworkBuilder() + .Build(); + _kafkaContainer = new KafkaBuilder() - .WithImage("confluentinc/cp-kafka") + .WithImage("confluentinc/cp-kafka:6.1.9") .WithNetwork(_network) - .WithListener("kafka:19092") + .WithListener(Listener) .Build(); _kCatContainer = new ContainerBuilder() - .WithImage("confluentinc/cp-kcat") + .WithImage("confluentinc/cp-kafkacat:6.1.9") .WithNetwork(_network) - .WithCommand("-c", "tail -f /dev/null") - .WithEntrypoint("sh") - .WithResourceMapping(Encoding.Default.GetBytes("Message produced by kcat"), "/data/msgs.txt") + .WithEntrypoint(CommonCommands.SleepInfinity) + .WithResourceMapping(Encoding.Default.GetBytes(Message), DataFilePath) .Build(); - - await _kCatContainer.StartAsync(); - await _kafkaContainer.StartAsync(); } - public Task DisposeAsync() + public async Task InitializeAsync() { - return Task.WhenAll( - _kafkaContainer.DisposeAsync().AsTask(), - _kCatContainer.DisposeAsync().AsTask() - ); + await _kafkaContainer.StartAsync() + .ConfigureAwait(false); + + await _kCatContainer.StartAsync() + .ConfigureAwait(false); } - + + public async Task DisposeAsync() + { + await _kafkaContainer.StartAsync() + .ConfigureAwait(false); + + await _kCatContainer.StartAsync() + .ConfigureAwait(false); + + await _network.DisposeAsync() + .ConfigureAwait(false); + } + [Fact] - public async Task TestUsageWithListener() + public async Task ConsumesProducedKafkaMessage() { - // kcat producer - await _kCatContainer.ExecAsync(new List() - { - "kcat", "-b", "kafka:19092", "-t", "msgs", "-P", "-l", "/data/msgs.txt" - }); - - - // kcat consumer - var kCatResult = await _kCatContainer.ExecAsync(new List() - { - "kcat", "-b", "kafka:19092", "-C", "-t", "msgs", "-c", "1" - }); - - Assert.Contains("Message produced by kcat", kCatResult.Stdout); + _ = await _kCatContainer.ExecAsync(new[] { "kafkacat", "-b", Listener, "-t", "msgs", "-P", "-l", DataFilePath }) + .ConfigureAwait(true); + + var execResult = await _kCatContainer.ExecAsync(new[] { "kafkacat", "-b", Listener, "-C", "-t", "msgs", "-c", "1" }) + .ConfigureAwait(true); + + Assert.Equal(Message, execResult.Stdout.Trim()); } - } \ No newline at end of file diff --git a/tests/Testcontainers.Kafka.Tests/KafkaContainerRegistryTest.cs b/tests/Testcontainers.Kafka.Tests/KafkaContainerRegistryTest.cs new file mode 100644 index 000000000..9220c202b --- /dev/null +++ b/tests/Testcontainers.Kafka.Tests/KafkaContainerRegistryTest.cs @@ -0,0 +1,129 @@ +namespace Testcontainers.Kafka; + +public sealed class KafkaContainerRegistryTest : IAsyncLifetime +{ + private const string Schema = @" + { + ""$schema"": ""http://json-schema.org/draft-04/schema#"", + ""title"": ""User"", + ""type"": ""object"", + ""additionalProperties"": false, + ""properties"": { + ""FirstName"": { + ""type"": [""null"", ""string""] + }, + ""LastName"": { + ""type"": [""null"", ""string""] + } + } + }"; + + private const ushort RestPort = 8085; + + private const string SchemaRegistryNetworkAlias = "schema-registry"; + + private const string Listener = "kafka:19092"; + + private readonly INetwork _network; + + private readonly KafkaContainer _kafkaContainer; + + private readonly IContainer _schemaRegistryContainer; + + public KafkaContainerRegistryTest() + { + _network = new NetworkBuilder() + .Build(); + + _kafkaContainer = new KafkaBuilder() + .WithImage("confluentinc/cp-kafka:6.1.9") + .WithNetwork(_network) + .WithListener(Listener) + .Build(); + + _schemaRegistryContainer = new ContainerBuilder() + .WithImage("confluentinc/cp-schema-registry:6.1.9") + .WithPortBinding(RestPort, true) + .WithNetwork(_network) + .WithNetworkAliases(SchemaRegistryNetworkAlias) + .WithEnvironment("SCHEMA_REGISTRY_LISTENERS", "http://0.0.0.0:" + RestPort) + .WithEnvironment("SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL", "PLAINTEXT") + .WithEnvironment("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", "PLAINTEXT://" + Listener) + .WithEnvironment("SCHEMA_REGISTRY_HOST_NAME", SchemaRegistryNetworkAlias) + .WithWaitStrategy(Wait.ForUnixContainer().UntilHttpRequestIsSucceeded(request => + request.ForPort(RestPort).ForPath("/subjects"))) + .Build(); + } + + public async Task InitializeAsync() + { + await _kafkaContainer.StartAsync() + .ConfigureAwait(false); + + await _schemaRegistryContainer.StartAsync() + .ConfigureAwait(false); + } + + public async Task DisposeAsync() + { + await _kafkaContainer.StartAsync() + .ConfigureAwait(false); + + await _schemaRegistryContainer.StartAsync() + .ConfigureAwait(false); + + await _network.DisposeAsync() + .ConfigureAwait(false); + } + + [Fact] + public async Task ConsumerReturnsProducerMessage() + { + // Given + const string topic = "user"; + + var subject = SubjectNameStrategy.Topic.ConstructValueSubjectName(topic); + + var bootstrapServer = _kafkaContainer.GetBootstrapAddress(); + + var producerConfig = new ProducerConfig(); + producerConfig.BootstrapServers = bootstrapServer; + + var consumerConfig = new ConsumerConfig(); + consumerConfig.BootstrapServers = bootstrapServer; + consumerConfig.GroupId = "sample-consumer"; + consumerConfig.AutoOffsetReset = AutoOffsetReset.Earliest; + + var message = new Message(); + message.Value = new User("John", "Doe"); + + var schemaRegistryConfig = new SchemaRegistryConfig(); + schemaRegistryConfig.Url = new UriBuilder(Uri.UriSchemeHttp, _schemaRegistryContainer.Hostname, _schemaRegistryContainer.GetMappedPublicPort(RestPort)).ToString(); + + // When + using var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig); + _ = await schemaRegistry.RegisterSchemaAsync(subject, new Schema(Schema, SchemaType.Json)) + .ConfigureAwait(true); + + using var producer = new ProducerBuilder(producerConfig) + .SetValueSerializer(new JsonSerializer(schemaRegistry)) + .Build(); + + _ = await producer.ProduceAsync(topic, message) + .ConfigureAwait(true); + + using var consumer = new ConsumerBuilder(consumerConfig) + .SetValueDeserializer(new JsonDeserializer().AsSyncOverAsync()) + .Build(); + + consumer.Subscribe(topic); + + var result = consumer.Consume(TimeSpan.FromSeconds(15)); + + // Then + Assert.NotNull(result); + Assert.Equal(message.Value, result.Message.Value); + } + + private record User(string FirstName, string LastName); +} \ No newline at end of file diff --git a/tests/Testcontainers.Kafka.Tests/KafkaContainerWithRegistryTest.cs b/tests/Testcontainers.Kafka.Tests/KafkaContainerWithRegistryTest.cs deleted file mode 100644 index 3ad1a11d5..000000000 --- a/tests/Testcontainers.Kafka.Tests/KafkaContainerWithRegistryTest.cs +++ /dev/null @@ -1,132 +0,0 @@ -using System.Diagnostics; -using System.Threading; -using Confluent.SchemaRegistry; -using Confluent.SchemaRegistry.Serdes; -using DotNet.Testcontainers.Builders; -using DotNet.Testcontainers.Containers; -using DotNet.Testcontainers.Networks; - -namespace Testcontainers.Kafka; - -public class KafkaContainerWithRegistryTest : IAsyncLifetime -{ - private INetwork _network; - private KafkaContainer _kafkaContainer; - private IContainer _kafkaSchemaRegistry; - - const string schema = @" - { - ""$schema"": ""http://json-schema.org/draft-07/schema#"", - ""$id"": ""http://example.com/product.schema.json"", - ""title"": ""User"", - ""description"": ""A User"", - ""type"": ""object"", - ""properties"": { - ""age"": { - ""description"": ""The age of the user"", - ""type"": ""integer"" - }, - ""lastname"": { - ""description"": ""Last name of the user"", - ""type"": ""string"" - }, - ""firstname"": { - ""description"": ""First name of the user"", - ""type"": ""string"" - } - }, - ""required"": [""firstname"", ""lastname""] - }"; - - public async Task InitializeAsync() - { - _network = new NetworkBuilder().Build(); - _kafkaContainer = new KafkaBuilder() - .WithImage("confluentinc/cp-kafka") - .WithNetwork(_network) - .WithListener("kafka:19092") - .Build(); - - _kafkaSchemaRegistry = new ContainerBuilder() - .WithImage("confluentinc/cp-schema-registry:7.8.0") - .DependsOn(_kafkaContainer) - .WithPortBinding(8085, true) - .WithNetworkAliases("schemaregistry") - .WithNetwork(_network) - .WithEnvironment("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", "PLAINTEXT://kafka:19092") - .WithEnvironment("SCHEMA_REGISTRY_LISTENERS", "http://0.0.0.0:8085") - .WithEnvironment("SCHEMA_REGISTRY_HOST_NAME", "schemaregistry") - .WithEnvironment("SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL", "PLAINTEXT") - .WithWaitStrategy( - Wait.ForUnixContainer() - .UntilHttpRequestIsSucceeded(request => request.ForPath("/subjects") - .ForPort(8085)) - - ) - .Build(); - - await _kafkaContainer.StartAsync(); - await _kafkaSchemaRegistry.StartAsync(); - } - - public async Task DisposeAsync() - { - await Task.WhenAll( - _kafkaContainer.DisposeAsync().AsTask(), - _kafkaSchemaRegistry.DisposeAsync().AsTask() - ); - } - - /// - /// Test the usage of the Kafka container with the schema registry. - /// - [Fact] - public async Task TestUsageWithSchemaRegistry() - { - const string topicName = "user-topic"; - var subject = SubjectNameStrategy.Topic.ConstructValueSubjectName(topicName, null); - - var bootstrapServers = this._kafkaContainer.GetBootstrapAddress() - .Replace("PLAINTEXT://", "", StringComparison.OrdinalIgnoreCase); - - var jsonSerializerConfig = new JsonSerializerConfig - { - BufferBytes = 100, - }; - - var schemaRegistryUrl = $"http://localhost:{_kafkaSchemaRegistry.GetMappedPublicPort(8085)}"; - - var schemaRegistryConfig = new SchemaRegistryConfig - { - Url = schemaRegistryUrl, - }; - // Init Kafka producer to send a message - var producerConfig = new ProducerConfig - { - BootstrapServers = bootstrapServers, - ClientId = $"test-client-{DateTime.Now.Ticks}", - }; - using var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig); - - var schemaId = await schemaRegistry.RegisterSchemaAsync(subject, new Schema(schema, SchemaType.Json)); - - using var producer = new ProducerBuilder(producerConfig) - .SetValueSerializer(new JsonSerializer(schemaRegistry, jsonSerializerConfig)) - .Build(); - - await Assert.ThrowsAsync(async () => - { - try - { - var user = new User { Name = "value", Age = 30 }; - await producer.ProduceAsync(topicName, new Message { Value = user }); - } - catch (Exception e) - { - Assert.True(e is ProduceException); - Debug.Assert(e.InnerException != null, "e.InnerException != null"); - throw e.InnerException; - } - }); - } -} diff --git a/tests/Testcontainers.Kafka.Tests/User.cs b/tests/Testcontainers.Kafka.Tests/User.cs deleted file mode 100644 index 30759b7dc..000000000 --- a/tests/Testcontainers.Kafka.Tests/User.cs +++ /dev/null @@ -1,7 +0,0 @@ -namespace Testcontainers.Kafka; - -public class User -{ - public string Name { get; set; } - public int Age { get; set; } -} \ No newline at end of file diff --git a/tests/Testcontainers.Kafka.Tests/Usings.cs b/tests/Testcontainers.Kafka.Tests/Usings.cs index a7fcce194..3e69bfb4f 100644 --- a/tests/Testcontainers.Kafka.Tests/Usings.cs +++ b/tests/Testcontainers.Kafka.Tests/Usings.cs @@ -1,5 +1,15 @@ global using System; +global using System.Collections.Generic; +global using System.Diagnostics; +global using System.Text; +global using System.Threading; global using System.Threading.Tasks; global using Confluent.Kafka; +global using Confluent.Kafka.SyncOverAsync; +global using Confluent.SchemaRegistry; +global using Confluent.SchemaRegistry.Serdes; +global using DotNet.Testcontainers.Builders; global using DotNet.Testcontainers.Commons; +global using DotNet.Testcontainers.Containers; +global using DotNet.Testcontainers.Networks; global using Xunit; \ No newline at end of file