Skip to content

Commit

Permalink
feat: be able to specify configuration for created topic [sc-14009]
Browse files Browse the repository at this point in the history
  • Loading branch information
dimlio authored Sep 16, 2024
1 parent ca96fb7 commit 9aaf5a4
Showing 1 changed file with 12 additions and 3 deletions.
15 changes: 12 additions & 3 deletions Sources/Kafka/ForTesting/RDKafkaClient+Topic.swift
Original file line number Diff line number Diff line change
Expand Up @@ -21,24 +21,26 @@ extension RDKafkaClient {
/// Create a topic with a unique name (`UUID`).
/// Blocks for a maximum of `timeout` milliseconds.
/// - Parameter partitions: Partitions in topic (default: -1 - default for broker)
/// - Parameter config: Topic configuration, if empty broker defaults are used. See https://kafka.apache.org/documentation.html#topicconfigs
/// - Parameter timeout: Timeout in milliseconds.
/// - Returns: Name of newly created topic.
/// - Throws: A ``KafkaError`` if the topic creation failed.
public func _createUniqueTopic(partitions: Int32 = -1, timeout: Int32) throws -> String {
public func _createUniqueTopic(partitions: Int32 = -1, config: [String: String] = [:], timeout: Int32) throws -> String {
let uniqueTopicName = UUID().uuidString

try _createTopic(topicName: uniqueTopicName, partitions: partitions, timeout: timeout)
try _createTopic(topicName: uniqueTopicName, partitions: partitions, config: config, timeout: timeout)

return uniqueTopicName
}

/// Create a topic with specified name
/// Blocks for a maximum of `timeout` milliseconds.
/// - Parameter partitions: Partitions in topic (default: -1 - default for broker)
/// - Parameter config: Topic configuration, if empty broker defaults are used. See https://kafka.apache.org/documentation.html#topicconfigs
/// - Parameter timeout: Timeout in milliseconds.
/// - Returns: Name of newly created topic.
/// - Throws: A ``KafkaError`` if the topic creation failed.
public func _createTopic(topicName: String, partitions: Int32 = -1, timeout: Int32) throws {
public func _createTopic(topicName: String, partitions: Int32 = -1, config: [String: String] = [:], timeout: Int32) throws {
let errorChars = UnsafeMutablePointer<CChar>.allocate(capacity: RDKafkaClient.stringSize)
defer { errorChars.deallocate() }

Expand All @@ -54,6 +56,13 @@ extension RDKafkaClient {
}
defer { rd_kafka_NewTopic_destroy(newTopic) }

for (key, value) in config {
let resultCode = rd_kafka_NewTopic_set_config(newTopic, key, value)
guard resultCode == RD_KAFKA_RESP_ERR_NO_ERROR else {
throw KafkaError.rdKafkaError(wrapping: resultCode)
}
}

try self.withKafkaHandlePointer { kafkaHandle in
let resultQueue = rd_kafka_queue_new(kafkaHandle)
defer { rd_kafka_queue_destroy(resultQueue) }
Expand Down

0 comments on commit 9aaf5a4

Please sign in to comment.