Skip to content

Commit

Permalink
Merge branch 'master' into dependabot/npm_and_yarn/website/micromatch…
Browse files Browse the repository at this point in the history
…-4.0.8
  • Loading branch information
brmagadutra authored Feb 10, 2025
2 parents ddc8445 + f96d4fe commit cdc08bc
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 20 deletions.
46 changes: 38 additions & 8 deletions src/KafkaFlow/Producers/MessageProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -188,9 +188,19 @@ public void Dispose()
_producer?.Dispose();
}

private static void FillContextWithResultMetadata(IMessageContext context, DeliveryResult<byte[], byte[]> result)
private void FillContextWithResultMetadata(IMessageContext context, DeliveryResult<byte[], byte[]> result)
{
var concreteProducerContext = (ProducerContext)context.ProducerContext;
var concreteProducerContext = context.ProducerContext as ProducerContext;

if (concreteProducerContext is null)
{
_logHandler.Warning("Producer context is null on FillContextWithResultMetadata", new
{
DeliveryResult = result,
});

return;
}

concreteProducerContext.Offset = result.Offset;
concreteProducerContext.Partition = result.Partition;
Expand Down Expand Up @@ -258,7 +268,17 @@ private IProducer<byte[], byte[]> EnsureProducer()
{
foreach (var handler in _configuration.StatisticsHandlers)
{
handler.Invoke(statistics);
try
{
handler?.Invoke(statistics);
}
catch (Exception ex)
{
_logHandler.Error("Kafka Producer StatisticsHandler Error", ex, new
{
ProducerName = _configuration.Name,
});
}
}
});

Expand Down Expand Up @@ -351,14 +371,24 @@ private void InternalProduce(

void Handler(DeliveryReport<byte[], byte[]> report)
{
if (report.Error.IsFatal)
try
{
this.InvalidateProducer(report.Error, report);
}
if (report.Error.IsFatal)
{
this.InvalidateProducer(report.Error, report);
}

FillContextWithResultMetadata(context, report);
FillContextWithResultMetadata(context, report);
}
catch (Exception ex)
{
_logHandler.Error("Delivery Result Handler Error", ex, new
{
Report = report,
});
}

deliveryHandler(report);
deliveryHandler?.Invoke(report);
}
}

Expand Down
12 changes: 6 additions & 6 deletions website/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 6 additions & 6 deletions website/yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -8455,14 +8455,14 @@ write-file-atomic@^3.0.3:
typedarray-to-buffer "^3.1.5"

ws@^7.3.1:
version "7.5.9"
resolved "https://registry.npmjs.org/ws/-/ws-7.5.9.tgz"
integrity sha512-F+P9Jil7UiSKSkppIiD94dN07AwvFixvLIj1Og1Rl9GGMuNipJnV9JzjD6XuqmAeiswGvUmNLjr5cFuXwNS77Q==
version "7.5.10"
resolved "https://registry.yarnpkg.com/ws/-/ws-7.5.10.tgz#58b5c20dc281633f6c19113f39b349bd8bd558d9"
integrity sha512-+dbF1tHwZpXcbOJdVOkzLDxZP1ailvSxM6ZweXTegylPny803bFhA+vqBYw4s31NSAk4S2Qz+AKXK9a4wkdjcQ==

ws@^8.13.0:
version "8.16.0"
resolved "https://registry.yarnpkg.com/ws/-/ws-8.16.0.tgz#d1cd774f36fbc07165066a60e40323eab6446fd4"
integrity sha512-HS0c//TP7Ina87TfiPUz1rQzMhHrl/SG2guqRcTOIUYD2q8uhUdNHZYJUaQ8aTGPzCh+c6oawMKW35nFl1dxyQ==
version "8.17.1"
resolved "https://registry.yarnpkg.com/ws/-/ws-8.17.1.tgz#9293da530bb548febc95371d90f9c878727d919b"
integrity sha512-6XQFvXTkbfUOZOKKILFG1PDK2NDQs4azKQl26T0YS5CxqWLgXajbPZ+h4gZekJyRqFU8pvnbAbbs/3TgRPy+GQ==

xdg-basedir@^5.0.1, xdg-basedir@^5.1.0:
version "5.1.0"
Expand Down

0 comments on commit cdc08bc

Please sign in to comment.