From 9aaf5a4e389a39e65091a6a93a4a88a8d9e1d3af Mon Sep 17 00:00:00 2001 From: dimlio <122263440+dimlio@users.noreply.github.com> Date: Mon, 16 Sep 2024 13:49:49 +0300 Subject: [PATCH] feat: be able to specify configuration for created topic [sc-14009] --- .../Kafka/ForTesting/RDKafkaClient+Topic.swift | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/Sources/Kafka/ForTesting/RDKafkaClient+Topic.swift b/Sources/Kafka/ForTesting/RDKafkaClient+Topic.swift index 4c45e0da..f8bfc67f 100644 --- a/Sources/Kafka/ForTesting/RDKafkaClient+Topic.swift +++ b/Sources/Kafka/ForTesting/RDKafkaClient+Topic.swift @@ -21,13 +21,14 @@ extension RDKafkaClient { /// Create a topic with a unique name (`UUID`). /// Blocks for a maximum of `timeout` milliseconds. /// - Parameter partitions: Partitions in topic (default: -1 - default for broker) + /// - Parameter config: Topic configuration, if empty broker defaults are used. See https://kafka.apache.org/documentation.html#topicconfigs /// - Parameter timeout: Timeout in milliseconds. /// - Returns: Name of newly created topic. /// - Throws: A ``KafkaError`` if the topic creation failed. - public func _createUniqueTopic(partitions: Int32 = -1, timeout: Int32) throws -> String { + public func _createUniqueTopic(partitions: Int32 = -1, config: [String: String] = [:], timeout: Int32) throws -> String { let uniqueTopicName = UUID().uuidString - try _createTopic(topicName: uniqueTopicName, partitions: partitions, timeout: timeout) + try _createTopic(topicName: uniqueTopicName, partitions: partitions, config: config, timeout: timeout) return uniqueTopicName } @@ -35,10 +36,11 @@ extension RDKafkaClient { /// Create a topic with specified name /// Blocks for a maximum of `timeout` milliseconds. /// - Parameter partitions: Partitions in topic (default: -1 - default for broker) + /// - Parameter config: Topic configuration, if empty broker defaults are used. See https://kafka.apache.org/documentation.html#topicconfigs /// - Parameter timeout: Timeout in milliseconds. /// - Returns: Name of newly created topic. /// - Throws: A ``KafkaError`` if the topic creation failed. - public func _createTopic(topicName: String, partitions: Int32 = -1, timeout: Int32) throws { + public func _createTopic(topicName: String, partitions: Int32 = -1, config: [String: String] = [:], timeout: Int32) throws { let errorChars = UnsafeMutablePointer.allocate(capacity: RDKafkaClient.stringSize) defer { errorChars.deallocate() } @@ -54,6 +56,13 @@ extension RDKafkaClient { } defer { rd_kafka_NewTopic_destroy(newTopic) } + for (key, value) in config { + let resultCode = rd_kafka_NewTopic_set_config(newTopic, key, value) + guard resultCode == RD_KAFKA_RESP_ERR_NO_ERROR else { + throw KafkaError.rdKafkaError(wrapping: resultCode) + } + } + try self.withKafkaHandlePointer { kafkaHandle in let resultQueue = rd_kafka_queue_new(kafkaHandle) defer { rd_kafka_queue_destroy(resultQueue) }