Skip to content

Commit

Permalink
Merge branch 'master' into fix/produce-memory-leak
Browse files Browse the repository at this point in the history
  • Loading branch information
JotaDobleEse authored Feb 11, 2025
2 parents 6e740b5 + 418cefe commit 9f339ac
Show file tree
Hide file tree
Showing 7 changed files with 106 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,11 @@ public interface IClusterConfigurationBuilder
/// <param name="topicName">The topic name</param>
/// <param name="numberOfPartitions">The number of Topic partitions. Default is to use the cluster-defined partitions.</param>
/// <param name="replicationFactor">The Topic replication factor. Default is to use the cluster-defined replication factor.</param>
/// <param name="configs">Additional topic creation configuration values.</param>
/// <returns></returns>
IClusterConfigurationBuilder CreateTopicIfNotExists(
string topicName,
int numberOfPartitions = -1,
short replicationFactor = -1);
short replicationFactor = -1,
Dictionary<string, string> configs = null);
}
1 change: 1 addition & 0 deletions src/KafkaFlow/Clusters/ClusterManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ public async Task CreateIfNotExistsAsync(IEnumerable<TopicConfiguration> configu
Name = topicConfiguration.Name,
ReplicationFactor = topicConfiguration.Replicas,
NumPartitions = topicConfiguration.Partitions,
Configs = topicConfiguration.Configs,
})
.ToArray();

Expand Down
5 changes: 3 additions & 2 deletions src/KafkaFlow/Configuration/ClusterConfigurationBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,10 @@ public IClusterConfigurationBuilder OnStarted(Action<IDependencyResolver> handle
public IClusterConfigurationBuilder CreateTopicIfNotExists(
string topicName,
int numberOfPartitions = -1,
short replicationFactor = -1)
short replicationFactor = -1,
Dictionary<string, string> configs = null)
{
_topicsToCreateIfNotExist.Add(new TopicConfiguration(topicName, numberOfPartitions, replicationFactor));
_topicsToCreateIfNotExist.Add(new TopicConfiguration(topicName, numberOfPartitions, replicationFactor, configs));
return this;
}
}
11 changes: 10 additions & 1 deletion src/KafkaFlow/Configuration/TopicConfiguration.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
using System.Collections.Generic;

namespace KafkaFlow.Configuration;

/// <summary>
Expand All @@ -11,11 +13,13 @@ public class TopicConfiguration
/// <param name="name">The topic name</param>
/// <param name="partitions">The number of partitions for the topic</param>
/// <param name="replicas">Replication factor for the topic</param>
public TopicConfiguration(string name, int partitions, short replicas)
/// <param name="configs">Additional topic creation configuration values.</param>
public TopicConfiguration(string name, int partitions, short replicas, Dictionary<string, string> configs)
{
this.Name = name;
this.Partitions = partitions;
this.Replicas = replicas;
this.Configs = configs;
}

/// <summary>
Expand All @@ -32,4 +36,9 @@ public TopicConfiguration(string name, int partitions, short replicas)
/// Gets the Topic Replication Factor
/// </summary>
public short Replicas { get; }

/// <summary>
/// Gets the topic creation configuration
/// </summary>
public Dictionary<string, string> Configs { get; }
}
46 changes: 38 additions & 8 deletions src/KafkaFlow/Producers/MessageProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -188,9 +188,19 @@ public void Dispose()
_producer?.Dispose();
}

private static void FillContextWithResultMetadata(IMessageContext context, DeliveryResult<byte[], byte[]> result)
private void FillContextWithResultMetadata(IMessageContext context, DeliveryResult<byte[], byte[]> result)
{
var concreteProducerContext = (ProducerContext)context.ProducerContext;
var concreteProducerContext = context.ProducerContext as ProducerContext;

if (concreteProducerContext is null)
{
_logHandler.Warning("Producer context is null on FillContextWithResultMetadata", new
{
DeliveryResult = result,
});

return;
}

concreteProducerContext.Offset = result.Offset;
concreteProducerContext.Partition = result.Partition;
Expand Down Expand Up @@ -258,7 +268,17 @@ private IProducer<byte[], byte[]> EnsureProducer()
{
foreach (var handler in _configuration.StatisticsHandlers)
{
handler.Invoke(statistics);
try
{
handler?.Invoke(statistics);
}
catch (Exception ex)
{
_logHandler.Error("Kafka Producer StatisticsHandler Error", ex, new
{
ProducerName = _configuration.Name,
});
}
}
});

Expand Down Expand Up @@ -362,14 +382,24 @@ private void InternalProduce(

void Handler(DeliveryReport<byte[], byte[]> report)
{
if (report.Error.IsFatal)
try
{
this.InvalidateProducer(report.Error, report);
}
if (report.Error.IsFatal)
{
this.InvalidateProducer(report.Error, report);
}

FillContextWithResultMetadata(context, report);
FillContextWithResultMetadata(context, report);
}
catch (Exception ex)
{
_logHandler.Error("Delivery Result Handler Error", ex, new
{
Report = report,
});
}

deliveryHandler(report);
deliveryHandler?.Invoke(report);
}
}

Expand Down
34 changes: 17 additions & 17 deletions website/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

46 changes: 34 additions & 12 deletions website/yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1540,7 +1540,7 @@
"@docusaurus/theme-search-algolia" "3.1.0"
"@docusaurus/types" "3.1.0"

"@docusaurus/[email protected]", "react-loadable@npm:@docusaurus/[email protected]":
"@docusaurus/[email protected]":
version "5.5.2"
resolved "https://registry.npmjs.org/@docusaurus/react-loadable/-/react-loadable-5.5.2.tgz"
integrity sha512-A3dYjdBGuy0IGT+wyLIGIKLRE+sAk1iNk0f1HjNDysO7u8lhL4N3VEm+FAubmJbAztn94F7MxBTPmnixbiyFdQ==
Expand Down Expand Up @@ -2849,7 +2849,14 @@ brace-expansion@^1.1.7:
balanced-match "^1.0.0"
concat-map "0.0.1"

braces@^3.0.2, braces@~3.0.2:
braces@^3.0.3:
version "3.0.3"
resolved "https://registry.yarnpkg.com/braces/-/braces-3.0.3.tgz#490332f40919452272d55a8480adc0c441358789"
integrity sha512-yQbXgO/OSZVD2IsiLlro+7Hf6Q18EJrKSEsdoMzKePKXct3gvD8oLcOQdIzGupr5Fj+EDe8gO/lxc1BzfMpxvA==
dependencies:
fill-range "^7.1.1"

braces@~3.0.2:
version "3.0.2"
resolved "https://registry.npmjs.org/braces/-/braces-3.0.2.tgz"
integrity sha512-b8um+L1RzM3WDSzvhm6gIz1yfTbBt6YTlcEKAvsmqCZZFw46z626lVj9j1yEPW33H5H+lBQpZMP1k8l+78Ha0A==
Expand Down Expand Up @@ -4106,6 +4113,13 @@ fill-range@^7.0.1:
dependencies:
to-regex-range "^5.0.1"

fill-range@^7.1.1:
version "7.1.1"
resolved "https://registry.yarnpkg.com/fill-range/-/fill-range-7.1.1.tgz#44265d3cac07e3ea7dc247516380643754a05292"
integrity sha512-YsGpe3WHLK8ZYi4tWDg2Jy3ebRz2rXowDxnld4bkQB00cc/1Zw9AWnC0i9ztDJitivtQvaI9KaLyKrc+hBW0yg==
dependencies:
to-regex-range "^5.0.1"

[email protected]:
version "1.2.0"
resolved "https://registry.npmjs.org/finalhandler/-/finalhandler-1.2.0.tgz"
Expand Down Expand Up @@ -5981,11 +5995,11 @@ micromark@^4.0.0:
micromark-util-types "^2.0.0"

micromatch@^4.0.2, micromatch@^4.0.4, micromatch@^4.0.5:
version "4.0.5"
resolved "https://registry.npmjs.org/micromatch/-/micromatch-4.0.5.tgz"
integrity sha512-DMy+ERcEW2q8Z2Po+WNXuw3c5YaUSFjAO5GsJqfEl7UjvtIuFKO6ZrKvcItdy98dwFI2N1tg3zNIdKaQT+aNdA==
version "4.0.8"
resolved "https://registry.yarnpkg.com/micromatch/-/micromatch-4.0.8.tgz#d66fa18f3a47076789320b9b1af32bd86d9fa202"
integrity sha512-PXwfBhYu0hBCPw8Dn0E+WDYb7af3dSLVWKi3HGv84IdF4TyFoC0ysxFd0Goxw7nSv4T/PzEJQxsYsEiFCKo2BA==
dependencies:
braces "^3.0.2"
braces "^3.0.3"
picomatch "^2.3.1"

[email protected], "mime-db@>= 1.43.0 < 2":
Expand Down Expand Up @@ -6995,6 +7009,14 @@ react-loadable-ssr-addon-v5-slorber@^1.0.1:
dependencies:
"@babel/runtime" "^7.10.3"

"react-loadable@npm:@docusaurus/[email protected]":
version "5.5.2"
resolved "https://registry.npmjs.org/@docusaurus/react-loadable/-/react-loadable-5.5.2.tgz"
integrity sha512-A3dYjdBGuy0IGT+wyLIGIKLRE+sAk1iNk0f1HjNDysO7u8lhL4N3VEm+FAubmJbAztn94F7MxBTPmnixbiyFdQ==
dependencies:
"@types/react" "*"
prop-types "^15.6.2"

react-router-config@^5.1.1:
version "5.1.1"
resolved "https://registry.npmjs.org/react-router-config/-/react-router-config-5.1.1.tgz"
Expand Down Expand Up @@ -8433,14 +8455,14 @@ write-file-atomic@^3.0.3:
typedarray-to-buffer "^3.1.5"

ws@^7.3.1:
version "7.5.9"
resolved "https://registry.npmjs.org/ws/-/ws-7.5.9.tgz"
integrity sha512-F+P9Jil7UiSKSkppIiD94dN07AwvFixvLIj1Og1Rl9GGMuNipJnV9JzjD6XuqmAeiswGvUmNLjr5cFuXwNS77Q==
version "7.5.10"
resolved "https://registry.yarnpkg.com/ws/-/ws-7.5.10.tgz#58b5c20dc281633f6c19113f39b349bd8bd558d9"
integrity sha512-+dbF1tHwZpXcbOJdVOkzLDxZP1ailvSxM6ZweXTegylPny803bFhA+vqBYw4s31NSAk4S2Qz+AKXK9a4wkdjcQ==

ws@^8.13.0:
version "8.16.0"
resolved "https://registry.yarnpkg.com/ws/-/ws-8.16.0.tgz#d1cd774f36fbc07165066a60e40323eab6446fd4"
integrity sha512-HS0c//TP7Ina87TfiPUz1rQzMhHrl/SG2guqRcTOIUYD2q8uhUdNHZYJUaQ8aTGPzCh+c6oawMKW35nFl1dxyQ==
version "8.17.1"
resolved "https://registry.yarnpkg.com/ws/-/ws-8.17.1.tgz#9293da530bb548febc95371d90f9c878727d919b"
integrity sha512-6XQFvXTkbfUOZOKKILFG1PDK2NDQs4azKQl26T0YS5CxqWLgXajbPZ+h4gZekJyRqFU8pvnbAbbs/3TgRPy+GQ==

xdg-basedir@^5.0.1, xdg-basedir@^5.1.0:
version "5.1.0"
Expand Down

0 comments on commit 9f339ac

Please sign in to comment.