Skip to content

Commit

Permalink
Merge pull request #42 from ordo-one/feature/sc-9450/create-order-aug…
Browse files Browse the repository at this point in the history
…mentation-plugin-delete-consumer-groups

feat(minor): [sc-9450] Add possibility to delete consumer groups.
  • Loading branch information
ser-0xff authored Aug 29, 2024
2 parents b2e3d1f + 8e77d2b commit ac88ceb
Showing 1 changed file with 67 additions and 0 deletions.
67 changes: 67 additions & 0 deletions Sources/Kafka/ForTesting/RDKafkaClient+Group.swift
Original file line number Diff line number Diff line change
Expand Up @@ -79,4 +79,71 @@ extension RDKafkaClient {
return ret
}
}

/// Delete client consumer groups from the cluster.
/// Blocks for a maximum of `timeout` milliseconds.
/// - Parameter groups: groups to delete
/// - Parameter timeout: timeout
/// - Returns: groups that could not be deleted with error message
/// - Throws: A ``KafkaError`` if kafka call fails.
public func _deleteGroups(_ groups: [String], timeout: Duration) throws -> [(groupId: String, errorMessage: String)] {
try withKafkaHandlePointer { kafkaHandle in
let delGroups = UnsafeMutablePointer<OpaquePointer?>.allocate(capacity: groups.count)

for idx in 0..<groups.count {
let group = groups[idx]
delGroups[idx] = rd_kafka_DeleteGroup_new(group.cString(using: .utf8))
}

defer {
rd_kafka_DeleteGroup_destroy_array(delGroups, groups.count)
delGroups.deallocate()
}

var adminOptions = rd_kafka_AdminOptions_new(kafkaHandle, RD_KAFKA_ADMIN_OP_DELETEGROUPS)
defer { rd_kafka_AdminOptions_destroy(adminOptions) }

let timeoutMs = Int32(timeout.inMilliseconds)
if timeoutMs > 0 {
let errorChars = UnsafeMutablePointer<CChar>.allocate(capacity: RDKafkaClient.stringSize)
defer { errorChars.deallocate() }
let err = rd_kafka_AdminOptions_set_request_timeout(adminOptions, timeoutMs, errorChars, RDKafkaClient.stringSize)
if err != RD_KAFKA_RESP_ERR_NO_ERROR {
throw KafkaError.rdKafkaError(wrapping: err, errorMessage: String(cString: errorChars))
}
}

let resultQueue = rd_kafka_queue_new(kafkaHandle)
defer { rd_kafka_queue_destroy(resultQueue) }

rd_kafka_DeleteGroups(kafkaHandle, delGroups, groups.count, adminOptions, resultQueue);

guard let resultEvent = rd_kafka_queue_poll(resultQueue, timeoutMs) else {
throw KafkaError.rdKafkaError(errorMessage: "rd_kafka_queue_poll() timed out")
}
defer { rd_kafka_event_destroy(resultEvent) }

if rd_kafka_event_error(resultEvent) != RD_KAFKA_RESP_ERR_NO_ERROR {
let errorMessage = String(cString: rd_kafka_event_error_string(resultEvent))
throw KafkaError.rdKafkaError(wrapping: rd_kafka_event_error(resultEvent), errorMessage: errorMessage)
}

var resultGroupsCnt = 0
let resultGroups = rd_kafka_DeleteGroups_result_groups(resultEvent, &resultGroupsCnt);
guard let resultGroups else {
return []
}

var ret = [(String, String)]()
for idx in 0..<resultGroupsCnt {
if let groupResultError = rd_kafka_group_result_error(resultGroups[idx]) {
let groupResultName = String(cString: rd_kafka_group_result_name(groupResultError))
let errorString = String(cString: rd_kafka_error_string(groupResultError))
ret.append((groupResultName, errorString))
}
}

return ret
}
}
}

0 comments on commit ac88ceb

Please sign in to comment.