diff --git a/src/KafkaFlow/Producers/MessageProducer.cs b/src/KafkaFlow/Producers/MessageProducer.cs index 4894023ea..6cf6e5288 100644 --- a/src/KafkaFlow/Producers/MessageProducer.cs +++ b/src/KafkaFlow/Producers/MessageProducer.cs @@ -188,9 +188,19 @@ public void Dispose() _producer?.Dispose(); } - private static void FillContextWithResultMetadata(IMessageContext context, DeliveryResult result) + private void FillContextWithResultMetadata(IMessageContext context, DeliveryResult result) { - var concreteProducerContext = (ProducerContext)context.ProducerContext; + var concreteProducerContext = context.ProducerContext as ProducerContext; + + if (concreteProducerContext is null) + { + _logHandler.Warning("Producer context is null on FillContextWithResultMetadata", new + { + DeliveryResult = result, + }); + + return; + } concreteProducerContext.Offset = result.Offset; concreteProducerContext.Partition = result.Partition; @@ -258,7 +268,17 @@ private IProducer EnsureProducer() { foreach (var handler in _configuration.StatisticsHandlers) { - handler.Invoke(statistics); + try + { + handler?.Invoke(statistics); + } + catch (Exception ex) + { + _logHandler.Error("Kafka Producer StatisticsHandler Error", ex, new + { + ProducerName = _configuration.Name, + }); + } } }); @@ -351,14 +371,24 @@ private void InternalProduce( void Handler(DeliveryReport report) { - if (report.Error.IsFatal) + try { - this.InvalidateProducer(report.Error, report); - } + if (report.Error.IsFatal) + { + this.InvalidateProducer(report.Error, report); + } - FillContextWithResultMetadata(context, report); + FillContextWithResultMetadata(context, report); + } + catch (Exception ex) + { + _logHandler.Error("Delivery Result Handler Error", ex, new + { + Report = report, + }); + } - deliveryHandler(report); + deliveryHandler?.Invoke(report); } }