Skip to content

Commit

Permalink
fix(patch): [sc-14974] make kafka metadata sendable (#45)
Browse files Browse the repository at this point in the history
  • Loading branch information
blindspotbounty authored Oct 3, 2024
1 parent 9aaf5a4 commit dabd0b6
Showing 1 changed file with 17 additions and 42 deletions.
59 changes: 17 additions & 42 deletions Sources/Kafka/KafkaMetadata.swift
Original file line number Diff line number Diff line change
@@ -1,54 +1,29 @@
import Crdkafka

public final class KafkaMetadata {
private let metadata: UnsafePointer<rd_kafka_metadata>
public struct KafkaMetadata: Sendable {
public let topics: [KafkaTopicMetadata]

init(metadata: UnsafePointer<rd_kafka_metadata>) {
self.metadata = metadata
}

deinit {
rd_kafka_metadata_destroy(metadata)
self.topics = (0..<Int(metadata.pointee.topic_cnt)).map { KafkaTopicMetadata(topic: metadata.pointee.topics[$0]) }
}

public private(set) lazy var topics = {
(0..<Int(self.metadata.pointee.topic_cnt)).map { KafkaTopicMetadata(metadata: self, topic: self.metadata.pointee.topics[$0]) }
}()
}

// must be a class to allow mutating lazy vars, otherwise require struct copies
public final class KafkaTopicMetadata {
private let metadata: KafkaMetadata // retain metadata
private let topic: rd_kafka_metadata_topic

init(metadata: KafkaMetadata, topic: rd_kafka_metadata_topic) {
self.metadata = metadata
self.topic = topic
}
public struct KafkaTopicMetadata: Sendable {
public let name: String
public let partitions: [KafkaPartitionMetadata]

public private(set) lazy var name = {
String(cString: self.topic.topic)
}()

public private(set) lazy var partitions = {
(0..<Int(self.topic.partition_cnt)).map { KafkaPartitionMetadata(metadata: self.metadata, partition: topic.partitions[$0]) }
}()
init(topic: rd_kafka_metadata_topic) {
self.name = String(cString: topic.topic)
self.partitions = (0..<Int(topic.partition_cnt)).map { KafkaPartitionMetadata( topic.partitions[$0]) }
}
}

public struct KafkaPartitionMetadata {
private let metadata: KafkaMetadata // retain metadata
private let partition: rd_kafka_metadata_partition

init(metadata: KafkaMetadata, partition: rd_kafka_metadata_partition) {
self.metadata = metadata
self.partition = partition
}
public struct KafkaPartitionMetadata: Sendable {
public let id: Int
public let replicasCount: Int

var id: Int {
Int(partition.id)
}

var replicasCount: Int {
Int(partition.replica_cnt)
init(_ partition: rd_kafka_metadata_partition) {
self.id = Int(partition.id)
self.replicasCount = Int(partition.replica_cnt)
}
}

0 comments on commit dabd0b6

Please sign in to comment.