diff --git a/Sources/Kafka/ForTesting/RDKafkaClient+Group.swift b/Sources/Kafka/ForTesting/RDKafkaClient+Group.swift index 418e3ced..f40a47be 100644 --- a/Sources/Kafka/ForTesting/RDKafkaClient+Group.swift +++ b/Sources/Kafka/ForTesting/RDKafkaClient+Group.swift @@ -79,4 +79,71 @@ extension RDKafkaClient { return ret } } + + /// Delete client consumer groups from the cluster. + /// Blocks for a maximum of `timeout` milliseconds. + /// - Parameter groups: groups to delete + /// - Parameter timeout: timeout + /// - Returns: groups that could not be deleted with error message + /// - Throws: A ``KafkaError`` if kafka call fails. + public func _deleteGroups(_ groups: [String], timeout: Duration) throws -> [(groupId: String, errorMessage: String)] { + try withKafkaHandlePointer { kafkaHandle in + let delGroups = UnsafeMutablePointer.allocate(capacity: groups.count) + + for idx in 0.. 0 { + let errorChars = UnsafeMutablePointer.allocate(capacity: RDKafkaClient.stringSize) + defer { errorChars.deallocate() } + let err = rd_kafka_AdminOptions_set_request_timeout(adminOptions, timeoutMs, errorChars, RDKafkaClient.stringSize) + if err != RD_KAFKA_RESP_ERR_NO_ERROR { + throw KafkaError.rdKafkaError(wrapping: err, errorMessage: String(cString: errorChars)) + } + } + + let resultQueue = rd_kafka_queue_new(kafkaHandle) + defer { rd_kafka_queue_destroy(resultQueue) } + + rd_kafka_DeleteGroups(kafkaHandle, delGroups, groups.count, adminOptions, resultQueue); + + guard let resultEvent = rd_kafka_queue_poll(resultQueue, timeoutMs) else { + throw KafkaError.rdKafkaError(errorMessage: "rd_kafka_queue_poll() timed out") + } + defer { rd_kafka_event_destroy(resultEvent) } + + if rd_kafka_event_error(resultEvent) != RD_KAFKA_RESP_ERR_NO_ERROR { + let errorMessage = String(cString: rd_kafka_event_error_string(resultEvent)) + throw KafkaError.rdKafkaError(wrapping: rd_kafka_event_error(resultEvent), errorMessage: errorMessage) + } + + var resultGroupsCnt = 0 + let resultGroups = rd_kafka_DeleteGroups_result_groups(resultEvent, &resultGroupsCnt); + guard let resultGroups else { + return [] + } + + var ret = [(String, String)]() + for idx in 0..