Feature/sc 2762/gsoc rebalance callback support #2

Merged
121 changes: 113 additions & 8 deletions Sources/SwiftKafka/KafkaConsumer.swift
@@ -32,7 +32,11 @@ extension KafkaConsumerCloseOnTerminate: NIOAsyncSequenceProducerDelegate {
}

func didTerminate() {
self.stateMachine.withLockedValue { $0.messageSequenceTerminated(isMessageSequence: isMessageSequence) }
let eventSource = self.stateMachine.withLockedValue { $0.messageSequenceTerminated(isMessageSequence: isMessageSequence) }
guard let eventSource else {
return
}
eventSource.finish()
}
}

@@ -206,7 +210,7 @@ public final class KafkaConsumer: Sendable, Service {
) throws {
let stateMachine = NIOLockedValueBox(StateMachine(logger: logger))

var subscribedEvents: [RDKafkaEvent] = [.log, .fetch]
var subscribedEvents: [RDKafkaEvent] = [.log, .fetch, .rebalance /* TODO: add rebalance to config */]
// Only listen to offset commit events when autoCommit is false
if config.enableAutoCommit == false {
subscribedEvents.append(.offsetCommit)
@@ -246,7 +250,7 @@
) throws -> (KafkaConsumer, KafkaConsumerEvents) {
let stateMachine = NIOLockedValueBox(StateMachine(logger: logger))

var subscribedEvents: [RDKafkaEvent] = [.log, .fetch]
var subscribedEvents: [RDKafkaEvent] = [.log, .fetch, .rebalance /* TODO: add rebalance to config */]
// Only listen to offset commit events when autoCommit is false
if config.enableAutoCommit == false {
subscribedEvents.append(.offsetCommit)
@@ -321,6 +325,49 @@ public final class KafkaConsumer: Sendable, Service {
try client.assign(topicPartitionList: assignment)
}
}

// FIXME: this could be internal; instead, introduce a `Rebalance` struct
// FIXME: that calls client assign()/assignIncremental()/unassignIncremental()
public func assign(_ list: KafkaTopicList) throws {
let action = self.stateMachine.withLockedValue { $0.seekOrRebalance() }
switch action {
case .allowed(let client):
try client.assign(topicPartitionList: list.list)
case .denied(let err):
throw KafkaError.client(reason: err)
}
}

public func incrementalAssign(_ list: KafkaTopicList) throws {
let action = self.stateMachine.withLockedValue { $0.seekOrRebalance() }
switch action {
case .allowed(let client):
try client.incrementalAssign(topicPartitionList: list.list)
case .denied(let err):
throw KafkaError.client(reason: err)
}
}

public func incrementalUnassign(_ list: KafkaTopicList) throws {
let action = self.stateMachine.withLockedValue { $0.seekOrRebalance() }
switch action {
case .allowed(let client):
try client.incrementalUnassign(topicPartitionList: list.list)
case .denied(let err):
throw KafkaError.client(reason: err)
}
}

// TODO: add DocC documentation: timeout = 0 performs the seek asynchronously (no errors reported)
public func seek(_ list: KafkaTopicList, timeout: Duration = .kafkaNoWaitTransaction) async throws {
let action = self.stateMachine.withLockedValue { $0.seekOrRebalance() }
switch action {
case .allowed(let client):
try await client.seek(topicPartitionList: list.list, timeout: timeout)
case .denied(let err):
throw KafkaError.client(reason: err)
}
}
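
// Editor's sketch (not part of this diff): driving the new public rebalance
// API from the consumer's event stream. `consumer` and `events` stand for the
// pair returned by the `(KafkaConsumer, KafkaConsumerEvents)` factory above;
// the surrounding setup is assumed, not taken from this PR.
func handleEvents(consumer: KafkaConsumer, events: KafkaConsumerEvents) async throws {
    for await event in events {
        guard case .rebalance(let action) = event else { continue }
        switch action {
        case .assign(let proto, let topics):
            // The cooperative (incremental) protocol adds partitions on top of
            // the current assignment; the eager protocol replaces it wholesale.
            if case .cooperative = proto {
                try consumer.incrementalAssign(topics)
            } else {
                try consumer.assign(topics)
            }
        case .revoke(let proto, let topics):
            if case .cooperative = proto {
                try consumer.incrementalUnassign(topics)
            }
            // Eager revocation would clear the whole assignment (librdkafka's
            // default is assign(NULL)); this diff exposes no unassign(), so
            // that branch is omitted here.
        case .error(let proto, let topics, let error):
            // Surfaced for observability; the consumer's default handling in
            // the poll loop still applies.
            print("rebalance error [\(proto)] for \(topics): \(error)")
        }
    }
}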

/// Start polling Kafka for messages.
///
@@ -351,9 +398,8 @@
eventSource?.finish()
throw error
}
case .statistics(let statistics):
_ = eventSource?.yield(.statistics(statistics))
default:
_ = eventSource?.yield(KafkaConsumerEvent(event))
break // Ignore
}
}
@@ -362,7 +408,37 @@
// Ignore poll result.
// We are just polling to serve any remaining events queued inside of `librdkafka`.
// All remaining queued consumer messages will get dropped and not be committed (marked as read).
_ = client.eventPoll()
let events = client.eventPoll()
for event in events {
do { // FIXME: move to separate method?
if case .rebalance(let rdAction) = event {
let action = RebalanceAction.convert(from: rdAction)
logger.info("Event sequence terminated, perform default rebalance for \(action)")
switch action {
case .assign(let proto, let topics):
logger.info("Assign, proto [\(proto)], topics: \(topics)")
if case .cooperative = proto {
try self.incrementalAssign(topics)
} else {
try self.assign(topics)
}
case .revoke(let proto, let topics):
logger.info("Revoke, proto [\(proto)], topics: \(topics)")
if case .cooperative = proto {
try self.incrementalUnassign(topics)
} else {
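// FIXME: eager revocation should clear the current assignment (librdkafka's
// default rebalance callback calls assign(NULL)); calling assign(topics) here
// re-assigns the partitions being revoked.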
try self.assign(topics)
}
case .error(let proto, let topics, let err):
logger.info("Error, proto [\(proto)], topics: \(topics), err: \(err)")
}
}
} catch {
// FIXME: fatalError?
logger.error("Could not perform default rebalance: \(error)")
}
}
try await Task.sleep(for: self.config.pollInterval)
case .terminatePollLoop:
return
@@ -576,7 +652,7 @@ extension KafkaConsumer {

/// The messages asynchronous sequence was terminated.
/// All incoming messages will be dropped.
mutating func messageSequenceTerminated(isMessageSequence: Bool) {
mutating func messageSequenceTerminated(isMessageSequence: Bool) -> ProducerEvents.Source? {
switch self.state {
case .uninitialized:
fatalError("\(#function) invoked while still in state \(self.state)")
@@ -592,7 +668,8 @@
self.state = .consumptionStopped(client: client)
// If the message sequence is being terminated, the class deinit was called
// (see the `messages` field); this is the last chance to call finish on `eventSource`,
eventSource?.finish()
// but we cannot do that while holding the lock -> return it to the caller
return eventSource
}
else {
// Messages are still being consumed; only the event source was finished
@@ -603,6 +680,7 @@
case .finishing, .finished:
break
}
return nil
}

/// Action to take when wanting to store a message offset (to be auto-committed by `librdkafka`).
@@ -658,6 +736,33 @@ extension KafkaConsumer {
return .throwClosedError
}
}

enum RebalanceAction {
/// Rebalance is still possible
///
/// - Parameter client: Client used for handling the connection to the Kafka cluster.
case allowed(
client: RDKafkaClient
)
/// Throw an error. The ``KafkaConsumer`` is closed.
case denied(error: String)
}


/// Returns the action to take when the caller wants to seek or perform rebalance operations.
func seekOrRebalance() -> RebalanceAction {
switch self.state {
case .uninitialized:
fatalError("\(#function) invoked while still in state \(self.state)")
case .initializing:
fatalError("Subscribe to consumer group / assign to topic partition pair before committing offsets")
case .consumptionStopped(let client),
.consuming(let client, _, _),
.finishing(let client):
return .allowed(client: client)
case .finished:
return .denied(error: "Cannot perform reblance actions, consumer stopped")
}
}

/// Action to be taken when wanting to do close the consumer.
enum FinishAction {
138 changes: 137 additions & 1 deletion Sources/SwiftKafka/KafkaConsumerEvent.swift
@@ -12,17 +12,153 @@
//
//===----------------------------------------------------------------------===//


// TODO: move the new structures below into a dedicated `Rebalance` file:

public struct KafkaTopicList {
let list: RDKafkaTopicPartitionList

init(from: RDKafkaTopicPartitionList) {
self.list = from
}

public func append(topic: TopicPartition) {
self.list.setOffset(topic: topic.topic, partition: topic.partition, offset: topic.offset.rawValue)
}
}

extension KafkaTopicList: Sendable {}
extension KafkaTopicList: Hashable {}

extension KafkaTopicList: CustomDebugStringConvertible {
public var debugDescription: String {
list.debugDescription
}
}

extension KafkaTopicList: Sequence {
public struct TopicPartitionIterator: IteratorProtocol {
private let list: RDKafkaTopicPartitionList
private var idx = 0

init(list: RDKafkaTopicPartitionList) {
self.list = list
}

public mutating func next() -> TopicPartition? {
guard let topic = list.getByIdx(idx: idx) else {
return nil
}
idx += 1
return TopicPartition(from: topic)
}
}

public func makeIterator() -> TopicPartitionIterator {
TopicPartitionIterator(list: self.list)
}
}
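
// Editor's sketch: the Sequence conformance above lets callers inspect an
// assignment directly. Assumes a `KafkaTopicList` handed over by a rebalance
// event, since the type has no public initializer in this diff.
import Logging

func logAssignment(_ topics: KafkaTopicList, logger: Logger) {
    for partition in topics {
        logger.info("assigned \(partition.name)") // "topic:partition:offset"
    }
}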

public typealias KafkaOffset = RDKafkaOffset // Should we wrap it?


public struct TopicPartition: Sendable, Hashable {
public private(set) var topic: String
public private(set) var partition: KafkaPartition
public private(set) var offset: KafkaOffset

init(from topicPartition: RDKafkaTopicPartition) {
self.topic = topicPartition.topic
self.partition = topicPartition.partition
self.offset = KafkaOffset(rawValue: topicPartition.offset)
}

public init(_ topic: String, _ partition: KafkaPartition, _ offset: KafkaOffset = .beginning) {
self.topic = topic
self.partition = partition
self.offset = offset
}

public var name: String {
"\(topic):\(partition):\(offset)"
}
}
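
// Editor's sketch: rewinding already-assigned partitions to the beginning by
// overwriting each entry's offset via `append(topic:)`, then seeking. Assumes
// `consumer` is the KafkaConsumer that owns this assignment; the timeout
// semantics follow the TODO note on `seek` in KafkaConsumer.swift.
func rewindToBeginning(_ topics: KafkaTopicList, on consumer: KafkaConsumer) async throws {
    for partition in topics {
        // `append` forwards to `setOffset`, so re-adding an existing
        // topic/partition pair just updates its stored offset.
        topics.append(topic: TopicPartition(partition.topic, partition.partition, .beginning))
    }
    // timeout = 0 (`.kafkaNoWaitTransaction`) requests an asynchronous seek
    // with no errors reported.
    try await consumer.seek(topics, timeout: .kafkaNoWaitTransaction)
}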

extension TopicPartition: CustomDebugStringConvertible {
public var debugDescription: String {
"\(topic):\(partition):\(offset)"
}
}

public enum KafkaRebalanceProtocol: Sendable, Hashable {
case cooperative
case eager
case none

static func convert(from proto: RDKafkaRebalanceProtocol) -> KafkaRebalanceProtocol {
switch proto {
case .cooperative: return .cooperative
case .eager: return .eager
case .none: return .none
}
}
}

public enum RebalanceAction: Sendable, Hashable {
case assign(KafkaRebalanceProtocol, KafkaTopicList)
case revoke(KafkaRebalanceProtocol, KafkaTopicList)
case error(KafkaRebalanceProtocol, KafkaTopicList, KafkaError)

static func convert(from rebalance: RDKafkaRebalanceAction) -> RebalanceAction {
switch rebalance {
case .assign(let proto, let list):
return .assign(.convert(from: proto), .init(from: list))
case .revoke(let proto, let list):
return .revoke(.convert(from: proto), .init(from: list))
case .error(let proto, let list, let err):
return .error(.convert(from: proto), .init(from: list), err)
}
}
}


/// An enumeration representing events that can be received through the ``KafkaConsumerEvents`` asynchronous sequence.
public enum KafkaConsumerEvent: Sendable, Hashable {
/// Statistics from librdkafka
case statistics(KafkaStatistics)
/// Rebalance callback from Kafka
case rebalance(RebalanceAction)
/// - Important: Always provide a `default` case when switching over this `enum`.
case DO_NOT_SWITCH_OVER_THIS_EXHAUSITVELY

internal init?(_ event: RDKafkaClient.KafkaEvent) {
internal init(_ event: RDKafkaClient.KafkaEvent) {
switch event {
case .statistics(let stat):
self = .statistics(stat)
case .rebalance(let action):
self = .rebalance(.convert(from: action))
case .deliveryReport:
fatalError("Cannot cast \(event) to KafkaConsumerEvent")
case .consumerMessages:
2 changes: 2 additions & 0 deletions Sources/SwiftKafka/KafkaProducerEvent.swift
@@ -29,6 +29,8 @@ public enum KafkaProducerEvent: Sendable, Hashable {
self = .statistics(stat)
case .consumerMessages:
fatalError("Cannot cast \(event) to KafkaProducerEvent")
case .rebalance:
fatalError("Cannot cast \(event) to KafkaProducerEvent")
}
}
}