Skip to content

Commit

Permalink
fix(minor): [sc-9450] Add possibility to fetch list of consumer groups.
Browse files Browse the repository at this point in the history
  • Loading branch information
ser-0xff committed Aug 7, 2024
1 parent b04bb7e commit 20c3967
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 0 deletions.
37 changes: 37 additions & 0 deletions Sources/Kafka/Configuration/KafkaGroupConfiguration.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the swift-kafka-client open source project
//
// Copyright (c) 2022 Apple Inc. and the swift-kafka-client project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of swift-kafka-client project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

public struct KafkaGroupConfiguration {
// MARK: - Common Client Config Properties

/// Initial list of brokers.
/// Default: `[]`
public var bootstrapBrokerAddresses: [KafkaConfiguration.BrokerAddress] = []

public init(bootstrapBrokerAddresses: [KafkaConfiguration.BrokerAddress]) {
self.bootstrapBrokerAddresses = bootstrapBrokerAddresses
}
}

// MARK: - KafkaGroupConfiguration + Dictionary

extension KafkaGroupConfiguration {
internal var dictionary: [String: String] {
var resultDict: [String: String] = [:]
resultDict["bootstrap.servers"] = bootstrapBrokerAddresses.map(\.description).joined(separator: ",")
return resultDict
}
}

extension KafkaGroupConfiguration: Sendable {}
73 changes: 73 additions & 0 deletions Sources/Kafka/KafkaGroup.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the swift-kafka-client open source project
//
// Copyright (c) 2023 Apple Inc. and the swift-kafka-client project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of swift-kafka-client project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import Crdkafka

public struct KafkaGroup {
public let name: String
}

extension KafkaGroup {
public static func list(configuration: KafkaGroupConfiguration,
group: String? = nil,
retries: Int = 5,
timeout: Duration = .seconds(5)) throws -> [KafkaGroup] {
let configDictionary = configuration.dictionary
let rdConfig = try RDKafkaConfig.createFrom(configDictionary: configDictionary)

let errorChars = UnsafeMutablePointer<CChar>.allocate(capacity: RDKafkaClient.stringSize)
defer { errorChars.deallocate() }

guard let kafkaHandle = rd_kafka_new(RD_KAFKA_PRODUCER, rdConfig, errorChars, RDKafkaClient.stringSize) else {
rd_kafka_conf_destroy(rdConfig)
let errorString = String(cString: errorChars)
throw KafkaError.client(reason: errorString)
}
defer { rd_kafka_destroy(kafkaHandle) }

let rdGroup = group?.cString(using: .utf8)
let timeoutMs = Int32(timeout.inMilliseconds)
var err = RD_KAFKA_RESP_ERR_NO_ERROR
var grplist: UnsafePointer<rd_kafka_group_list>? = nil
var retries = min(retries, 1)
while true {
err = rd_kafka_list_groups(kafkaHandle, rdGroup, &grplist, timeoutMs)
if err == RD_KAFKA_RESP_ERR_NO_ERROR {
break
}
else if (err == RD_KAFKA_RESP_ERR__TRANSPORT) || (err == RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS) {
retries -= 1
if retries == 0 {
throw KafkaError.rdKafkaError(wrapping: err)
}
} else {
throw KafkaError.rdKafkaError(wrapping: err)
}
}

defer { rd_kafka_group_list_destroy(grplist) }

if let grplist {
var groups = [KafkaGroup]()
for idx in 0..<Int(grplist.pointee.group_cnt) {
let rdGroupInfo = grplist.pointee.groups[idx]
let groupName = String(cString: rdGroupInfo.group)
groups.append(KafkaGroup(name: groupName))
}
return groups
} else {
return []
}
}
}

0 comments on commit 20c3967

Please sign in to comment.