diff --git a/Sources/Kafka/ForTesting/RDKafkaClient+Group.swift b/Sources/Kafka/ForTesting/RDKafkaClient+Group.swift new file mode 100644 index 00000000..418e3ced --- /dev/null +++ b/Sources/Kafka/ForTesting/RDKafkaClient+Group.swift @@ -0,0 +1,82 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the swift-kafka-client open source project +// +// Copyright (c) 2022 Apple Inc. and the swift-kafka-client project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of swift-kafka-client project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Crdkafka +import Logging + +@_spi(Internal) +extension RDKafkaClient { + /// List client consumer groups in cluster. + /// Blocks for a maximum of `timeout` milliseconds. + /// - Parameter timeout: timeout + /// - Returns: Groups + /// - Throws: A ``KafkaError`` if kafka call fails. + public func _listGroups(timeout: Duration) throws -> [KafkaGroup] { + try withKafkaHandlePointer { kafkaHandle in + var adminOptions = rd_kafka_AdminOptions_new(kafkaHandle, RD_KAFKA_ADMIN_OP_LISTCONSUMERGROUPS) + defer { rd_kafka_AdminOptions_destroy(adminOptions) } + + let timeoutMs = Int32(timeout.inMilliseconds) + if timeoutMs > 0 { + let errorChars = UnsafeMutablePointer.allocate(capacity: RDKafkaClient.stringSize) + defer { errorChars.deallocate() } + let err = rd_kafka_AdminOptions_set_request_timeout(adminOptions, timeoutMs, errorChars, RDKafkaClient.stringSize) + if err != RD_KAFKA_RESP_ERR_NO_ERROR { + throw KafkaError.rdKafkaError(wrapping: err, errorMessage: String(cString: errorChars)) + } + } + + let resultQueue = rd_kafka_queue_new(kafkaHandle) + defer { rd_kafka_queue_destroy(resultQueue) } + + rd_kafka_ListConsumerGroups(kafkaHandle, adminOptions, resultQueue) + + guard let resultEvent = rd_kafka_queue_poll(resultQueue, timeoutMs) else { + throw KafkaError.rdKafkaError(errorMessage: "rd_kafka_queue_poll() timed out") + } + defer { rd_kafka_event_destroy(resultEvent) } + + if rd_kafka_event_error(resultEvent) != RD_KAFKA_RESP_ERR_NO_ERROR { + let errorMessage = String(cString: rd_kafka_event_error_string(resultEvent)) + throw KafkaError.rdKafkaError(wrapping: rd_kafka_event_error(resultEvent), errorMessage: errorMessage) + } + + let result = rd_kafka_event_ListConsumerGroups_result(resultEvent) + + var groupsCnt: size_t = 0 + let groups = rd_kafka_ListConsumerGroups_result_valid(result, &groupsCnt) + guard let groups else { + return [] + } + + var ret = [KafkaGroup]() + for idx in 0.. KafkaError { + let errorMessage = String(cString: rd_kafka_err2str(error)) + ": " + errorMessage + return KafkaError( + backing: .init( + code: .underlying, reason: errorMessage, file: file, line: line + ) + ) + } + + static func rdKafkaError( + errorMessage: String, isFatal: Bool = false, file: String = #fileID, line: UInt = #line + ) -> KafkaError { + return KafkaError( + backing: .init( + code: .underlying, reason: errorMessage, file: file, line: line + ) + ) + } + static func config( reason: String, file: String = #fileID, line: UInt = #line ) -> KafkaError { diff --git a/Sources/Kafka/KafkaGroup.swift b/Sources/Kafka/KafkaGroup.swift new file mode 100644 index 00000000..3f42d6e8 --- /dev/null +++ b/Sources/Kafka/KafkaGroup.swift @@ -0,0 +1,32 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the swift-kafka-client open source project +// +// Copyright (c) 2023 Apple Inc. and the swift-kafka-client project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of swift-kafka-client project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Crdkafka + +public struct KafkaGroup { + /// Swift `enum` wrapping `librdkafka`'s `RD_KAFKA_CONSUMER_GROUP_STATE_*` values. + /// See `rd_kafka_consumer_group_state_t` in rdkafka.h for reference. + public enum State: UInt32 { + case unknown = 0 + case preparingRebalance = 1 + case completingRebalance = 2 + case stable = 3 + case dead = 4 + case empty = 5 + } + + public let name: String + public let state: State + public let isSimple: Bool +}