Skip to content

Commit

Permalink
test: add test to connect Kafka and Schema Registry to resolve issue #…
Browse files Browse the repository at this point in the history
…736

Signed-off-by: SebastienDegodez <[email protected]>
  • Loading branch information
SebastienDegodez committed Jan 9, 2025
1 parent 58928ec commit 97af438
Show file tree
Hide file tree
Showing 4 changed files with 146 additions and 1 deletion.
4 changes: 3 additions & 1 deletion Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@
<PackageVersion Include="Azure.Storage.Blobs" Version="12.17.0"/>
<PackageVersion Include="Azure.Storage.Queues" Version="12.15.0"/>
<PackageVersion Include="ClickHouse.Client" Version="7.9.1"/>
<PackageVersion Include="Confluent.Kafka" Version="2.0.2"/>
<PackageVersion Include="Confluent.Kafka" Version="2.8.0"/>
<PackageVersion Include="Confluent.SchemaRegistry" Version="2.8.0" />
<PackageVersion Include="Confluent.SchemaRegistry.Serdes.Json" Version="2.8.0" />
<PackageVersion Include="Consul" Version="1.6.10.9"/>
<PackageVersion Include="CouchbaseNetClient" Version="3.6.4"/>
<PackageVersion Include="DotPulsar" Version="3.3.2"/>
Expand Down
134 changes: 134 additions & 0 deletions tests/Testcontainers.Kafka.Tests/KafkaContainerWithRegistryTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
using System.Diagnostics;
using System.Threading;
using Confluent.SchemaRegistry;
using Confluent.SchemaRegistry.Serdes;
using DotNet.Testcontainers.Builders;
using DotNet.Testcontainers.Containers;
using DotNet.Testcontainers.Networks;

namespace Testcontainers.Kafka;

public class KafkaContainerWithRegistryTest : IAsyncLifetime
{
private INetwork _network;
private KafkaContainer _kafkaContainer;
private IContainer _kafkaSchemaRegistry;

const string schema = @"
{
""$schema"": ""http://json-schema.org/draft-07/schema#"",
""$id"": ""http://example.com/product.schema.json"",
""title"": ""User"",
""description"": ""A User"",
""type"": ""object"",
""properties"": {
""age"": {
""description"": ""The age of the user"",
""type"": ""integer""
},
""lastname"": {
""description"": ""Last name of the user"",
""type"": ""string""
},
""firstname"": {
""description"": ""First name of the user"",
""type"": ""string""
}
},
""required"": [""firstname"", ""lastname""]
}";

public async Task InitializeAsync()
{
_network = new NetworkBuilder().Build();
_kafkaContainer = new KafkaBuilder()
.WithImage("confluentinc/cp-kafka")
.WithNetwork(_network)
.WithListener("kafka:19092")
.Build();

_kafkaSchemaRegistry = new ContainerBuilder()
.WithImage("confluentinc/cp-schema-registry:7.8.0")
.DependsOn(_kafkaContainer)
.WithPortBinding(8085, true)
.WithNetworkAliases("schemaregistry")
.WithNetwork(_network)
.WithEnvironment("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", "PLAINTEXT://kafka:19092")
.WithEnvironment("SCHEMA_REGISTRY_LISTENERS", "http://0.0.0.0:8085")
.WithEnvironment("SCHEMA_REGISTRY_HOST_NAME", "schemaregistry")
.WithEnvironment("SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL", "PLAINTEXT")
.WithWaitStrategy(
Wait.ForUnixContainer()
.UntilHttpRequestIsSucceeded(request => request.ForPath("/subjects")
.ForPort(8085))

)
.Build();

await _kafkaContainer.StartAsync();

var cts = new CancellationTokenSource(15000);
await _kafkaSchemaRegistry.StartAsync(cts.Token);
}

public async Task DisposeAsync()
{
await Task.WhenAll(
_kafkaContainer.DisposeAsync().AsTask(),
_kafkaSchemaRegistry.DisposeAsync().AsTask()
);
}

/// <summary>
/// Test the usage of the Kafka container with the schema registry.
/// </summary>
[Fact]
public async Task TestUsageWithSchemaRegistry()
{
const string topicName = "user-topic";
var subject = SubjectNameStrategy.Topic.ConstructValueSubjectName(topicName, null);

var bootstrapServers = this._kafkaContainer.GetBootstrapAddress()
.Replace("PLAINTEXT://", "", StringComparison.OrdinalIgnoreCase);

var jsonSerializerConfig = new JsonSerializerConfig
{
BufferBytes = 100,
};

var schemaRegistryUrl = $"http://localhost:{_kafkaSchemaRegistry.GetMappedPublicPort(8085)}";

var schemaRegistryConfig = new SchemaRegistryConfig
{
Url = schemaRegistryUrl,
};
// Init Kafka producer to send a message
var producerConfig = new ProducerConfig
{
BootstrapServers = bootstrapServers,
ClientId = $"test-client-{DateTime.Now.Ticks}",
};
using var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig);

var schemaId = await schemaRegistry.RegisterSchemaAsync(subject, new Schema(schema, SchemaType.Json));

using var producer = new ProducerBuilder<string, User>(producerConfig)
.SetValueSerializer(new JsonSerializer<User>(schemaRegistry, jsonSerializerConfig))
.Build();

await Assert.ThrowsAsync<SchemaRegistryException>(async () =>
{
try
{
var user = new User { Name = "value", Age = 30 };
await producer.ProduceAsync(topicName, new Message<string, User> { Value = user });
}
catch (Exception e)
{
Assert.True(e is ProduceException<string, User>);
Debug.Assert(e.InnerException != null, "e.InnerException != null");
throw e.InnerException;
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
<IsPublishable>false</IsPublishable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Confluent.SchemaRegistry" />
<PackageReference Include="Confluent.SchemaRegistry.Serdes.Json" />
<PackageReference Include="Microsoft.NET.Test.Sdk"/>
<PackageReference Include="coverlet.collector"/>
<PackageReference Include="xunit.runner.visualstudio"/>
Expand Down
7 changes: 7 additions & 0 deletions tests/Testcontainers.Kafka.Tests/User.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace Testcontainers.Kafka;

public class User
{
public string Name { get; set; }
public int Age { get; set; }
}

0 comments on commit 97af438

Please sign in to comment.