diff --git a/Sources/Kafka/Configuration/KafkaProducerConfiguration.swift b/Sources/Kafka/Configuration/KafkaProducerConfiguration.swift
index fde7aa9f..8814d792 100644
--- a/Sources/Kafka/Configuration/KafkaProducerConfiguration.swift
+++ b/Sources/Kafka/Configuration/KafkaProducerConfiguration.swift
@@ -49,6 +49,9 @@ public struct KafkaProducerConfiguration {
     /// Default: `false`
     public var isIdempotenceEnabled: Bool = false
 
+
+    public var isExperimentalGaplessGuaranteeEnabled: Bool = false
+
     /// Producer queue options.
     public struct QueueConfiguration: Sendable, Hashable {
         /// Maximum number of messages allowed on the producer queue. This queue is shared by all topics and partitions.
@@ -184,6 +187,7 @@ extension KafkaProducerConfiguration {
         var resultDict: [String: String] = [:]
 
         resultDict["enable.idempotence"] = String(self.isIdempotenceEnabled)
+        resultDict["enable.gapless.guarantee"] = String(isExperimentalGaplessGuaranteeEnabled)
         resultDict["queue.buffering.max.messages"] = String(self.queue.messageLimit.rawValue)
         resultDict["queue.buffering.max.kbytes"] = String(self.queue.maximumMessageBytes / 1024)
         resultDict["queue.buffering.max.ms"] = String(self.queue.maximumMessageQueueTime.inMilliseconds)
diff --git a/Sources/Kafka/KafkaConsumer.swift b/Sources/Kafka/KafkaConsumer.swift
index 19f3dbbc..19b565c0 100644
--- a/Sources/Kafka/KafkaConsumer.swift
+++ b/Sources/Kafka/KafkaConsumer.swift
@@ -486,6 +486,12 @@ public final class KafkaConsumer: Sendable, Service {
                 } else {
                     try await client.assign(topicPartitionList: nil) // fallback
                 }
+            case .error(let error):
+                if let eventSource {
+                    _ = eventSource.yield(.error(error))
+                } else {
+                    throw error
+                }
             default:
                 break // Ignore
             }
diff --git a/Sources/Kafka/KafkaConsumerEvent.swift b/Sources/Kafka/KafkaConsumerEvent.swift
index 62ad313e..1726cc79 100644
--- a/Sources/Kafka/KafkaConsumerEvent.swift
+++ b/Sources/Kafka/KafkaConsumerEvent.swift
@@ -100,6 +100,8 @@ public enum RebalanceAction : Sendable, Hashable {
 public enum KafkaConsumerEvent: Sendable, Hashable {
     /// Rebalance from librdkafka
     case rebalance(RebalanceAction)
+    /// Error from librdkafka
+    case error(KafkaError)
     /// - Important: Always provide a `default` case when switiching over this `enum`.
     case DO_NOT_SWITCH_OVER_THIS_EXHAUSITVELY
 
@@ -109,6 +111,8 @@ public enum KafkaConsumerEvent: Sendable, Hashable {
             fatalError("Cannot cast \(event) to KafkaConsumerEvent")
         case .rebalance(let action):
             self = .rebalance(action)
+        case .error(let error):
+            self = .error(error)
         case .deliveryReport:
             fatalError("Cannot cast \(event) to KafkaConsumerEvent")
         }
diff --git a/Sources/Kafka/KafkaProducer.swift b/Sources/Kafka/KafkaProducer.swift
index 690f080e..85d71726 100644
--- a/Sources/Kafka/KafkaProducer.swift
+++ b/Sources/Kafka/KafkaProducer.swift
@@ -276,6 +276,12 @@ public final class KafkaProducer: Service, Sendable {
                 _ = source?.yield(.deliveryReports(reports))
             case .statistics(let statistics):
                 self.configuration.metrics.update(with: statistics)
+            case .error(let error):
+                if let source {
+                    _ = source.yield(.error(error))
+                } else {
+                    throw error
+                }
             default:
                 fatalError("Cannot cast \(event) to KafkaProducerEvent")
             }
diff --git a/Sources/Kafka/KafkaProducerEvent.swift b/Sources/Kafka/KafkaProducerEvent.swift
index 3e2ee6b4..ccc68950 100644
--- a/Sources/Kafka/KafkaProducerEvent.swift
+++ b/Sources/Kafka/KafkaProducerEvent.swift
@@ -16,17 +16,8 @@ public enum KafkaProducerEvent: Sendable, Hashable {
     /// A collection of delivery reports received from the Kafka cluster indicating the status of produced messages.
     case deliveryReports([KafkaDeliveryReport])
+    /// Error from librdkafka
+    case error(KafkaError)
     /// - Important: Always provide a `default` case when switching over this `enum`.
     case DO_NOT_SWITCH_OVER_THIS_EXHAUSITVELY
-
-    internal init(_ event: RDKafkaClient.KafkaEvent) {
-        switch event {
-        case .deliveryReport(results: let results):
-            self = .deliveryReports(results)
-        case .rebalance(_):
-            fatalError("Cannot cast \(event) to KafkaProducerEvent")
-        case .statistics:
-            fatalError("Cannot cast \(event) to KafkaProducerEvent")
-        }
-    }
 }
diff --git a/Sources/Kafka/RDKafka/RDKafkaClient.swift b/Sources/Kafka/RDKafka/RDKafkaClient.swift
index b1e371b5..f69a5823 100644
--- a/Sources/Kafka/RDKafka/RDKafkaClient.swift
+++ b/Sources/Kafka/RDKafka/RDKafkaClient.swift
@@ -347,6 +347,7 @@ public final class RDKafkaClient: Sendable {
         case deliveryReport(results: [KafkaDeliveryReport])
         case statistics(RDKafkaStatistics)
         case rebalance(RebalanceAction)
+        case error(KafkaError)
     }
 
     /// Poll the event `rd_kafka_queue_t` for new events.
@@ -406,6 +407,8 @@
                 if let forwardEvent = self.handleStatistics(event) {
                     events.append(forwardEvent)
                 }
+            case .error:
+                events.append(self.handleError(event))
             case .none:
                 // Finished reading events, return early
                 return shouldSleep
@@ -417,6 +420,19 @@
         return shouldSleep
     }
 
+    private func handleError(_ event: OpaquePointer?) -> KafkaEvent {
+        let err = rd_kafka_event_error(event)
+        let errorString = if let error = rd_kafka_err2str(rd_kafka_event_error(event)) {
+            String(cString: error)
+        } else {
+            "\(err)"
+        }
+        let fatal = rd_kafka_event_error_is_fatal(event) != 0
+
+        return .error(KafkaError.rdKafkaError(wrapping: err, errorMessage: errorString, isFatal: fatal))
+
+    }
+
     /// Handle event of type `RDKafkaEvent.deliveryReport`.
     ///
     /// - Parameter event: Pointer to underlying `rd_kafka_event_t`.
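As a usage note (not part of the patch): a minimal sketch of how the new flag might be set from application code. The `KafkaProducerConfiguration(bootstrapBrokerAddresses:)` initializer and `KafkaConfiguration.BrokerAddress` are assumed from the package's existing public API; the host and port are placeholders. The flag is simply forwarded to librdkafka as `enable.gapless.guarantee`, as the configuration hunk above shows.

```swift
import Kafka

// Sketch: enable the new experimental flag on a producer configuration.
// The initializer shape and broker address are assumptions, not part of this patch.
var config = KafkaProducerConfiguration(
    bootstrapBrokerAddresses: [KafkaConfiguration.BrokerAddress(host: "localhost", port: 9092)]
)
config.isIdempotenceEnabled = true
// New property from this patch; serialized as "enable.gapless.guarantee" for librdkafka.
config.isExperimentalGaplessGuaranteeEnabled = true
```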
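Similarly, a sketch of how downstream code might consume the new `.error` event case. The async sequence of `KafkaProducerEvent` values and the swift-log `Logger` are assumed to come from the application's existing setup; per the enum's own documentation, the switch keeps a `default` case.

```swift
import Kafka
import Logging

// Sketch: drain a producer event sequence and log errors surfaced by this patch.
// `events` is any async sequence of KafkaProducerEvent (for example, the one
// vended alongside the producer); its concrete type is not assumed here.
func handleProducerEvents<Events: AsyncSequence>(
    _ events: Events,
    logger: Logger
) async throws where Events.Element == KafkaProducerEvent {
    for try await event in events {
        switch event {
        case .deliveryReports(let reports):
            logger.info("received \(reports.count) delivery report(s)")
        case .error(let error):
            // New with this patch: errors polled from librdkafka arrive as events
            // when an event stream is attached, instead of only being thrown.
            logger.error("librdkafka error: \(error)")
        default:
            break // always keep a `default` case when switching over this enum
        }
    }
}
```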