Skip to content

Commit

Permalink
Merge pull request #41 from ordo-one/feature/sc-9450/create-order-aug…
Browse files Browse the repository at this point in the history
…mentation-plugin-groups-do-not-use-deprecated-api

fix(minor): [sc-9450] Add possibility to list consumer groups.
  • Loading branch information
ser-0xff authored Aug 28, 2024
2 parents 527a8a6 + abae3a3 commit 81ae1f9
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 0 deletions.
82 changes: 82 additions & 0 deletions Sources/Kafka/ForTesting/RDKafkaClient+Group.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the swift-kafka-client open source project
//
// Copyright (c) 2022 Apple Inc. and the swift-kafka-client project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of swift-kafka-client project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import Crdkafka
import Logging

@_spi(Internal)
extension RDKafkaClient {
/// List client consumer groups in cluster.
/// Blocks for a maximum of `timeout` milliseconds.
/// - Parameter timeout: timeout
/// - Returns: Groups
/// - Throws: A ``KafkaError`` if kafka call fails.
public func _listGroups(timeout: Duration) throws -> [KafkaGroup] {
try withKafkaHandlePointer { kafkaHandle in
var adminOptions = rd_kafka_AdminOptions_new(kafkaHandle, RD_KAFKA_ADMIN_OP_LISTCONSUMERGROUPS)
defer { rd_kafka_AdminOptions_destroy(adminOptions) }

let timeoutMs = Int32(timeout.inMilliseconds)
if timeoutMs > 0 {
let errorChars = UnsafeMutablePointer<CChar>.allocate(capacity: RDKafkaClient.stringSize)
defer { errorChars.deallocate() }
let err = rd_kafka_AdminOptions_set_request_timeout(adminOptions, timeoutMs, errorChars, RDKafkaClient.stringSize)
if err != RD_KAFKA_RESP_ERR_NO_ERROR {
throw KafkaError.rdKafkaError(wrapping: err, errorMessage: String(cString: errorChars))
}
}

let resultQueue = rd_kafka_queue_new(kafkaHandle)
defer { rd_kafka_queue_destroy(resultQueue) }

rd_kafka_ListConsumerGroups(kafkaHandle, adminOptions, resultQueue)

guard let resultEvent = rd_kafka_queue_poll(resultQueue, timeoutMs) else {
throw KafkaError.rdKafkaError(errorMessage: "rd_kafka_queue_poll() timed out")
}
defer { rd_kafka_event_destroy(resultEvent) }

if rd_kafka_event_error(resultEvent) != RD_KAFKA_RESP_ERR_NO_ERROR {
let errorMessage = String(cString: rd_kafka_event_error_string(resultEvent))
throw KafkaError.rdKafkaError(wrapping: rd_kafka_event_error(resultEvent), errorMessage: errorMessage)
}

let result = rd_kafka_event_ListConsumerGroups_result(resultEvent)

var groupsCnt: size_t = 0
let groups = rd_kafka_ListConsumerGroups_result_valid(result, &groupsCnt)
guard let groups else {
return []
}

var ret = [KafkaGroup]()
for idx in 0..<groupsCnt {
let group = groups[idx]
let groupId = rd_kafka_ConsumerGroupListing_group_id(group)
guard let groupId else {
throw KafkaError.rdKafkaError(errorMessage: "rd_kafka_ConsumerGroupListing_group_id() unexpectedly returned nil")
}
let kafkaGroupState = rd_kafka_ConsumerGroupListing_state(group)
guard let state = KafkaGroup.State(rawValue: kafkaGroupState.rawValue) else {
throw KafkaError.rdKafkaError(errorMessage: "unexpected value \(kafkaGroupState) for rd_kafka_consumer_group_state_t enumeration")
}
let isSimple = rd_kafka_ConsumerGroupListing_is_simple_consumer_group(group)
let kafkaGroup = KafkaGroup(name: String(cString: groupId), state: state, isSimple: isSimple != 0)
ret.append(kafkaGroup)
}

return ret
}
}
}
21 changes: 21 additions & 0 deletions Sources/Kafka/KafkaError.swift
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,27 @@ public struct KafkaError: Error, CustomStringConvertible, @unchecked Sendable {
)
}

static func rdKafkaError(
wrapping error: rd_kafka_resp_err_t, errorMessage: String, isFatal: Bool = false, file: String = #fileID, line: UInt = #line
) -> KafkaError {
let errorMessage = String(cString: rd_kafka_err2str(error)) + ": " + errorMessage
return KafkaError(
backing: .init(
code: .underlying, reason: errorMessage, file: file, line: line
)
)
}

static func rdKafkaError(
errorMessage: String, isFatal: Bool = false, file: String = #fileID, line: UInt = #line
) -> KafkaError {
return KafkaError(
backing: .init(
code: .underlying, reason: errorMessage, file: file, line: line
)
)
}

static func config(
reason: String, file: String = #fileID, line: UInt = #line
) -> KafkaError {
Expand Down
32 changes: 32 additions & 0 deletions Sources/Kafka/KafkaGroup.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the swift-kafka-client open source project
//
// Copyright (c) 2023 Apple Inc. and the swift-kafka-client project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of swift-kafka-client project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import Crdkafka

public struct KafkaGroup {
/// Swift `enum` wrapping `librdkafka`'s `RD_KAFKA_CONSUMER_GROUP_STATE_*` values.
/// See `rd_kafka_consumer_group_state_t` in rdkafka.h for reference.
public enum State: UInt32 {
case unknown = 0
case preparingRebalance = 1
case completingRebalance = 2
case stable = 3
case dead = 4
case empty = 5
}

public let name: String
public let state: State
public let isSimple: Bool
}

0 comments on commit 81ae1f9

Please sign in to comment.