Skip to content

Commit

Permalink
Merge branch 'master' into fix/produce-memory-leak
Browse files Browse the repository at this point in the history
  • Loading branch information
JotaDobleEse authored Feb 13, 2025
2 parents 9f339ac + 83bce82 commit 09c04bf
Show file tree
Hide file tree
Showing 14 changed files with 573 additions and 22 deletions.
7 changes: 7 additions & 0 deletions KafkaFlow.sln
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "KafkaFlow.Sample.OpenTelemetry", "samples\KafkaFlow.Sample.OpenTelemetry\KafkaFlow.Sample.OpenTelemetry.csproj", "{E9E8B374-4165-45F2-8DF5-F141E141AC1D}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "KafkaFlow.Sample.CooperativeSticky", "samples\KafkaFlow.Sample.CooperativeSticky\KafkaFlow.Sample.CooperativeSticky.csproj", "{DBF7B091-11AE-402F-9F36-7E7EB3901B0B}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -222,6 +224,10 @@ Global
{E9E8B374-4165-45F2-8DF5-F141E141AC1D}.Debug|Any CPU.Build.0 = Debug|Any CPU
{E9E8B374-4165-45F2-8DF5-F141E141AC1D}.Release|Any CPU.ActiveCfg = Release|Any CPU
{E9E8B374-4165-45F2-8DF5-F141E141AC1D}.Release|Any CPU.Build.0 = Release|Any CPU
{DBF7B091-11AE-402F-9F36-7E7EB3901B0B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{DBF7B091-11AE-402F-9F36-7E7EB3901B0B}.Debug|Any CPU.Build.0 = Debug|Any CPU
{DBF7B091-11AE-402F-9F36-7E7EB3901B0B}.Release|Any CPU.ActiveCfg = Release|Any CPU
{DBF7B091-11AE-402F-9F36-7E7EB3901B0B}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -265,6 +271,7 @@ Global
{1755E8DB-970C-4A24-8B7C-A2BEC1410BEE} = {7A9B997B-DAAC-4004-94F3-32F6B88E0068}
{80080C1D-579E-4AB2-935D-5CFFC51843D8} = {7A9B997B-DAAC-4004-94F3-32F6B88E0068}
{E9E8B374-4165-45F2-8DF5-F141E141AC1D} = {303AE78F-6C96-4DF4-AC89-5C4FD53AFF0B}
{DBF7B091-11AE-402F-9F36-7E7EB3901B0B} = {303AE78F-6C96-4DF4-AC89-5C4FD53AFF0B}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {6AE955B5-16B0-41CF-9F12-66D15B3DD1AB}
Expand Down
44 changes: 44 additions & 0 deletions samples/KafkaFlow.Sample.CooperativeSticky/HostedService.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using KafkaFlow.Producers;
using Microsoft.Extensions.Hosting;

namespace KafkaFlow.Sample.CooperativeSticky;

/// <summary>
/// Hosted service that continuously produces sample messages to the sample topic
/// (one every 500 ms) until the host shuts down.
/// </summary>
public class HostedService : IHostedService
{
    const string producerName = "PrintConsole";
    const string topicName = "sample-topic";

    private readonly IMessageProducer _producer;
    private readonly CancellationTokenSource _stoppingCts = new();
    private Task _producingTask;

    public HostedService(IProducerAccessor producerAccessor)
    {
        _producer = producerAccessor.GetProducer(producerName);
    }

    /// <summary>
    /// Starts the produce loop on a background task and returns immediately.
    /// NOTE: the previous implementation awaited the infinite loop here, so
    /// StartAsync never completed and host startup was blocked.
    /// </summary>
    public Task StartAsync(CancellationToken cancellationToken)
    {
        _producingTask = ProduceLoopAsync(_stoppingCts.Token);
        return Task.CompletedTask;
    }

    /// <summary>
    /// Signals the produce loop to stop and waits for it to drain, giving up
    /// early if the host's shutdown token fires first.
    /// </summary>
    public async Task StopAsync(CancellationToken cancellationToken)
    {
        _stoppingCts.Cancel();

        if (_producingTask != null)
        {
            await Task.WhenAny(_producingTask, Task.Delay(Timeout.Infinite, cancellationToken));
        }
    }

    // Produces a new random TestMessage every 500 ms until cancelled.
    private async Task ProduceLoopAsync(CancellationToken cancellationToken)
    {
        try
        {
            while (!cancellationToken.IsCancellationRequested)
            {
                await _producer.ProduceAsync(
                    topicName,
                    Guid.NewGuid().ToString(),
                    new TestMessage { Text = $"Message: {Guid.NewGuid()}" });
                await Task.Delay(500, cancellationToken);
            }
        }
        catch (OperationCanceledException)
        {
            // Expected on shutdown; nothing to report.
        }
        catch (Exception e)
        {
            // Sample-level diagnostics only; a real service would use ILogger.
            Console.WriteLine(e);
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net6.0</TargetFramework>
<IsPackable>false</IsPackable>
<GenerateDocumentationFile>false</GenerateDocumentationFile>
<InvariantGlobalization>true</InvariantGlobalization>
</PropertyGroup>

<PropertyGroup Condition=" '$(Configuration)' == 'Debug' ">
<NoWarn>1701;1702;CS1591;SA1600</NoWarn>
</PropertyGroup>

<PropertyGroup Condition=" '$(Configuration)' == 'Release' ">
<NoWarn>1701;1702;CS1591;SA1600</NoWarn>
</PropertyGroup>


<ItemGroup>
<ProjectReference Include="..\..\src\KafkaFlow.LogHandler.Console\KafkaFlow.LogHandler.Console.csproj" />
<ProjectReference Include="..\..\src\KafkaFlow.Microsoft.DependencyInjection\KafkaFlow.Microsoft.DependencyInjection.csproj" />
<ProjectReference Include="..\..\src\KafkaFlow.Serializer.ProtobufNet\KafkaFlow.Serializer.ProtobufNet.csproj" />
<ProjectReference Include="..\..\src\KafkaFlow\KafkaFlow.csproj" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="6.0.1" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="6.0.1" />
</ItemGroup>


</Project>
18 changes: 18 additions & 0 deletions samples/KafkaFlow.Sample.CooperativeSticky/PrintConsoleHandler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
using System;
using System.Threading.Tasks;

namespace KafkaFlow.Sample.CooperativeSticky;

/// <summary>
/// Message handler that writes each consumed <see cref="TestMessage"/> to the console,
/// along with the partition and offset it was read from.
/// </summary>
public class PrintConsoleHandler : IMessageHandler<TestMessage>
{
    /// <summary>
    /// Prints the message text plus its partition/offset and completes synchronously.
    /// </summary>
    public Task Handle(IMessageContext context, TestMessage message)
    {
        var consumerContext = context.ConsumerContext;

        Console.WriteLine(
            "Partition: {0} | Offset: {1} | Message: {2}",
            consumerContext.Partition,
            consumerContext.Offset,
            message.Text);

        return Task.CompletedTask;
    }
}
51 changes: 51 additions & 0 deletions samples/KafkaFlow.Sample.CooperativeSticky/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
using Confluent.Kafka;
using KafkaFlow;
using KafkaFlow.Sample.CooperativeSticky;
using KafkaFlow.Serializer;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using AutoOffsetReset = KafkaFlow.AutoOffsetReset;

// Sample entry point: wires up a KafkaFlow producer/consumer pair that uses the
// cooperative sticky partition assignment strategy instead of the default eager one.
const string producerName = "PrintConsole";
const string topicName = "sample-topic";
var hostBuilder = new HostBuilder();
hostBuilder.ConfigureServices(services =>
    services.AddHostedService<HostedService>().AddKafka(
        kafka => kafka
            .UseConsoleLog()
            .AddCluster(
                cluster => cluster
                    .WithBrokers(new[] { "localhost:9092" })
                    .CreateTopicIfNotExists(topicName, 6, 1)
                    .AddProducer(
                        producerName,
                        producer => producer
                            .DefaultTopic(topicName)
                            .AddMiddlewares(m => m.AddSerializer<ProtobufNetSerializer>())
                    )
                    .AddConsumer(
                        consumer => consumer
                            // Opt in to incremental (cooperative) rebalancing: partitions are
                            // added/removed individually instead of revoking the whole assignment.
                            .WithConsumerConfig(new ConsumerConfig
                            {
                                PartitionAssignmentStrategy = PartitionAssignmentStrategy.CooperativeSticky})
                            .Topic(topicName)
                            .WithGroupId("print-console-handler")
                            .WithBufferSize(100)
                            .WithWorkersCount(3)
                            .WithAutoCommitIntervalMs(100)
                            .WithAutoOffsetReset(AutoOffsetReset.Latest)
                            .AddMiddlewares(
                                middlewares => middlewares
                                    .AddDeserializer<ProtobufNetDeserializer>()
                                    .AddTypedHandlers(h => h.AddHandler<PrintConsoleHandler>())
                            )
                    )
            )
    ));

// Build the host, start the Kafka bus (producers + consumers), run until shutdown,
// then stop the bus gracefully.
var build = hostBuilder.Build();
var kafkaBus = build.Services.CreateKafkaBus();
await kafkaBus.StartAsync();

await build.RunAsync();
await kafkaBus.StopAsync();
28 changes: 28 additions & 0 deletions samples/KafkaFlow.Sample.CooperativeSticky/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# KafkaFlow.Sample.CooperativeSticky

This is a simple sample that shows how to produce and consume messages using the cooperative sticky partition assignment strategy.

## How to run

### Requirements

- [.NET 6.0 SDK](https://dotnet.microsoft.com/en-us/download/dotnet/6.0)
- [Docker Desktop](https://www.docker.com/products/docker-desktop/)

### Start the cluster

Using your terminal of choice, start the cluster.
You can find a docker-compose file at the root of this repository.
Position the terminal in that folder and run the following command.

```bash
docker-compose up -d
```

### Run the Sample

Using your terminal of choice, run the sample from the sample folder.

```bash
dotnet run
```
10 changes: 10 additions & 0 deletions samples/KafkaFlow.Sample.CooperativeSticky/TestMessage.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
using System.Runtime.Serialization;

namespace KafkaFlow.Sample.CooperativeSticky;

/// <summary>
/// Sample message contract. The DataContract/DataMember attributes drive the
/// ProtobufNet serializer configured in Program.cs.
/// </summary>
[DataContract]
public class TestMessage
{
    // Order = 1 maps this property to protobuf field number 1.
    [DataMember(Order = 1)]
    public string Text { get; set; }
}
4 changes: 3 additions & 1 deletion src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System.Linq;
using System.Threading.Tasks;
using KafkaFlow.Consumers.DistributionStrategies;
using KafkaFlow.Extensions;

namespace KafkaFlow.Configuration;

Expand Down Expand Up @@ -251,7 +252,8 @@ public IConsumerConfiguration Build(ClusterConfiguration clusterConfiguration)
consumerConfigCopy.StatisticsIntervalMs = _consumerConfig.StatisticsIntervalMs ?? _statisticsInterval;

consumerConfigCopy.EnableAutoOffsetStore = false;
consumerConfigCopy.EnableAutoCommit = false;
consumerConfigCopy.EnableAutoCommit = _consumerConfig.PartitionAssignmentStrategy.IsStopTheWorldStrategy() is false;
consumerConfigCopy.AutoCommitIntervalMs = (int?)_autoCommitInterval.TotalMilliseconds;

consumerConfigCopy.ReadSecurityInformationFrom(clusterConfiguration);

Expand Down
70 changes: 56 additions & 14 deletions src/KafkaFlow/Consumers/Consumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@
using Confluent.Kafka;
using KafkaFlow.Authentication;
using KafkaFlow.Configuration;
using KafkaFlow.Extensions;

namespace KafkaFlow.Consumers;

internal class Consumer : IConsumer
{
private readonly IDependencyResolver _dependencyResolver;
private readonly ILogHandler _logHandler;
private readonly bool _stopTheWorldStrategy;

private readonly List<Action<IDependencyResolver, IConsumer<byte[], byte[]>, List<TopicPartition>>>
_partitionsAssignedHandlers = new();
Expand All @@ -40,6 +42,7 @@ public Consumer(
this.Configuration = configuration;
_flowManager = new ConsumerFlowManager(this, _logHandler);
_maxPollIntervalExceeded = new(_logHandler);
_stopTheWorldStrategy = Configuration.GetKafkaConfig().PartitionAssignmentStrategy.IsStopTheWorldStrategy();

foreach (var handler in this.Configuration.StatisticsHandlers)
{
Expand Down Expand Up @@ -148,7 +151,17 @@ public void Commit(IReadOnlyCollection<Confluent.Kafka.TopicPartitionOffset> off
return;
}

_consumer.Commit(validOffsets);
if (_stopTheWorldStrategy)
{
_consumer.Commit(validOffsets);
}
else
{
foreach (var topicPartitionOffset in validOffsets)
{
_consumer.StoreOffset(topicPartitionOffset);
}
}

foreach (var offset in validOffsets)
{
Expand Down Expand Up @@ -237,17 +250,8 @@ private void EnsureConsumer()
var kafkaConfig = this.Configuration.GetKafkaConfig();

var consumerBuilder = new ConsumerBuilder<byte[], byte[]>(kafkaConfig)
.SetPartitionsAssignedHandler(
(consumer, partitions) => this.FirePartitionsAssignedHandlers(consumer, partitions))
.SetPartitionsRevokedHandler(
(consumer, partitions) =>
{
_partitionsRevokedHandlers.ForEach(handler => handler(_dependencyResolver, consumer, partitions));
this.Assignment = new List<TopicPartition>();
this.Subscription = new List<string>();
_currentPartitionsOffsets.Clear();
_flowManager.Stop();
})
.SetPartitionsAssignedHandler(FirePartitionsAssignedHandlers)
.SetPartitionsRevokedHandler(FirePartitionRevokedHandlers)
.SetErrorHandler((consumer, error) => _errorsHandlers.ForEach(x => x(consumer, error)))
.SetStatisticsHandler((consumer, statistics) => _statisticsHandlers.ForEach(x => x(consumer, statistics)));

Expand Down Expand Up @@ -293,13 +297,51 @@ private void FirePartitionsAssignedHandlers(
IConsumer<byte[], byte[]> consumer,
List<TopicPartition> partitions)
{
this.Assignment = partitions;
if (_stopTheWorldStrategy)
{
this.Assignment = partitions;
this.Subscription = consumer.Subscription;
_flowManager.Start(consumer);
_partitionsAssignedHandlers.ForEach(handler => handler(_dependencyResolver, consumer, partitions));
return;
}

if (partitions.Count == 0)
{
return;
}

this.Assignment = this.Assignment.Union(partitions).ToArray();
this.Subscription = consumer.Subscription;
_flowManager.Stop();
_flowManager.Start(consumer);

_partitionsAssignedHandlers.ForEach(handler => handler(_dependencyResolver, consumer, partitions));
}

    // Invoked by the Confluent consumer when partitions are revoked during a rebalance.
    // Behavior differs by assignment strategy (see _stopTheWorldStrategy).
    private void FirePartitionRevokedHandlers(IConsumer<byte[], byte[]> consumer, List<Confluent.Kafka.TopicPartitionOffset> partitions)
    {
        if (_stopTheWorldStrategy)
        {
            // Eager ("stop the world") rebalance: the whole assignment is revoked at once,
            // so clear all assignment/subscription state and halt the flow manager.
            _partitionsRevokedHandlers.ForEach(handler => handler(_dependencyResolver, consumer, partitions));
            this.Assignment = new List<TopicPartition>();
            this.Subscription = new List<string>();
            _currentPartitionsOffsets.Clear();
            _flowManager.Stop();
            return;
        }

        // Cooperative (incremental) rebalance: only the listed partitions are revoked,
        // so remove just those from the current assignment and keep the rest.
        this.Assignment = this.Assignment.Except(partitions.Select(x => x.TopicPartition)).ToArray();
        this.Subscription = consumer.Subscription;
        foreach (var partition in partitions)
        {
            _currentPartitionsOffsets.TryRemove(partition.TopicPartition, out _);
        }

        // Restart the flow manager so it tracks the reduced partition set,
        // then notify user-registered revocation handlers.
        _flowManager.Stop();
        _flowManager.Start(consumer);
        _partitionsRevokedHandlers.ForEach(handler => handler(_dependencyResolver, consumer, partitions));
    }

private void InvalidateConsumer()
{
_consumer?.Close();
Expand Down
Loading

0 comments on commit 09c04bf

Please sign in to comment.