From 20c39679ff946ff35061c1d2372442168b8a6214 Mon Sep 17 00:00:00 2001 From: ser-0xff <122270051+ser-0xff@users.noreply.github.com> Date: Wed, 7 Aug 2024 12:19:36 +0300 Subject: [PATCH 1/5] fix(minor): [sc-9450] Add possibility to fetch list of consumer groups. --- .../KafkaGroupConfiguration.swift | 37 ++++++++++ Sources/Kafka/KafkaGroup.swift | 73 +++++++++++++++++++ 2 files changed, 110 insertions(+) create mode 100644 Sources/Kafka/Configuration/KafkaGroupConfiguration.swift create mode 100644 Sources/Kafka/KafkaGroup.swift diff --git a/Sources/Kafka/Configuration/KafkaGroupConfiguration.swift b/Sources/Kafka/Configuration/KafkaGroupConfiguration.swift new file mode 100644 index 00000000..ef597fa6 --- /dev/null +++ b/Sources/Kafka/Configuration/KafkaGroupConfiguration.swift @@ -0,0 +1,37 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the swift-kafka-client open source project +// +// Copyright (c) 2022 Apple Inc. and the swift-kafka-client project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of swift-kafka-client project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +public struct KafkaGroupConfiguration { + // MARK: - Common Client Config Properties + + /// Initial list of brokers. + /// Default: `[]` + public var bootstrapBrokerAddresses: [KafkaConfiguration.BrokerAddress] = [] + + public init(bootstrapBrokerAddresses: [KafkaConfiguration.BrokerAddress]) { + self.bootstrapBrokerAddresses = bootstrapBrokerAddresses + } +} + +// MARK: - KafkaGroupConfiguration + Dictionary + +extension KafkaGroupConfiguration { + internal var dictionary: [String: String] { + var resultDict: [String: String] = [:] + resultDict["bootstrap.servers"] = bootstrapBrokerAddresses.map(\.description).joined(separator: ",") + return resultDict + } +} + +extension KafkaGroupConfiguration: Sendable {} diff --git a/Sources/Kafka/KafkaGroup.swift b/Sources/Kafka/KafkaGroup.swift new file mode 100644 index 00000000..18363420 --- /dev/null +++ b/Sources/Kafka/KafkaGroup.swift @@ -0,0 +1,73 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the swift-kafka-client open source project +// +// Copyright (c) 2023 Apple Inc. and the swift-kafka-client project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of swift-kafka-client project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Crdkafka + +public struct KafkaGroup { + public let name: String +} + +extension KafkaGroup { + public static func list(configuration: KafkaGroupConfiguration, + group: String? = nil, + retries: Int = 5, + timeout: Duration = .seconds(5)) throws -> [KafkaGroup] { + let configDictionary = configuration.dictionary + let rdConfig = try RDKafkaConfig.createFrom(configDictionary: configDictionary) + + let errorChars = UnsafeMutablePointer.allocate(capacity: RDKafkaClient.stringSize) + defer { errorChars.deallocate() } + + guard let kafkaHandle = rd_kafka_new(RD_KAFKA_PRODUCER, rdConfig, errorChars, RDKafkaClient.stringSize) else { + rd_kafka_conf_destroy(rdConfig) + let errorString = String(cString: errorChars) + throw KafkaError.client(reason: errorString) + } + defer { rd_kafka_destroy(kafkaHandle) } + + let rdGroup = group?.cString(using: .utf8) + let timeoutMs = Int32(timeout.inMilliseconds) + var err = RD_KAFKA_RESP_ERR_NO_ERROR + var grplist: UnsafePointer? = nil + var retries = min(retries, 1) + while true { + err = rd_kafka_list_groups(kafkaHandle, rdGroup, &grplist, timeoutMs) + if err == RD_KAFKA_RESP_ERR_NO_ERROR { + break + } + else if (err == RD_KAFKA_RESP_ERR__TRANSPORT) || (err == RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS) { + retries -= 1 + if retries == 0 { + throw KafkaError.rdKafkaError(wrapping: err) + } + } else { + throw KafkaError.rdKafkaError(wrapping: err) + } + } + + defer { rd_kafka_group_list_destroy(grplist) } + + if let grplist { + var groups = [KafkaGroup]() + for idx in 0.. Date: Wed, 7 Aug 2024 12:35:14 +0300 Subject: [PATCH 2/5] Propagate most group info. --- Sources/Kafka/KafkaGroup.swift | 12 +++++++-- Sources/Kafka/KafkaMetadataBroker.swift | 34 +++++++++++++++++++++++++ 2 files changed, 44 insertions(+), 2 deletions(-) create mode 100644 Sources/Kafka/KafkaMetadataBroker.swift diff --git a/Sources/Kafka/KafkaGroup.swift b/Sources/Kafka/KafkaGroup.swift index 18363420..dd11f7e8 100644 --- a/Sources/Kafka/KafkaGroup.swift +++ b/Sources/Kafka/KafkaGroup.swift @@ -16,6 +16,10 @@ import Crdkafka public struct KafkaGroup { public let name: String + public let broker: KafkaMetadataBroker + public let state: String + public let protocolType: String + public let `protocol`: String } extension KafkaGroup { @@ -62,8 +66,12 @@ extension KafkaGroup { var groups = [KafkaGroup]() for idx in 0.. Date: Wed, 28 Aug 2024 12:22:34 +0300 Subject: [PATCH 3/5] Do not use deprecated API. --- .../ForTesting/RDKafkaClient+Group.swift | 80 +++++++++++++++++++ Sources/Kafka/KafkaError.swift | 21 +++++ Sources/Kafka/KafkaGroup.swift | 77 ++++-------------- 3 files changed, 116 insertions(+), 62 deletions(-) create mode 100644 Sources/Kafka/ForTesting/RDKafkaClient+Group.swift diff --git a/Sources/Kafka/ForTesting/RDKafkaClient+Group.swift b/Sources/Kafka/ForTesting/RDKafkaClient+Group.swift new file mode 100644 index 00000000..0409d7aa --- /dev/null +++ b/Sources/Kafka/ForTesting/RDKafkaClient+Group.swift @@ -0,0 +1,80 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the swift-kafka-client open source project +// +// Copyright (c) 2022 Apple Inc. and the swift-kafka-client project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of swift-kafka-client project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Crdkafka +import Logging + +@_spi(Internal) +extension RDKafkaClient { + /// List client consumer groups in cluster. + /// Blocks for a maximum of `timeout` milliseconds. + /// - Parameter timeout: timeout + /// - Returns: Groups + /// - Throws: A ``KafkaError`` if kafka call fails. + public func _listGroups(timeout: Duration) throws -> [KafkaGroup] { + try withKafkaHandlePointer { kafkaHandle in + var adminOptions = rd_kafka_AdminOptions_new(kafkaHandle, RD_KAFKA_ADMIN_OP_LISTCONSUMERGROUPS) + defer { rd_kafka_AdminOptions_destroy(adminOptions) } + + let timeoutMs = Int32(timeout.inMilliseconds) + if timeoutMs > 0 { + let errorChars = UnsafeMutablePointer.allocate(capacity: RDKafkaClient.stringSize) + defer { errorChars.deallocate() } + let err = rd_kafka_AdminOptions_set_request_timeout(adminOptions, timeoutMs, errorChars, RDKafkaClient.stringSize) + if err != RD_KAFKA_RESP_ERR_NO_ERROR { + throw KafkaError.rdKafkaError(wrapping: err, errorMessage: String(cString: errorChars)) + } + } + + let resultQueue = rd_kafka_queue_new(kafkaHandle) + defer { rd_kafka_queue_destroy(resultQueue) } + + rd_kafka_ListConsumerGroups(kafkaHandle, adminOptions, resultQueue) + + guard let resultEvent = rd_kafka_queue_poll(resultQueue, timeoutMs) else { + throw KafkaError.rdKafkaError(errorMessage: "rd_kafka_queue_poll() timed out") + } + defer { rd_kafka_event_destroy(resultEvent) } + + if rd_kafka_event_error(resultEvent) != RD_KAFKA_RESP_ERR_NO_ERROR { + let errorMessage = String(cString: rd_kafka_event_error_string(resultEvent)) + throw KafkaError.rdKafkaError(wrapping: rd_kafka_event_error(resultEvent), errorMessage: errorMessage) + } + + let result = rd_kafka_event_ListConsumerGroups_result(resultEvent) + + var ret = [KafkaGroup]() + var groupsCnt: size_t = 0 + let groups = rd_kafka_ListConsumerGroups_result_valid(result, &groupsCnt) + if let groups { + for idx in 0.. KafkaError { + let errorMessage = String(cString: rd_kafka_err2str(error)) + ": " + errorMessage + return KafkaError( + backing: .init( + code: .underlying, reason: errorMessage, file: file, line: line + ) + ) + } + + static func rdKafkaError( + errorMessage: String, isFatal: Bool = false, file: String = #fileID, line: UInt = #line + ) -> KafkaError { + return KafkaError( + backing: .init( + code: .underlying, reason: errorMessage, file: file, line: line + ) + ) + } + static func config( reason: String, file: String = #fileID, line: UInt = #line ) -> KafkaError { diff --git a/Sources/Kafka/KafkaGroup.swift b/Sources/Kafka/KafkaGroup.swift index dd11f7e8..7d94eeff 100644 --- a/Sources/Kafka/KafkaGroup.swift +++ b/Sources/Kafka/KafkaGroup.swift @@ -14,68 +14,21 @@ import Crdkafka -public struct KafkaGroup { - public let name: String - public let broker: KafkaMetadataBroker - public let state: String - public let protocolType: String - public let `protocol`: String -} - -extension KafkaGroup { - public static func list(configuration: KafkaGroupConfiguration, - group: String? = nil, - retries: Int = 5, - timeout: Duration = .seconds(5)) throws -> [KafkaGroup] { - let configDictionary = configuration.dictionary - let rdConfig = try RDKafkaConfig.createFrom(configDictionary: configDictionary) - - let errorChars = UnsafeMutablePointer.allocate(capacity: RDKafkaClient.stringSize) - defer { errorChars.deallocate() } - - guard let kafkaHandle = rd_kafka_new(RD_KAFKA_PRODUCER, rdConfig, errorChars, RDKafkaClient.stringSize) else { - rd_kafka_conf_destroy(rdConfig) - let errorString = String(cString: errorChars) - throw KafkaError.client(reason: errorString) - } - defer { rd_kafka_destroy(kafkaHandle) } +let uuu: Int = 0 - let rdGroup = group?.cString(using: .utf8) - let timeoutMs = Int32(timeout.inMilliseconds) - var err = RD_KAFKA_RESP_ERR_NO_ERROR - var grplist: UnsafePointer? = nil - var retries = min(retries, 1) - while true { - err = rd_kafka_list_groups(kafkaHandle, rdGroup, &grplist, timeoutMs) - if err == RD_KAFKA_RESP_ERR_NO_ERROR { - break - } - else if (err == RD_KAFKA_RESP_ERR__TRANSPORT) || (err == RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS) { - retries -= 1 - if retries == 0 { - throw KafkaError.rdKafkaError(wrapping: err) - } - } else { - throw KafkaError.rdKafkaError(wrapping: err) - } - } - - defer { rd_kafka_group_list_destroy(grplist) } - - if let grplist { - var groups = [KafkaGroup]() - for idx in 0.. Date: Wed, 28 Aug 2024 12:30:01 +0300 Subject: [PATCH 4/5] Remove useless files. --- .../KafkaGroupConfiguration.swift | 37 ------------------- Sources/Kafka/KafkaMetadataBroker.swift | 34 ----------------- 2 files changed, 71 deletions(-) delete mode 100644 Sources/Kafka/Configuration/KafkaGroupConfiguration.swift delete mode 100644 Sources/Kafka/KafkaMetadataBroker.swift diff --git a/Sources/Kafka/Configuration/KafkaGroupConfiguration.swift b/Sources/Kafka/Configuration/KafkaGroupConfiguration.swift deleted file mode 100644 index ef597fa6..00000000 --- a/Sources/Kafka/Configuration/KafkaGroupConfiguration.swift +++ /dev/null @@ -1,37 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the swift-kafka-client open source project -// -// Copyright (c) 2022 Apple Inc. and the swift-kafka-client project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See CONTRIBUTORS.txt for the list of swift-kafka-client project authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -public struct KafkaGroupConfiguration { - // MARK: - Common Client Config Properties - - /// Initial list of brokers. - /// Default: `[]` - public var bootstrapBrokerAddresses: [KafkaConfiguration.BrokerAddress] = [] - - public init(bootstrapBrokerAddresses: [KafkaConfiguration.BrokerAddress]) { - self.bootstrapBrokerAddresses = bootstrapBrokerAddresses - } -} - -// MARK: - KafkaGroupConfiguration + Dictionary - -extension KafkaGroupConfiguration { - internal var dictionary: [String: String] { - var resultDict: [String: String] = [:] - resultDict["bootstrap.servers"] = bootstrapBrokerAddresses.map(\.description).joined(separator: ",") - return resultDict - } -} - -extension KafkaGroupConfiguration: Sendable {} diff --git a/Sources/Kafka/KafkaMetadataBroker.swift b/Sources/Kafka/KafkaMetadataBroker.swift deleted file mode 100644 index 6c3ea76b..00000000 --- a/Sources/Kafka/KafkaMetadataBroker.swift +++ /dev/null @@ -1,34 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the swift-kafka-client open source project -// -// Copyright (c) 2023 Apple Inc. and the swift-kafka-client project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See CONTRIBUTORS.txt for the list of swift-kafka-client project authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -import Crdkafka - -/// A proxy structure for rd_kafka_metadata_broker_t -public struct KafkaMetadataBroker: Sendable, Hashable { - public let id: Int32 - public let host: String - public let port: Int32 - - public init(id: Int32, host: String, port: Int32) { - self.id = id - self.host = host - self.port = port - } - - init(_ rdMetadataBroker: rd_kafka_metadata_broker_t) { - self.id = rdMetadataBroker.id - self.host = String(cString: rdMetadataBroker.host) - self.port = rdMetadataBroker.port - } -} From abae3a3cc0c76237a3d60153a5ecd541d24a06e4 Mon Sep 17 00:00:00 2001 From: ser-0xff <122270051+ser-0xff@users.noreply.github.com> Date: Wed, 28 Aug 2024 12:53:37 +0300 Subject: [PATCH 5/5] Cosmetic fix. --- .../ForTesting/RDKafkaClient+Group.swift | 32 ++++++++++--------- Sources/Kafka/KafkaGroup.swift | 2 -- 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/Sources/Kafka/ForTesting/RDKafkaClient+Group.swift b/Sources/Kafka/ForTesting/RDKafkaClient+Group.swift index 0409d7aa..418e3ced 100644 --- a/Sources/Kafka/ForTesting/RDKafkaClient+Group.swift +++ b/Sources/Kafka/ForTesting/RDKafkaClient+Group.swift @@ -54,24 +54,26 @@ extension RDKafkaClient { let result = rd_kafka_event_ListConsumerGroups_result(resultEvent) - var ret = [KafkaGroup]() var groupsCnt: size_t = 0 let groups = rd_kafka_ListConsumerGroups_result_valid(result, &groupsCnt) - if let groups { - for idx in 0..