diff --git a/hstream-kafka/hstream-kafka.cabal b/hstream-kafka/hstream-kafka.cabal index 6d0ce9189..61b5930aa 100644 --- a/hstream-kafka/hstream-kafka.cabal +++ b/hstream-kafka/hstream-kafka.cabal @@ -54,6 +54,7 @@ library kafka-protocol Kafka.Protocol.Encoding.Internal Kafka.Protocol.Encoding.Parser Kafka.Protocol.Message.Struct + Kafka.Protocol.Message.Total hs-source-dirs: protocol build-tool-depends: hpp:hpp >=0.6 && <0.7 diff --git a/hstream-kafka/protocol/Kafka/Protocol/Encoding.hs b/hstream-kafka/protocol/Kafka/Protocol/Encoding.hs index f12f77e3c..a69b2ecdb 100644 --- a/hstream-kafka/protocol/Kafka/Protocol/Encoding.hs +++ b/hstream-kafka/protocol/Kafka/Protocol/Encoding.hs @@ -50,6 +50,8 @@ module Kafka.Protocol.Encoding -- ** Misc , pattern NonNullKaArray , unNonNullKaArray + , kaArrayToCompact + , kaArrayFromCompact -- * Internals , Parser , runParser @@ -222,10 +224,16 @@ newtype KaArray a = KaArray { unKaArray :: Maybe (Vector a) } deriving newtype (Show, Eq, Ord) +instance Functor KaArray where + fmap f (KaArray xs) = KaArray $ fmap f <$> xs + newtype CompactKaArray a = CompactKaArray { unCompactKaArray :: Maybe (Vector a) } deriving newtype (Show, Eq, Ord) +instance Functor CompactKaArray where + fmap f (CompactKaArray xs) = CompactKaArray $ fmap f <$> xs + newtype RecordKey = RecordKey { unRecordKey :: Maybe ByteString } deriving newtype (Show, Eq, Ord) @@ -585,6 +593,14 @@ unNonNullKaArray :: KaArray a -> Vector a unNonNullKaArray = fromMaybe (error "non-nullable field was serialized as null") . unKaArray +kaArrayToCompact :: KaArray a -> CompactKaArray a +kaArrayToCompact = CompactKaArray . unKaArray +{-# INLINE kaArrayToCompact #-} + +kaArrayFromCompact :: CompactKaArray a -> KaArray a +kaArrayFromCompact = KaArray . unCompactKaArray +{-# INLINE kaArrayFromCompact #-} + ------------------------------------------------------------------------------- -- Internals diff --git a/hstream-kafka/protocol/Kafka/Protocol/Message.hs b/hstream-kafka/protocol/Kafka/Protocol/Message.hs index 7dca3a4c4..8b09d8e54 100644 --- a/hstream-kafka/protocol/Kafka/Protocol/Message.hs +++ b/hstream-kafka/protocol/Kafka/Protocol/Message.hs @@ -8,6 +8,7 @@ module Kafka.Protocol.Message , getResponseHeader , module Kafka.Protocol.Message.Struct + , module Kafka.Protocol.Message.Total ) where import qualified Data.ByteString.Lazy as BL @@ -15,6 +16,7 @@ import Data.Int import Kafka.Protocol.Encoding import Kafka.Protocol.Message.Struct +import Kafka.Protocol.Message.Total data RequestHeader = RequestHeader { requestApiKey :: {-# UNPACK #-} !ApiKey diff --git a/hstream-kafka/protocol/Kafka/Protocol/Message/Struct.hs b/hstream-kafka/protocol/Kafka/Protocol/Message/Struct.hs index 4895a3387..895001e90 100644 --- a/hstream-kafka/protocol/Kafka/Protocol/Message/Struct.hs +++ b/hstream-kafka/protocol/Kafka/Protocol/Message/Struct.hs @@ -1,7 +1,7 @@ ------------------------------------------------------------------------------- -- Autogenerated by kafka message json schema -- --- $ ./script/kafka_gen.py run > hstream-kafka/protocol/Kafka/Protocol/Message/Struct.hs +-- $ ./script/kafka_gen.py run -- -- DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING diff --git a/hstream-kafka/protocol/Kafka/Protocol/Message/Total.hs b/hstream-kafka/protocol/Kafka/Protocol/Message/Total.hs new file mode 100644 index 000000000..c411d7ef1 --- /dev/null +++ b/hstream-kafka/protocol/Kafka/Protocol/Message/Total.hs @@ -0,0 +1,2123 @@ +------------------------------------------------------------------------------- +-- Autogenerated by kafka message json schema +-- +-- $ ./script/kafka_gen.py run +-- +-- DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING + +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE DuplicateRecordFields #-} +{-# LANGUAGE OverloadedRecordDot #-} +{-# LANGUAGE TypeFamilies #-} + +module Kafka.Protocol.Message.Total where + +import Control.Exception +import Data.ByteString (ByteString) +import Data.Int +import Data.Text (Text) +import qualified Data.Vector as V +import GHC.Generics + +import Kafka.Protocol.Encoding +import Kafka.Protocol.Error +import Kafka.Protocol.Message.Struct + +------------------------------------------------------------------------------- + +data ApiVersion = ApiVersion + { apiKey :: {-# UNPACK #-} !ApiKey + -- ^ The API index. + , minVersion :: {-# UNPACK #-} !Int16 + -- ^ The minimum supported version, inclusive. + , maxVersion :: {-# UNPACK #-} !Int16 + -- ^ The maximum supported version, inclusive. + , taggedFields :: !TaggedFields + } deriving (Show, Eq, Generic) +instance Serializable ApiVersion + +apiVersionToV0 :: ApiVersion -> ApiVersionV0 +apiVersionToV0 x = ApiVersionV0 + { apiKey = x.apiKey + , minVersion = x.minVersion + , maxVersion = x.maxVersion + } +apiVersionToV1 :: ApiVersion -> ApiVersionV1 +apiVersionToV1 = apiVersionToV0 +apiVersionToV2 :: ApiVersion -> ApiVersionV2 +apiVersionToV2 = apiVersionToV0 +apiVersionToV3 :: ApiVersion -> ApiVersionV3 +apiVersionToV3 x = ApiVersionV3 + { apiKey = x.apiKey + , minVersion = x.minVersion + , maxVersion = x.maxVersion + , taggedFields = x.taggedFields + } + +apiVersionFromV0 :: ApiVersionV0 -> ApiVersion +apiVersionFromV0 x = ApiVersion + { apiKey = x.apiKey + , minVersion = x.minVersion + , maxVersion = x.maxVersion + , taggedFields = EmptyTaggedFields + } +apiVersionFromV1 :: ApiVersionV1 -> ApiVersion +apiVersionFromV1 = apiVersionFromV0 +apiVersionFromV2 :: ApiVersionV2 -> ApiVersion +apiVersionFromV2 = apiVersionFromV0 +apiVersionFromV3 :: ApiVersionV3 -> ApiVersion +apiVersionFromV3 x = ApiVersion + { apiKey = x.apiKey + , minVersion = x.minVersion + , maxVersion = x.maxVersion + , taggedFields = x.taggedFields + } + +data CreatableReplicaAssignment = CreatableReplicaAssignment + { partitionIndex :: {-# UNPACK #-} !Int32 + -- ^ The partition index. + , brokerIds :: !(KaArray Int32) + -- ^ The brokers to place the partition on. + } deriving (Show, Eq, Generic) +instance Serializable CreatableReplicaAssignment + +creatableReplicaAssignmentToV0 :: CreatableReplicaAssignment -> CreatableReplicaAssignmentV0 +creatableReplicaAssignmentToV0 x = CreatableReplicaAssignmentV0 + { partitionIndex = x.partitionIndex + , brokerIds = x.brokerIds + } + +creatableReplicaAssignmentFromV0 :: CreatableReplicaAssignmentV0 -> CreatableReplicaAssignment +creatableReplicaAssignmentFromV0 x = CreatableReplicaAssignment + { partitionIndex = x.partitionIndex + , brokerIds = x.brokerIds + } + +data CreatableTopic = CreatableTopic + { name :: !Text + -- ^ The topic name. + , numPartitions :: {-# UNPACK #-} !Int32 + -- ^ The number of partitions to create in the topic, or -1 if we are + -- either specifying a manual partition assignment or using the default + -- partitions. + , replicationFactor :: {-# UNPACK #-} !Int16 + -- ^ The number of replicas to create for each partition in the topic, or + -- -1 if we are either specifying a manual partition assignment or using + -- the default replication factor. + , assignments :: !(KaArray CreatableReplicaAssignment) + -- ^ The manual partition assignment, or the empty array if we are using + -- automatic assignment. + , configs :: !(KaArray CreateableTopicConfig) + -- ^ The custom topic configurations to set. + } deriving (Show, Eq, Generic) +instance Serializable CreatableTopic + +creatableTopicToV0 :: CreatableTopic -> CreatableTopicV0 +creatableTopicToV0 x = CreatableTopicV0 + { name = x.name + , numPartitions = x.numPartitions + , replicationFactor = x.replicationFactor + , assignments = fmap creatableReplicaAssignmentToV0 x.assignments + , configs = fmap createableTopicConfigToV0 x.configs + } + +creatableTopicFromV0 :: CreatableTopicV0 -> CreatableTopic +creatableTopicFromV0 x = CreatableTopic + { name = x.name + , numPartitions = x.numPartitions + , replicationFactor = x.replicationFactor + , assignments = fmap creatableReplicaAssignmentFromV0 x.assignments + , configs = fmap createableTopicConfigFromV0 x.configs + } + +data CreatableTopicResult = CreatableTopicResult + { name :: !Text + -- ^ The topic name. + , errorCode :: {-# UNPACK #-} !ErrorCode + -- ^ The error code, or 0 if there was no error. + } deriving (Show, Eq, Generic) +instance Serializable CreatableTopicResult + +creatableTopicResultToV0 :: CreatableTopicResult -> CreatableTopicResultV0 +creatableTopicResultToV0 x = CreatableTopicResultV0 + { name = x.name + , errorCode = x.errorCode + } + +creatableTopicResultFromV0 :: CreatableTopicResultV0 -> CreatableTopicResult +creatableTopicResultFromV0 x = CreatableTopicResult + { name = x.name + , errorCode = x.errorCode + } + +data CreateableTopicConfig = CreateableTopicConfig + { name :: !Text + -- ^ The configuration name. + , value :: !NullableString + -- ^ The configuration value. + } deriving (Show, Eq, Generic) +instance Serializable CreateableTopicConfig + +createableTopicConfigToV0 :: CreateableTopicConfig -> CreateableTopicConfigV0 +createableTopicConfigToV0 x = CreateableTopicConfigV0 + { name = x.name + , value = x.value + } + +createableTopicConfigFromV0 :: CreateableTopicConfigV0 -> CreateableTopicConfig +createableTopicConfigFromV0 x = CreateableTopicConfig + { name = x.name + , value = x.value + } + +data DeletableTopicResult = DeletableTopicResult + { name :: !Text + -- ^ The topic name + , errorCode :: {-# UNPACK #-} !ErrorCode + -- ^ The deletion error, or 0 if the deletion succeeded. + } deriving (Show, Eq, Generic) +instance Serializable DeletableTopicResult + +deletableTopicResultToV0 :: DeletableTopicResult -> DeletableTopicResultV0 +deletableTopicResultToV0 x = DeletableTopicResultV0 + { name = x.name + , errorCode = x.errorCode + } + +deletableTopicResultFromV0 :: DeletableTopicResultV0 -> DeletableTopicResult +deletableTopicResultFromV0 x = DeletableTopicResult + { name = x.name + , errorCode = x.errorCode + } + +data DescribedGroup = DescribedGroup + { errorCode :: {-# UNPACK #-} !ErrorCode + -- ^ The describe error, or 0 if there was no error. + , groupId :: !Text + -- ^ The group ID string. + , groupState :: !Text + -- ^ The group state string, or the empty string. + , protocolType :: !Text + -- ^ The group protocol type, or the empty string. + , protocolData :: !Text + -- ^ The group protocol data, or the empty string. + , members :: !(KaArray DescribedGroupMember) + -- ^ The group members. + } deriving (Show, Eq, Generic) +instance Serializable DescribedGroup + +describedGroupToV0 :: DescribedGroup -> DescribedGroupV0 +describedGroupToV0 x = DescribedGroupV0 + { errorCode = x.errorCode + , groupId = x.groupId + , groupState = x.groupState + , protocolType = x.protocolType + , protocolData = x.protocolData + , members = fmap describedGroupMemberToV0 x.members + } + +describedGroupFromV0 :: DescribedGroupV0 -> DescribedGroup +describedGroupFromV0 x = DescribedGroup + { errorCode = x.errorCode + , groupId = x.groupId + , groupState = x.groupState + , protocolType = x.protocolType + , protocolData = x.protocolData + , members = fmap describedGroupMemberFromV0 x.members + } + +data DescribedGroupMember = DescribedGroupMember + { memberId :: !Text + -- ^ The member ID assigned by the group coordinator. + , clientId :: !Text + -- ^ The client ID used in the member's latest join group request. + , clientHost :: !Text + -- ^ The client host. + , memberMetadata :: !ByteString + -- ^ The metadata corresponding to the current group protocol in use. + , memberAssignment :: !ByteString + -- ^ The current assignment provided by the group leader. + } deriving (Show, Eq, Generic) +instance Serializable DescribedGroupMember + +describedGroupMemberToV0 :: DescribedGroupMember -> DescribedGroupMemberV0 +describedGroupMemberToV0 x = DescribedGroupMemberV0 + { memberId = x.memberId + , clientId = x.clientId + , clientHost = x.clientHost + , memberMetadata = x.memberMetadata + , memberAssignment = x.memberAssignment + } + +describedGroupMemberFromV0 :: DescribedGroupMemberV0 -> DescribedGroupMember +describedGroupMemberFromV0 x = DescribedGroupMember + { memberId = x.memberId + , clientId = x.clientId + , clientHost = x.clientHost + , memberMetadata = x.memberMetadata + , memberAssignment = x.memberAssignment + } + +data FetchPartition = FetchPartition + { partition :: {-# UNPACK #-} !Int32 + -- ^ The partition index. + , fetchOffset :: {-# UNPACK #-} !Int64 + -- ^ The message offset. + , partitionMaxBytes :: {-# UNPACK #-} !Int32 + -- ^ The maximum bytes to fetch from this partition. See KIP-74 for cases + -- where this limit may not be honored. + } deriving (Show, Eq, Generic) +instance Serializable FetchPartition + +fetchPartitionToV2 :: FetchPartition -> FetchPartitionV2 +fetchPartitionToV2 x = FetchPartitionV2 + { partition = x.partition + , fetchOffset = x.fetchOffset + , partitionMaxBytes = x.partitionMaxBytes + } + +fetchPartitionFromV2 :: FetchPartitionV2 -> FetchPartition +fetchPartitionFromV2 x = FetchPartition + { partition = x.partition + , fetchOffset = x.fetchOffset + , partitionMaxBytes = x.partitionMaxBytes + } + +data FetchTopic = FetchTopic + { topic :: !Text + -- ^ The name of the topic to fetch. + , partitions :: !(KaArray FetchPartition) + -- ^ The partitions to fetch. + } deriving (Show, Eq, Generic) +instance Serializable FetchTopic + +fetchTopicToV2 :: FetchTopic -> FetchTopicV2 +fetchTopicToV2 x = FetchTopicV2 + { topic = x.topic + , partitions = fmap fetchPartitionToV2 x.partitions + } + +fetchTopicFromV2 :: FetchTopicV2 -> FetchTopic +fetchTopicFromV2 x = FetchTopic + { topic = x.topic + , partitions = fmap fetchPartitionFromV2 x.partitions + } + +data FetchableTopicResponse = FetchableTopicResponse + { topic :: !Text + -- ^ The topic name. + , partitions :: !(KaArray PartitionData) + -- ^ The topic partitions. + } deriving (Show, Eq, Generic) +instance Serializable FetchableTopicResponse + +fetchableTopicResponseToV2 :: FetchableTopicResponse -> FetchableTopicResponseV2 +fetchableTopicResponseToV2 x = FetchableTopicResponseV2 + { topic = x.topic + , partitions = fmap partitionDataToV2 x.partitions + } + +fetchableTopicResponseFromV2 :: FetchableTopicResponseV2 -> FetchableTopicResponse +fetchableTopicResponseFromV2 x = FetchableTopicResponse + { topic = x.topic + , partitions = fmap partitionDataFromV2 x.partitions + } + +data FinalizedFeatureKey = FinalizedFeatureKey + { name :: !CompactString + -- ^ The name of the feature. + , maxVersionLevel :: {-# UNPACK #-} !Int16 + -- ^ The cluster-wide finalized max version level for the feature. + , minVersionLevel :: {-# UNPACK #-} !Int16 + -- ^ The cluster-wide finalized min version level for the feature. + , taggedFields :: !TaggedFields + } deriving (Show, Eq, Generic) +instance Serializable FinalizedFeatureKey + +finalizedFeatureKeyToV3 :: FinalizedFeatureKey -> FinalizedFeatureKeyV3 +finalizedFeatureKeyToV3 x = FinalizedFeatureKeyV3 + { name = x.name + , maxVersionLevel = x.maxVersionLevel + , minVersionLevel = x.minVersionLevel + , taggedFields = x.taggedFields + } + +finalizedFeatureKeyFromV3 :: FinalizedFeatureKeyV3 -> FinalizedFeatureKey +finalizedFeatureKeyFromV3 x = FinalizedFeatureKey + { name = x.name + , maxVersionLevel = x.maxVersionLevel + , minVersionLevel = x.minVersionLevel + , taggedFields = x.taggedFields + } + +data JoinGroupRequestProtocol = JoinGroupRequestProtocol + { name :: !Text + -- ^ The protocol name. + , metadata :: !ByteString + -- ^ The protocol metadata. + } deriving (Show, Eq, Generic) +instance Serializable JoinGroupRequestProtocol + +joinGroupRequestProtocolToV0 :: JoinGroupRequestProtocol -> JoinGroupRequestProtocolV0 +joinGroupRequestProtocolToV0 x = JoinGroupRequestProtocolV0 + { name = x.name + , metadata = x.metadata + } + +joinGroupRequestProtocolFromV0 :: JoinGroupRequestProtocolV0 -> JoinGroupRequestProtocol +joinGroupRequestProtocolFromV0 x = JoinGroupRequestProtocol + { name = x.name + , metadata = x.metadata + } + +data JoinGroupResponseMember = JoinGroupResponseMember + { memberId :: !Text + -- ^ The group member ID. + , metadata :: !ByteString + -- ^ The group member metadata. + } deriving (Show, Eq, Generic) +instance Serializable JoinGroupResponseMember + +joinGroupResponseMemberToV0 :: JoinGroupResponseMember -> JoinGroupResponseMemberV0 +joinGroupResponseMemberToV0 x = JoinGroupResponseMemberV0 + { memberId = x.memberId + , metadata = x.metadata + } + +joinGroupResponseMemberFromV0 :: JoinGroupResponseMemberV0 -> JoinGroupResponseMember +joinGroupResponseMemberFromV0 x = JoinGroupResponseMember + { memberId = x.memberId + , metadata = x.metadata + } + +data ListOffsetsPartition = ListOffsetsPartition + { partitionIndex :: {-# UNPACK #-} !Int32 + -- ^ The partition index. + , timestamp :: {-# UNPACK #-} !Int64 + -- ^ The current timestamp. + , maxNumOffsets :: {-# UNPACK #-} !Int32 + -- ^ The maximum number of offsets to report. + } deriving (Show, Eq, Generic) +instance Serializable ListOffsetsPartition + +listOffsetsPartitionToV0 :: ListOffsetsPartition -> ListOffsetsPartitionV0 +listOffsetsPartitionToV0 x = ListOffsetsPartitionV0 + { partitionIndex = x.partitionIndex + , timestamp = x.timestamp + , maxNumOffsets = x.maxNumOffsets + } +listOffsetsPartitionToV1 :: ListOffsetsPartition -> ListOffsetsPartitionV1 +listOffsetsPartitionToV1 x = ListOffsetsPartitionV1 + { partitionIndex = x.partitionIndex + , timestamp = x.timestamp + } + +listOffsetsPartitionFromV0 :: ListOffsetsPartitionV0 -> ListOffsetsPartition +listOffsetsPartitionFromV0 x = ListOffsetsPartition + { partitionIndex = x.partitionIndex + , timestamp = x.timestamp + , maxNumOffsets = x.maxNumOffsets + } +listOffsetsPartitionFromV1 :: ListOffsetsPartitionV1 -> ListOffsetsPartition +listOffsetsPartitionFromV1 x = ListOffsetsPartition + { partitionIndex = x.partitionIndex + , timestamp = x.timestamp + , maxNumOffsets = 1 + } + +data ListOffsetsPartitionResponse = ListOffsetsPartitionResponse + { partitionIndex :: {-# UNPACK #-} !Int32 + -- ^ The partition index. + , errorCode :: {-# UNPACK #-} !ErrorCode + -- ^ The partition error code, or 0 if there was no error. + , oldStyleOffsets :: !(KaArray Int64) + -- ^ The result offsets. + , timestamp :: {-# UNPACK #-} !Int64 + -- ^ The timestamp associated with the returned offset. + , offset :: {-# UNPACK #-} !Int64 + -- ^ The returned offset. + } deriving (Show, Eq, Generic) +instance Serializable ListOffsetsPartitionResponse + +listOffsetsPartitionResponseToV0 :: ListOffsetsPartitionResponse -> ListOffsetsPartitionResponseV0 +listOffsetsPartitionResponseToV0 x = ListOffsetsPartitionResponseV0 + { partitionIndex = x.partitionIndex + , errorCode = x.errorCode + , oldStyleOffsets = x.oldStyleOffsets + } +listOffsetsPartitionResponseToV1 :: ListOffsetsPartitionResponse -> ListOffsetsPartitionResponseV1 +listOffsetsPartitionResponseToV1 x = ListOffsetsPartitionResponseV1 + { partitionIndex = x.partitionIndex + , errorCode = x.errorCode + , timestamp = x.timestamp + , offset = x.offset + } + +listOffsetsPartitionResponseFromV0 :: ListOffsetsPartitionResponseV0 -> ListOffsetsPartitionResponse +listOffsetsPartitionResponseFromV0 x = ListOffsetsPartitionResponse + { partitionIndex = x.partitionIndex + , errorCode = x.errorCode + , oldStyleOffsets = x.oldStyleOffsets + , timestamp = (-1) + , offset = (-1) + } +listOffsetsPartitionResponseFromV1 :: ListOffsetsPartitionResponseV1 -> ListOffsetsPartitionResponse +listOffsetsPartitionResponseFromV1 x = ListOffsetsPartitionResponse + { partitionIndex = x.partitionIndex + , errorCode = x.errorCode + , oldStyleOffsets = KaArray (Just V.empty) + , timestamp = x.timestamp + , offset = x.offset + } + +data ListOffsetsTopic = ListOffsetsTopic + { name :: !Text + -- ^ The topic name. + , partitions :: !(KaArray ListOffsetsPartition) + -- ^ Each partition in the request. + } deriving (Show, Eq, Generic) +instance Serializable ListOffsetsTopic + +listOffsetsTopicToV0 :: ListOffsetsTopic -> ListOffsetsTopicV0 +listOffsetsTopicToV0 x = ListOffsetsTopicV0 + { name = x.name + , partitions = fmap listOffsetsPartitionToV0 x.partitions + } +listOffsetsTopicToV1 :: ListOffsetsTopic -> ListOffsetsTopicV1 +listOffsetsTopicToV1 x = ListOffsetsTopicV1 + { name = x.name + , partitions = fmap listOffsetsPartitionToV1 x.partitions + } + +listOffsetsTopicFromV0 :: ListOffsetsTopicV0 -> ListOffsetsTopic +listOffsetsTopicFromV0 x = ListOffsetsTopic + { name = x.name + , partitions = fmap listOffsetsPartitionFromV0 x.partitions + } +listOffsetsTopicFromV1 :: ListOffsetsTopicV1 -> ListOffsetsTopic +listOffsetsTopicFromV1 x = ListOffsetsTopic + { name = x.name + , partitions = fmap listOffsetsPartitionFromV1 x.partitions + } + +data ListOffsetsTopicResponse = ListOffsetsTopicResponse + { name :: !Text + -- ^ The topic name + , partitions :: !(KaArray ListOffsetsPartitionResponse) + -- ^ Each partition in the response. + } deriving (Show, Eq, Generic) +instance Serializable ListOffsetsTopicResponse + +listOffsetsTopicResponseToV0 :: ListOffsetsTopicResponse -> ListOffsetsTopicResponseV0 +listOffsetsTopicResponseToV0 x = ListOffsetsTopicResponseV0 + { name = x.name + , partitions = fmap listOffsetsPartitionResponseToV0 x.partitions + } +listOffsetsTopicResponseToV1 :: ListOffsetsTopicResponse -> ListOffsetsTopicResponseV1 +listOffsetsTopicResponseToV1 x = ListOffsetsTopicResponseV1 + { name = x.name + , partitions = fmap listOffsetsPartitionResponseToV1 x.partitions + } + +listOffsetsTopicResponseFromV0 :: ListOffsetsTopicResponseV0 -> ListOffsetsTopicResponse +listOffsetsTopicResponseFromV0 x = ListOffsetsTopicResponse + { name = x.name + , partitions = fmap listOffsetsPartitionResponseFromV0 x.partitions + } +listOffsetsTopicResponseFromV1 :: ListOffsetsTopicResponseV1 -> ListOffsetsTopicResponse +listOffsetsTopicResponseFromV1 x = ListOffsetsTopicResponse + { name = x.name + , partitions = fmap listOffsetsPartitionResponseFromV1 x.partitions + } + +data ListedGroup = ListedGroup + { groupId :: !Text + -- ^ The group ID. + , protocolType :: !Text + -- ^ The group protocol type. + } deriving (Show, Eq, Generic) +instance Serializable ListedGroup + +listedGroupToV0 :: ListedGroup -> ListedGroupV0 +listedGroupToV0 x = ListedGroupV0 + { groupId = x.groupId + , protocolType = x.protocolType + } + +listedGroupFromV0 :: ListedGroupV0 -> ListedGroup +listedGroupFromV0 x = ListedGroup + { groupId = x.groupId + , protocolType = x.protocolType + } + +newtype MetadataRequestTopic = MetadataRequestTopic + { name :: Text + } deriving (Show, Eq, Generic) +instance Serializable MetadataRequestTopic + +metadataRequestTopicToV0 :: MetadataRequestTopic -> MetadataRequestTopicV0 +metadataRequestTopicToV0 x = MetadataRequestTopicV0 + { name = x.name + } +metadataRequestTopicToV1 :: MetadataRequestTopic -> MetadataRequestTopicV1 +metadataRequestTopicToV1 = metadataRequestTopicToV0 +metadataRequestTopicToV2 :: MetadataRequestTopic -> MetadataRequestTopicV2 +metadataRequestTopicToV2 = metadataRequestTopicToV0 +metadataRequestTopicToV3 :: MetadataRequestTopic -> MetadataRequestTopicV3 +metadataRequestTopicToV3 = metadataRequestTopicToV0 +metadataRequestTopicToV4 :: MetadataRequestTopic -> MetadataRequestTopicV4 +metadataRequestTopicToV4 = metadataRequestTopicToV0 + +metadataRequestTopicFromV0 :: MetadataRequestTopicV0 -> MetadataRequestTopic +metadataRequestTopicFromV0 x = MetadataRequestTopic + { name = x.name + } +metadataRequestTopicFromV1 :: MetadataRequestTopicV1 -> MetadataRequestTopic +metadataRequestTopicFromV1 = metadataRequestTopicFromV0 +metadataRequestTopicFromV2 :: MetadataRequestTopicV2 -> MetadataRequestTopic +metadataRequestTopicFromV2 = metadataRequestTopicFromV0 +metadataRequestTopicFromV3 :: MetadataRequestTopicV3 -> MetadataRequestTopic +metadataRequestTopicFromV3 = metadataRequestTopicFromV0 +metadataRequestTopicFromV4 :: MetadataRequestTopicV4 -> MetadataRequestTopic +metadataRequestTopicFromV4 = metadataRequestTopicFromV0 + +data MetadataResponseBroker = MetadataResponseBroker + { nodeId :: {-# UNPACK #-} !Int32 + -- ^ The broker ID. + , host :: !Text + -- ^ The broker hostname. + , port :: {-# UNPACK #-} !Int32 + -- ^ The broker port. + , rack :: !NullableString + -- ^ The rack of the broker, or null if it has not been assigned to a rack. + } deriving (Show, Eq, Generic) +instance Serializable MetadataResponseBroker + +metadataResponseBrokerToV0 :: MetadataResponseBroker -> MetadataResponseBrokerV0 +metadataResponseBrokerToV0 x = MetadataResponseBrokerV0 + { nodeId = x.nodeId + , host = x.host + , port = x.port + } +metadataResponseBrokerToV1 :: MetadataResponseBroker -> MetadataResponseBrokerV1 +metadataResponseBrokerToV1 x = MetadataResponseBrokerV1 + { nodeId = x.nodeId + , host = x.host + , port = x.port + , rack = x.rack + } +metadataResponseBrokerToV2 :: MetadataResponseBroker -> MetadataResponseBrokerV2 +metadataResponseBrokerToV2 = metadataResponseBrokerToV1 +metadataResponseBrokerToV3 :: MetadataResponseBroker -> MetadataResponseBrokerV3 +metadataResponseBrokerToV3 = metadataResponseBrokerToV1 +metadataResponseBrokerToV4 :: MetadataResponseBroker -> MetadataResponseBrokerV4 +metadataResponseBrokerToV4 = metadataResponseBrokerToV1 + +metadataResponseBrokerFromV0 :: MetadataResponseBrokerV0 -> MetadataResponseBroker +metadataResponseBrokerFromV0 x = MetadataResponseBroker + { nodeId = x.nodeId + , host = x.host + , port = x.port + , rack = Nothing + } +metadataResponseBrokerFromV1 :: MetadataResponseBrokerV1 -> MetadataResponseBroker +metadataResponseBrokerFromV1 x = MetadataResponseBroker + { nodeId = x.nodeId + , host = x.host + , port = x.port + , rack = x.rack + } +metadataResponseBrokerFromV2 :: MetadataResponseBrokerV2 -> MetadataResponseBroker +metadataResponseBrokerFromV2 = metadataResponseBrokerFromV1 +metadataResponseBrokerFromV3 :: MetadataResponseBrokerV3 -> MetadataResponseBroker +metadataResponseBrokerFromV3 = metadataResponseBrokerFromV1 +metadataResponseBrokerFromV4 :: MetadataResponseBrokerV4 -> MetadataResponseBroker +metadataResponseBrokerFromV4 = metadataResponseBrokerFromV1 + +data MetadataResponsePartition = MetadataResponsePartition + { errorCode :: {-# UNPACK #-} !ErrorCode + -- ^ The partition error, or 0 if there was no error. + , partitionIndex :: {-# UNPACK #-} !Int32 + -- ^ The partition index. + , leaderId :: {-# UNPACK #-} !Int32 + -- ^ The ID of the leader broker. + , replicaNodes :: !(KaArray Int32) + -- ^ The set of all nodes that host this partition. + , isrNodes :: !(KaArray Int32) + -- ^ The set of nodes that are in sync with the leader for this partition. + } deriving (Show, Eq, Generic) +instance Serializable MetadataResponsePartition + +metadataResponsePartitionToV0 :: MetadataResponsePartition -> MetadataResponsePartitionV0 +metadataResponsePartitionToV0 x = MetadataResponsePartitionV0 + { errorCode = x.errorCode + , partitionIndex = x.partitionIndex + , leaderId = x.leaderId + , replicaNodes = x.replicaNodes + , isrNodes = x.isrNodes + } +metadataResponsePartitionToV1 :: MetadataResponsePartition -> MetadataResponsePartitionV1 +metadataResponsePartitionToV1 = metadataResponsePartitionToV0 +metadataResponsePartitionToV2 :: MetadataResponsePartition -> MetadataResponsePartitionV2 +metadataResponsePartitionToV2 = metadataResponsePartitionToV0 +metadataResponsePartitionToV3 :: MetadataResponsePartition -> MetadataResponsePartitionV3 +metadataResponsePartitionToV3 = metadataResponsePartitionToV0 +metadataResponsePartitionToV4 :: MetadataResponsePartition -> MetadataResponsePartitionV4 +metadataResponsePartitionToV4 = metadataResponsePartitionToV0 + +metadataResponsePartitionFromV0 :: MetadataResponsePartitionV0 -> MetadataResponsePartition +metadataResponsePartitionFromV0 x = MetadataResponsePartition + { errorCode = x.errorCode + , partitionIndex = x.partitionIndex + , leaderId = x.leaderId + , replicaNodes = x.replicaNodes + , isrNodes = x.isrNodes + } +metadataResponsePartitionFromV1 :: MetadataResponsePartitionV1 -> MetadataResponsePartition +metadataResponsePartitionFromV1 = metadataResponsePartitionFromV0 +metadataResponsePartitionFromV2 :: MetadataResponsePartitionV2 -> MetadataResponsePartition +metadataResponsePartitionFromV2 = metadataResponsePartitionFromV0 +metadataResponsePartitionFromV3 :: MetadataResponsePartitionV3 -> MetadataResponsePartition +metadataResponsePartitionFromV3 = metadataResponsePartitionFromV0 +metadataResponsePartitionFromV4 :: MetadataResponsePartitionV4 -> MetadataResponsePartition +metadataResponsePartitionFromV4 = metadataResponsePartitionFromV0 + +data MetadataResponseTopic = MetadataResponseTopic + { errorCode :: {-# UNPACK #-} !ErrorCode + -- ^ The topic error, or 0 if there was no error. + , name :: !Text + -- ^ The topic name. + , partitions :: !(KaArray MetadataResponsePartition) + -- ^ Each partition in the topic. + , isInternal :: Bool + -- ^ True if the topic is internal. + } deriving (Show, Eq, Generic) +instance Serializable MetadataResponseTopic + +metadataResponseTopicToV0 :: MetadataResponseTopic -> MetadataResponseTopicV0 +metadataResponseTopicToV0 x = MetadataResponseTopicV0 + { errorCode = x.errorCode + , name = x.name + , partitions = fmap metadataResponsePartitionToV0 x.partitions + } +metadataResponseTopicToV1 :: MetadataResponseTopic -> MetadataResponseTopicV1 +metadataResponseTopicToV1 x = MetadataResponseTopicV1 + { errorCode = x.errorCode + , name = x.name + , isInternal = x.isInternal + , partitions = fmap metadataResponsePartitionToV1 x.partitions + } +metadataResponseTopicToV2 :: MetadataResponseTopic -> MetadataResponseTopicV2 +metadataResponseTopicToV2 = metadataResponseTopicToV1 +metadataResponseTopicToV3 :: MetadataResponseTopic -> MetadataResponseTopicV3 +metadataResponseTopicToV3 = metadataResponseTopicToV1 +metadataResponseTopicToV4 :: MetadataResponseTopic -> MetadataResponseTopicV4 +metadataResponseTopicToV4 = metadataResponseTopicToV1 + +metadataResponseTopicFromV0 :: MetadataResponseTopicV0 -> MetadataResponseTopic +metadataResponseTopicFromV0 x = MetadataResponseTopic + { errorCode = x.errorCode + , name = x.name + , partitions = fmap metadataResponsePartitionFromV0 x.partitions + , isInternal = False + } +metadataResponseTopicFromV1 :: MetadataResponseTopicV1 -> MetadataResponseTopic +metadataResponseTopicFromV1 x = MetadataResponseTopic + { errorCode = x.errorCode + , name = x.name + , partitions = fmap metadataResponsePartitionFromV1 x.partitions + , isInternal = x.isInternal + } +metadataResponseTopicFromV2 :: MetadataResponseTopicV2 -> MetadataResponseTopic +metadataResponseTopicFromV2 = metadataResponseTopicFromV1 +metadataResponseTopicFromV3 :: MetadataResponseTopicV3 -> MetadataResponseTopic +metadataResponseTopicFromV3 = metadataResponseTopicFromV1 +metadataResponseTopicFromV4 :: MetadataResponseTopicV4 -> MetadataResponseTopic +metadataResponseTopicFromV4 = metadataResponseTopicFromV1 + +data OffsetCommitRequestPartition = OffsetCommitRequestPartition + { partitionIndex :: {-# UNPACK #-} !Int32 + -- ^ The partition index. + , committedOffset :: {-# UNPACK #-} !Int64 + -- ^ The message offset to be committed. + , committedMetadata :: !NullableString + -- ^ Any associated metadata the client wants to keep. + , commitTimestamp :: {-# UNPACK #-} !Int64 + -- ^ The timestamp of the commit. + } deriving (Show, Eq, Generic) +instance Serializable OffsetCommitRequestPartition + +offsetCommitRequestPartitionToV0 :: OffsetCommitRequestPartition -> OffsetCommitRequestPartitionV0 +offsetCommitRequestPartitionToV0 x = OffsetCommitRequestPartitionV0 + { partitionIndex = x.partitionIndex + , committedOffset = x.committedOffset + , committedMetadata = x.committedMetadata + } +offsetCommitRequestPartitionToV1 :: OffsetCommitRequestPartition -> OffsetCommitRequestPartitionV1 +offsetCommitRequestPartitionToV1 x = OffsetCommitRequestPartitionV1 + { partitionIndex = x.partitionIndex + , committedOffset = x.committedOffset + , commitTimestamp = x.commitTimestamp + , committedMetadata = x.committedMetadata + } +offsetCommitRequestPartitionToV2 :: OffsetCommitRequestPartition -> OffsetCommitRequestPartitionV2 +offsetCommitRequestPartitionToV2 = offsetCommitRequestPartitionToV0 + +offsetCommitRequestPartitionFromV0 :: OffsetCommitRequestPartitionV0 -> OffsetCommitRequestPartition +offsetCommitRequestPartitionFromV0 x = OffsetCommitRequestPartition + { partitionIndex = x.partitionIndex + , committedOffset = x.committedOffset + , committedMetadata = x.committedMetadata + , commitTimestamp = (-1) + } +offsetCommitRequestPartitionFromV1 :: OffsetCommitRequestPartitionV1 -> OffsetCommitRequestPartition +offsetCommitRequestPartitionFromV1 x = OffsetCommitRequestPartition + { partitionIndex = x.partitionIndex + , committedOffset = x.committedOffset + , committedMetadata = x.committedMetadata + , commitTimestamp = x.commitTimestamp + } +offsetCommitRequestPartitionFromV2 :: OffsetCommitRequestPartitionV2 -> OffsetCommitRequestPartition +offsetCommitRequestPartitionFromV2 = offsetCommitRequestPartitionFromV0 + +data OffsetCommitRequestTopic = OffsetCommitRequestTopic + { name :: !Text + -- ^ The topic name. + , partitions :: !(KaArray OffsetCommitRequestPartition) + -- ^ Each partition to commit offsets for. + } deriving (Show, Eq, Generic) +instance Serializable OffsetCommitRequestTopic + +offsetCommitRequestTopicToV0 :: OffsetCommitRequestTopic -> OffsetCommitRequestTopicV0 +offsetCommitRequestTopicToV0 x = OffsetCommitRequestTopicV0 + { name = x.name + , partitions = fmap offsetCommitRequestPartitionToV0 x.partitions + } +offsetCommitRequestTopicToV1 :: OffsetCommitRequestTopic -> OffsetCommitRequestTopicV1 +offsetCommitRequestTopicToV1 x = OffsetCommitRequestTopicV1 + { name = x.name + , partitions = fmap offsetCommitRequestPartitionToV1 x.partitions + } +offsetCommitRequestTopicToV2 :: OffsetCommitRequestTopic -> OffsetCommitRequestTopicV2 +offsetCommitRequestTopicToV2 = offsetCommitRequestTopicToV0 + +offsetCommitRequestTopicFromV0 :: OffsetCommitRequestTopicV0 -> OffsetCommitRequestTopic +offsetCommitRequestTopicFromV0 x = OffsetCommitRequestTopic + { name = x.name + , partitions = fmap offsetCommitRequestPartitionFromV0 x.partitions + } +offsetCommitRequestTopicFromV1 :: OffsetCommitRequestTopicV1 -> OffsetCommitRequestTopic +offsetCommitRequestTopicFromV1 x = OffsetCommitRequestTopic + { name = x.name + , partitions = fmap offsetCommitRequestPartitionFromV1 x.partitions + } +offsetCommitRequestTopicFromV2 :: OffsetCommitRequestTopicV2 -> OffsetCommitRequestTopic +offsetCommitRequestTopicFromV2 = offsetCommitRequestTopicFromV0 + +data OffsetCommitResponsePartition = OffsetCommitResponsePartition + { partitionIndex :: {-# UNPACK #-} !Int32 + -- ^ The partition index. + , errorCode :: {-# UNPACK #-} !ErrorCode + -- ^ The error code, or 0 if there was no error. + } deriving (Show, Eq, Generic) +instance Serializable OffsetCommitResponsePartition + +offsetCommitResponsePartitionToV0 :: OffsetCommitResponsePartition -> OffsetCommitResponsePartitionV0 +offsetCommitResponsePartitionToV0 x = OffsetCommitResponsePartitionV0 + { partitionIndex = x.partitionIndex + , errorCode = x.errorCode + } +offsetCommitResponsePartitionToV1 :: OffsetCommitResponsePartition -> OffsetCommitResponsePartitionV1 +offsetCommitResponsePartitionToV1 = offsetCommitResponsePartitionToV0 +offsetCommitResponsePartitionToV2 :: OffsetCommitResponsePartition -> OffsetCommitResponsePartitionV2 +offsetCommitResponsePartitionToV2 = offsetCommitResponsePartitionToV0 + +offsetCommitResponsePartitionFromV0 :: OffsetCommitResponsePartitionV0 -> OffsetCommitResponsePartition +offsetCommitResponsePartitionFromV0 x = OffsetCommitResponsePartition + { partitionIndex = x.partitionIndex + , errorCode = x.errorCode + } +offsetCommitResponsePartitionFromV1 :: OffsetCommitResponsePartitionV1 -> OffsetCommitResponsePartition +offsetCommitResponsePartitionFromV1 = offsetCommitResponsePartitionFromV0 +offsetCommitResponsePartitionFromV2 :: OffsetCommitResponsePartitionV2 -> OffsetCommitResponsePartition +offsetCommitResponsePartitionFromV2 = offsetCommitResponsePartitionFromV0 + +data OffsetCommitResponseTopic = OffsetCommitResponseTopic + { name :: !Text + -- ^ The topic name. + , partitions :: !(KaArray OffsetCommitResponsePartition) + -- ^ The responses for each partition in the topic. + } deriving (Show, Eq, Generic) +instance Serializable OffsetCommitResponseTopic + +offsetCommitResponseTopicToV0 :: OffsetCommitResponseTopic -> OffsetCommitResponseTopicV0 +offsetCommitResponseTopicToV0 x = OffsetCommitResponseTopicV0 + { name = x.name + , partitions = fmap offsetCommitResponsePartitionToV0 x.partitions + } +offsetCommitResponseTopicToV1 :: OffsetCommitResponseTopic -> OffsetCommitResponseTopicV1 +offsetCommitResponseTopicToV1 = offsetCommitResponseTopicToV0 +offsetCommitResponseTopicToV2 :: OffsetCommitResponseTopic -> OffsetCommitResponseTopicV2 +offsetCommitResponseTopicToV2 = offsetCommitResponseTopicToV0 + +offsetCommitResponseTopicFromV0 :: OffsetCommitResponseTopicV0 -> OffsetCommitResponseTopic +offsetCommitResponseTopicFromV0 x = OffsetCommitResponseTopic + { name = x.name + , partitions = fmap offsetCommitResponsePartitionFromV0 x.partitions + } +offsetCommitResponseTopicFromV1 :: OffsetCommitResponseTopicV1 -> OffsetCommitResponseTopic +offsetCommitResponseTopicFromV1 = offsetCommitResponseTopicFromV0 +offsetCommitResponseTopicFromV2 :: OffsetCommitResponseTopicV2 -> OffsetCommitResponseTopic +offsetCommitResponseTopicFromV2 = offsetCommitResponseTopicFromV0 + +data OffsetFetchRequestTopic = OffsetFetchRequestTopic + { name :: !Text + -- ^ The topic name. + , partitionIndexes :: !(KaArray Int32) + -- ^ The partition indexes we would like to fetch offsets for. + } deriving (Show, Eq, Generic) +instance Serializable OffsetFetchRequestTopic + +offsetFetchRequestTopicToV0 :: OffsetFetchRequestTopic -> OffsetFetchRequestTopicV0 +offsetFetchRequestTopicToV0 x = OffsetFetchRequestTopicV0 + { name = x.name + , partitionIndexes = x.partitionIndexes + } +offsetFetchRequestTopicToV1 :: OffsetFetchRequestTopic -> OffsetFetchRequestTopicV1 +offsetFetchRequestTopicToV1 = offsetFetchRequestTopicToV0 +offsetFetchRequestTopicToV2 :: OffsetFetchRequestTopic -> OffsetFetchRequestTopicV2 +offsetFetchRequestTopicToV2 = offsetFetchRequestTopicToV0 + +offsetFetchRequestTopicFromV0 :: OffsetFetchRequestTopicV0 -> OffsetFetchRequestTopic +offsetFetchRequestTopicFromV0 x = OffsetFetchRequestTopic + { name = x.name + , partitionIndexes = x.partitionIndexes + } +offsetFetchRequestTopicFromV1 :: OffsetFetchRequestTopicV1 -> OffsetFetchRequestTopic +offsetFetchRequestTopicFromV1 = offsetFetchRequestTopicFromV0 +offsetFetchRequestTopicFromV2 :: OffsetFetchRequestTopicV2 -> OffsetFetchRequestTopic +offsetFetchRequestTopicFromV2 = offsetFetchRequestTopicFromV0 + +data OffsetFetchResponsePartition = OffsetFetchResponsePartition + { partitionIndex :: {-# UNPACK #-} !Int32 + -- ^ The partition index. + , committedOffset :: {-# UNPACK #-} !Int64 + -- ^ The committed message offset. + , metadata :: !NullableString + -- ^ The partition metadata. + , errorCode :: {-# UNPACK #-} !ErrorCode + -- ^ The error code, or 0 if there was no error. + } deriving (Show, Eq, Generic) +instance Serializable OffsetFetchResponsePartition + +offsetFetchResponsePartitionToV0 :: OffsetFetchResponsePartition -> OffsetFetchResponsePartitionV0 +offsetFetchResponsePartitionToV0 x = OffsetFetchResponsePartitionV0 + { partitionIndex = x.partitionIndex + , committedOffset = x.committedOffset + , metadata = x.metadata + , errorCode = x.errorCode + } +offsetFetchResponsePartitionToV1 :: OffsetFetchResponsePartition -> OffsetFetchResponsePartitionV1 +offsetFetchResponsePartitionToV1 = offsetFetchResponsePartitionToV0 +offsetFetchResponsePartitionToV2 :: OffsetFetchResponsePartition -> OffsetFetchResponsePartitionV2 +offsetFetchResponsePartitionToV2 = offsetFetchResponsePartitionToV0 + +offsetFetchResponsePartitionFromV0 :: OffsetFetchResponsePartitionV0 -> OffsetFetchResponsePartition +offsetFetchResponsePartitionFromV0 x = OffsetFetchResponsePartition + { partitionIndex = x.partitionIndex + , committedOffset = x.committedOffset + , metadata = x.metadata + , errorCode = x.errorCode + } +offsetFetchResponsePartitionFromV1 :: OffsetFetchResponsePartitionV1 -> OffsetFetchResponsePartition +offsetFetchResponsePartitionFromV1 = offsetFetchResponsePartitionFromV0 +offsetFetchResponsePartitionFromV2 :: OffsetFetchResponsePartitionV2 -> OffsetFetchResponsePartition +offsetFetchResponsePartitionFromV2 = offsetFetchResponsePartitionFromV0 + +data OffsetFetchResponseTopic = OffsetFetchResponseTopic + { name :: !Text + -- ^ The topic name. + , partitions :: !(KaArray OffsetFetchResponsePartition) + -- ^ The responses per partition + } deriving (Show, Eq, Generic) +instance Serializable OffsetFetchResponseTopic + +offsetFetchResponseTopicToV0 :: OffsetFetchResponseTopic -> OffsetFetchResponseTopicV0 +offsetFetchResponseTopicToV0 x = OffsetFetchResponseTopicV0 + { name = x.name + , partitions = fmap offsetFetchResponsePartitionToV0 x.partitions + } +offsetFetchResponseTopicToV1 :: OffsetFetchResponseTopic -> OffsetFetchResponseTopicV1 +offsetFetchResponseTopicToV1 = offsetFetchResponseTopicToV0 +offsetFetchResponseTopicToV2 :: OffsetFetchResponseTopic -> OffsetFetchResponseTopicV2 +offsetFetchResponseTopicToV2 = offsetFetchResponseTopicToV0 + +offsetFetchResponseTopicFromV0 :: OffsetFetchResponseTopicV0 -> OffsetFetchResponseTopic +offsetFetchResponseTopicFromV0 x = OffsetFetchResponseTopic + { name = x.name + , partitions = fmap offsetFetchResponsePartitionFromV0 x.partitions + } +offsetFetchResponseTopicFromV1 :: OffsetFetchResponseTopicV1 -> OffsetFetchResponseTopic +offsetFetchResponseTopicFromV1 = offsetFetchResponseTopicFromV0 +offsetFetchResponseTopicFromV2 :: OffsetFetchResponseTopicV2 -> OffsetFetchResponseTopic +offsetFetchResponseTopicFromV2 = offsetFetchResponseTopicFromV0 + +data PartitionData = PartitionData + { partitionIndex :: {-# UNPACK #-} !Int32 + -- ^ The partition index. + , errorCode :: {-# UNPACK #-} !ErrorCode + -- ^ The error code, or 0 if there was no fetch error. + , highWatermark :: {-# UNPACK #-} !Int64 + -- ^ The current high water mark. + , recordBytes :: !NullableBytes + -- ^ The record data. + } deriving (Show, Eq, Generic) +instance Serializable PartitionData + +partitionDataToV2 :: PartitionData -> PartitionDataV2 +partitionDataToV2 x = PartitionDataV2 + { partitionIndex = x.partitionIndex + , errorCode = x.errorCode + , highWatermark = x.highWatermark + , recordBytes = x.recordBytes + } + +partitionDataFromV2 :: PartitionDataV2 -> PartitionData +partitionDataFromV2 x = PartitionData + { partitionIndex = x.partitionIndex + , errorCode = x.errorCode + , highWatermark = x.highWatermark + , recordBytes = x.recordBytes + } + +data PartitionProduceData = PartitionProduceData + { index :: {-# UNPACK #-} !Int32 + -- ^ The partition index. + , recordBytes :: !NullableBytes + -- ^ The record data to be produced. + } deriving (Show, Eq, Generic) +instance Serializable PartitionProduceData + +partitionProduceDataToV2 :: PartitionProduceData -> PartitionProduceDataV2 +partitionProduceDataToV2 x = PartitionProduceDataV2 + { index = x.index + , recordBytes = x.recordBytes + } + +partitionProduceDataFromV2 :: PartitionProduceDataV2 -> PartitionProduceData +partitionProduceDataFromV2 x = PartitionProduceData + { index = x.index + , recordBytes = x.recordBytes + } + +data PartitionProduceResponse = PartitionProduceResponse + { index :: {-# UNPACK #-} !Int32 + -- ^ The partition index. + , errorCode :: {-# UNPACK #-} !ErrorCode + -- ^ The error code, or 0 if there was no error. + , baseOffset :: {-# UNPACK #-} !Int64 + -- ^ The base offset. + , logAppendTimeMs :: {-# UNPACK #-} !Int64 + -- ^ The timestamp returned by broker after appending the messages. If + -- CreateTime is used for the topic, the timestamp will be -1. If + -- LogAppendTime is used for the topic, the timestamp will be the broker + -- local time when the messages are appended. + } deriving (Show, Eq, Generic) +instance Serializable PartitionProduceResponse + +partitionProduceResponseToV2 :: PartitionProduceResponse -> PartitionProduceResponseV2 +partitionProduceResponseToV2 x = PartitionProduceResponseV2 + { index = x.index + , errorCode = x.errorCode + , baseOffset = x.baseOffset + , logAppendTimeMs = x.logAppendTimeMs + } + +partitionProduceResponseFromV2 :: PartitionProduceResponseV2 -> PartitionProduceResponse +partitionProduceResponseFromV2 x = PartitionProduceResponse + { index = x.index + , errorCode = x.errorCode + , baseOffset = x.baseOffset + , logAppendTimeMs = x.logAppendTimeMs + } + +data SupportedFeatureKey = SupportedFeatureKey + { name :: !CompactString + -- ^ The name of the feature. + , minVersion :: {-# UNPACK #-} !Int16 + -- ^ The minimum supported version for the feature. + , maxVersion :: {-# UNPACK #-} !Int16 + -- ^ The maximum supported version for the feature. + , taggedFields :: !TaggedFields + } deriving (Show, Eq, Generic) +instance Serializable SupportedFeatureKey + +supportedFeatureKeyToV3 :: SupportedFeatureKey -> SupportedFeatureKeyV3 +supportedFeatureKeyToV3 x = SupportedFeatureKeyV3 + { name = x.name + , minVersion = x.minVersion + , maxVersion = x.maxVersion + , taggedFields = x.taggedFields + } + +supportedFeatureKeyFromV3 :: SupportedFeatureKeyV3 -> SupportedFeatureKey +supportedFeatureKeyFromV3 x = SupportedFeatureKey + { name = x.name + , minVersion = x.minVersion + , maxVersion = x.maxVersion + , taggedFields = x.taggedFields + } + +data SyncGroupRequestAssignment = SyncGroupRequestAssignment + { memberId :: !Text + -- ^ The ID of the member to assign. + , assignment :: !ByteString + -- ^ The member assignment. + } deriving (Show, Eq, Generic) +instance Serializable SyncGroupRequestAssignment + +syncGroupRequestAssignmentToV0 :: SyncGroupRequestAssignment -> SyncGroupRequestAssignmentV0 +syncGroupRequestAssignmentToV0 x = SyncGroupRequestAssignmentV0 + { memberId = x.memberId + , assignment = x.assignment + } + +syncGroupRequestAssignmentFromV0 :: SyncGroupRequestAssignmentV0 -> SyncGroupRequestAssignment +syncGroupRequestAssignmentFromV0 x = SyncGroupRequestAssignment + { memberId = x.memberId + , assignment = x.assignment + } + +data TopicProduceData = TopicProduceData + { name :: !Text + -- ^ The topic name. + , partitionData :: !(KaArray PartitionProduceData) + -- ^ Each partition to produce to. + } deriving (Show, Eq, Generic) +instance Serializable TopicProduceData + +topicProduceDataToV2 :: TopicProduceData -> TopicProduceDataV2 +topicProduceDataToV2 x = TopicProduceDataV2 + { name = x.name + , partitionData = fmap partitionProduceDataToV2 x.partitionData + } + +topicProduceDataFromV2 :: TopicProduceDataV2 -> TopicProduceData +topicProduceDataFromV2 x = TopicProduceData + { name = x.name + , partitionData = fmap partitionProduceDataFromV2 x.partitionData + } + +data TopicProduceResponse = TopicProduceResponse + { name :: !Text + -- ^ The topic name + , partitionResponses :: !(KaArray PartitionProduceResponse) + -- ^ Each partition that we produced to within the topic. + } deriving (Show, Eq, Generic) +instance Serializable TopicProduceResponse + +topicProduceResponseToV2 :: TopicProduceResponse -> TopicProduceResponseV2 +topicProduceResponseToV2 x = TopicProduceResponseV2 + { name = x.name + , partitionResponses = fmap partitionProduceResponseToV2 x.partitionResponses + } + +topicProduceResponseFromV2 :: TopicProduceResponseV2 -> TopicProduceResponse +topicProduceResponseFromV2 x = TopicProduceResponse + { name = x.name + , partitionResponses = fmap partitionProduceResponseFromV2 x.partitionResponses + } + +data ApiVersionsRequest = ApiVersionsRequest + { clientSoftwareName :: !CompactString + -- ^ The name of the client. + , clientSoftwareVersion :: !CompactString + -- ^ The version of the client. + , taggedFields :: !TaggedFields + } deriving (Show, Eq, Generic) +instance Serializable ApiVersionsRequest + +apiVersionsRequestToV0 :: ApiVersionsRequest -> ApiVersionsRequestV0 +apiVersionsRequestToV0 _ = ApiVersionsRequestV0 +apiVersionsRequestToV1 :: ApiVersionsRequest -> ApiVersionsRequestV1 +apiVersionsRequestToV1 = apiVersionsRequestToV0 +apiVersionsRequestToV2 :: ApiVersionsRequest -> ApiVersionsRequestV2 +apiVersionsRequestToV2 = apiVersionsRequestToV0 +apiVersionsRequestToV3 :: ApiVersionsRequest -> ApiVersionsRequestV3 +apiVersionsRequestToV3 x = ApiVersionsRequestV3 + { clientSoftwareName = x.clientSoftwareName + , clientSoftwareVersion = x.clientSoftwareVersion + , taggedFields = x.taggedFields + } + +apiVersionsRequestFromV0 :: ApiVersionsRequestV0 -> ApiVersionsRequest +apiVersionsRequestFromV0 _ = ApiVersionsRequest + { clientSoftwareName = "" + , clientSoftwareVersion = "" + , taggedFields = EmptyTaggedFields + } +apiVersionsRequestFromV1 :: ApiVersionsRequestV1 -> ApiVersionsRequest +apiVersionsRequestFromV1 = apiVersionsRequestFromV0 +apiVersionsRequestFromV2 :: ApiVersionsRequestV2 -> ApiVersionsRequest +apiVersionsRequestFromV2 = apiVersionsRequestFromV0 +apiVersionsRequestFromV3 :: ApiVersionsRequestV3 -> ApiVersionsRequest +apiVersionsRequestFromV3 x = ApiVersionsRequest + { clientSoftwareName = x.clientSoftwareName + , clientSoftwareVersion = x.clientSoftwareVersion + , taggedFields = x.taggedFields + } + +data ApiVersionsResponse = ApiVersionsResponse + { errorCode :: {-# UNPACK #-} !ErrorCode + -- ^ The top-level error code. + , apiKeys :: !(KaArray ApiVersion) + -- ^ The APIs supported by the broker. + , throttleTimeMs :: {-# UNPACK #-} !Int32 + -- ^ The duration in milliseconds for which the request was throttled due + -- to a quota violation, or zero if the request did not violate any quota. + , taggedFields :: !TaggedFields + } deriving (Show, Eq, Generic) +instance Serializable ApiVersionsResponse + +apiVersionsResponseToV0 :: ApiVersionsResponse -> ApiVersionsResponseV0 +apiVersionsResponseToV0 x = ApiVersionsResponseV0 + { errorCode = x.errorCode + , apiKeys = fmap apiVersionToV0 x.apiKeys + } +apiVersionsResponseToV1 :: ApiVersionsResponse -> ApiVersionsResponseV1 +apiVersionsResponseToV1 x = ApiVersionsResponseV1 + { errorCode = x.errorCode + , apiKeys = fmap apiVersionToV1 x.apiKeys + , throttleTimeMs = x.throttleTimeMs + } +apiVersionsResponseToV2 :: ApiVersionsResponse -> ApiVersionsResponseV2 +apiVersionsResponseToV2 = apiVersionsResponseToV1 +apiVersionsResponseToV3 :: ApiVersionsResponse -> ApiVersionsResponseV3 +apiVersionsResponseToV3 x = ApiVersionsResponseV3 + { errorCode = x.errorCode + , apiKeys = fmap apiVersionToV3 (kaArrayToCompact x.apiKeys) + , throttleTimeMs = x.throttleTimeMs + , taggedFields = x.taggedFields + } + +apiVersionsResponseFromV0 :: ApiVersionsResponseV0 -> ApiVersionsResponse +apiVersionsResponseFromV0 x = ApiVersionsResponse + { errorCode = x.errorCode + , apiKeys = fmap apiVersionFromV0 x.apiKeys + , throttleTimeMs = 0 + , taggedFields = EmptyTaggedFields + } +apiVersionsResponseFromV1 :: ApiVersionsResponseV1 -> ApiVersionsResponse +apiVersionsResponseFromV1 x = ApiVersionsResponse + { errorCode = x.errorCode + , apiKeys = fmap apiVersionFromV1 x.apiKeys + , throttleTimeMs = x.throttleTimeMs + , taggedFields = EmptyTaggedFields + } +apiVersionsResponseFromV2 :: ApiVersionsResponseV2 -> ApiVersionsResponse +apiVersionsResponseFromV2 = apiVersionsResponseFromV1 +apiVersionsResponseFromV3 :: ApiVersionsResponseV3 -> ApiVersionsResponse +apiVersionsResponseFromV3 x = ApiVersionsResponse + { errorCode = x.errorCode + , apiKeys = fmap apiVersionFromV3 (kaArrayFromCompact x.apiKeys) + , throttleTimeMs = x.throttleTimeMs + , taggedFields = x.taggedFields + } + +data CreateTopicsRequest = CreateTopicsRequest + { topics :: !(KaArray CreatableTopic) + -- ^ The topics to create. + , timeoutMs :: {-# UNPACK #-} !Int32 + -- ^ How long to wait in milliseconds before timing out the request. + } deriving (Show, Eq, Generic) +instance Serializable CreateTopicsRequest + +createTopicsRequestToV0 :: CreateTopicsRequest -> CreateTopicsRequestV0 +createTopicsRequestToV0 x = CreateTopicsRequestV0 + { topics = fmap creatableTopicToV0 x.topics + , timeoutMs = x.timeoutMs + } + +createTopicsRequestFromV0 :: CreateTopicsRequestV0 -> CreateTopicsRequest +createTopicsRequestFromV0 x = CreateTopicsRequest + { topics = fmap creatableTopicFromV0 x.topics + , timeoutMs = x.timeoutMs + } + +newtype CreateTopicsResponse = CreateTopicsResponse + { topics :: (KaArray CreatableTopicResult) + } deriving (Show, Eq, Generic) +instance Serializable CreateTopicsResponse + +createTopicsResponseToV0 :: CreateTopicsResponse -> CreateTopicsResponseV0 +createTopicsResponseToV0 x = CreateTopicsResponseV0 + { topics = fmap creatableTopicResultToV0 x.topics + } + +createTopicsResponseFromV0 :: CreateTopicsResponseV0 -> CreateTopicsResponse +createTopicsResponseFromV0 x = CreateTopicsResponse + { topics = fmap creatableTopicResultFromV0 x.topics + } + +data DeleteTopicsRequest = DeleteTopicsRequest + { topicNames :: !(KaArray Text) + -- ^ The names of the topics to delete + , timeoutMs :: {-# UNPACK #-} !Int32 + -- ^ The length of time in milliseconds to wait for the deletions to + -- complete. + } deriving (Show, Eq, Generic) +instance Serializable DeleteTopicsRequest + +deleteTopicsRequestToV0 :: DeleteTopicsRequest -> DeleteTopicsRequestV0 +deleteTopicsRequestToV0 x = DeleteTopicsRequestV0 + { topicNames = x.topicNames + , timeoutMs = x.timeoutMs + } + +deleteTopicsRequestFromV0 :: DeleteTopicsRequestV0 -> DeleteTopicsRequest +deleteTopicsRequestFromV0 x = DeleteTopicsRequest + { topicNames = x.topicNames + , timeoutMs = x.timeoutMs + } + +newtype DeleteTopicsResponse = DeleteTopicsResponse + { responses :: (KaArray DeletableTopicResult) + } deriving (Show, Eq, Generic) +instance Serializable DeleteTopicsResponse + +deleteTopicsResponseToV0 :: DeleteTopicsResponse -> DeleteTopicsResponseV0 +deleteTopicsResponseToV0 x = DeleteTopicsResponseV0 + { responses = fmap deletableTopicResultToV0 x.responses + } + +deleteTopicsResponseFromV0 :: DeleteTopicsResponseV0 -> DeleteTopicsResponse +deleteTopicsResponseFromV0 x = DeleteTopicsResponse + { responses = fmap deletableTopicResultFromV0 x.responses + } + +newtype DescribeGroupsRequest = DescribeGroupsRequest + { groups :: (KaArray Text) + } deriving (Show, Eq, Generic) +instance Serializable DescribeGroupsRequest + +describeGroupsRequestToV0 :: DescribeGroupsRequest -> DescribeGroupsRequestV0 +describeGroupsRequestToV0 x = DescribeGroupsRequestV0 + { groups = x.groups + } + +describeGroupsRequestFromV0 :: DescribeGroupsRequestV0 -> DescribeGroupsRequest +describeGroupsRequestFromV0 x = DescribeGroupsRequest + { groups = x.groups + } + +newtype DescribeGroupsResponse = DescribeGroupsResponse + { groups :: (KaArray DescribedGroup) + } deriving (Show, Eq, Generic) +instance Serializable DescribeGroupsResponse + +describeGroupsResponseToV0 :: DescribeGroupsResponse -> DescribeGroupsResponseV0 +describeGroupsResponseToV0 x = DescribeGroupsResponseV0 + { groups = fmap describedGroupToV0 x.groups + } + +describeGroupsResponseFromV0 :: DescribeGroupsResponseV0 -> DescribeGroupsResponse +describeGroupsResponseFromV0 x = DescribeGroupsResponse + { groups = fmap describedGroupFromV0 x.groups + } + +data FetchRequest = FetchRequest + { replicaId :: {-# UNPACK #-} !Int32 + -- ^ The broker ID of the follower, of -1 if this request is from a + -- consumer. + , maxWaitMs :: {-# UNPACK #-} !Int32 + -- ^ The maximum time in milliseconds to wait for the response. + , minBytes :: {-# UNPACK #-} !Int32 + -- ^ The minimum bytes to accumulate in the response. + , topics :: !(KaArray FetchTopic) + -- ^ The topics to fetch. + } deriving (Show, Eq, Generic) +instance Serializable FetchRequest + +fetchRequestToV2 :: FetchRequest -> FetchRequestV2 +fetchRequestToV2 x = FetchRequestV2 + { replicaId = x.replicaId + , maxWaitMs = x.maxWaitMs + , minBytes = x.minBytes + , topics = fmap fetchTopicToV2 x.topics + } + +fetchRequestFromV2 :: FetchRequestV2 -> FetchRequest +fetchRequestFromV2 x = FetchRequest + { replicaId = x.replicaId + , maxWaitMs = x.maxWaitMs + , minBytes = x.minBytes + , topics = fmap fetchTopicFromV2 x.topics + } + +data FetchResponse = FetchResponse + { throttleTimeMs :: {-# UNPACK #-} !Int32 + -- ^ The duration in milliseconds for which the request was throttled due + -- to a quota violation, or zero if the request did not violate any quota. + , responses :: !(KaArray FetchableTopicResponse) + -- ^ The response topics. + } deriving (Show, Eq, Generic) +instance Serializable FetchResponse + +fetchResponseToV2 :: FetchResponse -> FetchResponseV2 +fetchResponseToV2 x = FetchResponseV2 + { throttleTimeMs = x.throttleTimeMs + , responses = fmap fetchableTopicResponseToV2 x.responses + } + +fetchResponseFromV2 :: FetchResponseV2 -> FetchResponse +fetchResponseFromV2 x = FetchResponse + { throttleTimeMs = x.throttleTimeMs + , responses = fmap fetchableTopicResponseFromV2 x.responses + } + +newtype FindCoordinatorRequest = FindCoordinatorRequest + { key :: Text + } deriving (Show, Eq, Generic) +instance Serializable FindCoordinatorRequest + +findCoordinatorRequestToV0 :: FindCoordinatorRequest -> FindCoordinatorRequestV0 +findCoordinatorRequestToV0 x = FindCoordinatorRequestV0 + { key = x.key + } + +findCoordinatorRequestFromV0 :: FindCoordinatorRequestV0 -> FindCoordinatorRequest +findCoordinatorRequestFromV0 x = FindCoordinatorRequest + { key = x.key + } + +data FindCoordinatorResponse = FindCoordinatorResponse + { errorCode :: {-# UNPACK #-} !ErrorCode + -- ^ The error code, or 0 if there was no error. + , nodeId :: {-# UNPACK #-} !Int32 + -- ^ The node id. + , host :: !Text + -- ^ The host name. + , port :: {-# UNPACK #-} !Int32 + -- ^ The port. + } deriving (Show, Eq, Generic) +instance Serializable FindCoordinatorResponse + +findCoordinatorResponseToV0 :: FindCoordinatorResponse -> FindCoordinatorResponseV0 +findCoordinatorResponseToV0 x = FindCoordinatorResponseV0 + { errorCode = x.errorCode + , nodeId = x.nodeId + , host = x.host + , port = x.port + } + +findCoordinatorResponseFromV0 :: FindCoordinatorResponseV0 -> FindCoordinatorResponse +findCoordinatorResponseFromV0 x = FindCoordinatorResponse + { errorCode = x.errorCode + , nodeId = x.nodeId + , host = x.host + , port = x.port + } + +data HeartbeatRequest = HeartbeatRequest + { groupId :: !Text + -- ^ The group id. + , generationId :: {-# UNPACK #-} !Int32 + -- ^ The generation of the group. + , memberId :: !Text + -- ^ The member ID. + } deriving (Show, Eq, Generic) +instance Serializable HeartbeatRequest + +heartbeatRequestToV0 :: HeartbeatRequest -> HeartbeatRequestV0 +heartbeatRequestToV0 x = HeartbeatRequestV0 + { groupId = x.groupId + , generationId = x.generationId + , memberId = x.memberId + } + +heartbeatRequestFromV0 :: HeartbeatRequestV0 -> HeartbeatRequest +heartbeatRequestFromV0 x = HeartbeatRequest + { groupId = x.groupId + , generationId = x.generationId + , memberId = x.memberId + } + +newtype HeartbeatResponse = HeartbeatResponse + { errorCode :: ErrorCode + } deriving (Show, Eq, Generic) +instance Serializable HeartbeatResponse + +heartbeatResponseToV0 :: HeartbeatResponse -> HeartbeatResponseV0 +heartbeatResponseToV0 x = HeartbeatResponseV0 + { errorCode = x.errorCode + } + +heartbeatResponseFromV0 :: HeartbeatResponseV0 -> HeartbeatResponse +heartbeatResponseFromV0 x = HeartbeatResponse + { errorCode = x.errorCode + } + +data JoinGroupRequest = JoinGroupRequest + { groupId :: !Text + -- ^ The group identifier. + , sessionTimeoutMs :: {-# UNPACK #-} !Int32 + -- ^ The coordinator considers the consumer dead if it receives no + -- heartbeat after this timeout in milliseconds. + , memberId :: !Text + -- ^ The member id assigned by the group coordinator. + , protocolType :: !Text + -- ^ The unique name the for class of protocols implemented by the group we + -- want to join. + , protocols :: !(KaArray JoinGroupRequestProtocol) + -- ^ The list of protocols that the member supports. + } deriving (Show, Eq, Generic) +instance Serializable JoinGroupRequest + +joinGroupRequestToV0 :: JoinGroupRequest -> JoinGroupRequestV0 +joinGroupRequestToV0 x = JoinGroupRequestV0 + { groupId = x.groupId + , sessionTimeoutMs = x.sessionTimeoutMs + , memberId = x.memberId + , protocolType = x.protocolType + , protocols = fmap joinGroupRequestProtocolToV0 x.protocols + } + +joinGroupRequestFromV0 :: JoinGroupRequestV0 -> JoinGroupRequest +joinGroupRequestFromV0 x = JoinGroupRequest + { groupId = x.groupId + , sessionTimeoutMs = x.sessionTimeoutMs + , memberId = x.memberId + , protocolType = x.protocolType + , protocols = fmap joinGroupRequestProtocolFromV0 x.protocols + } + +data JoinGroupResponse = JoinGroupResponse + { errorCode :: {-# UNPACK #-} !ErrorCode + -- ^ The error code, or 0 if there was no error. + , generationId :: {-# UNPACK #-} !Int32 + -- ^ The generation ID of the group. + , protocolName :: !Text + -- ^ The group protocol selected by the coordinator. + , leader :: !Text + -- ^ The leader of the group. + , memberId :: !Text + -- ^ The member ID assigned by the group coordinator. + , members :: !(KaArray JoinGroupResponseMember) + } deriving (Show, Eq, Generic) +instance Serializable JoinGroupResponse + +joinGroupResponseToV0 :: JoinGroupResponse -> JoinGroupResponseV0 +joinGroupResponseToV0 x = JoinGroupResponseV0 + { errorCode = x.errorCode + , generationId = x.generationId + , protocolName = x.protocolName + , leader = x.leader + , memberId = x.memberId + , members = fmap joinGroupResponseMemberToV0 x.members + } + +joinGroupResponseFromV0 :: JoinGroupResponseV0 -> JoinGroupResponse +joinGroupResponseFromV0 x = JoinGroupResponse + { errorCode = x.errorCode + , generationId = x.generationId + , protocolName = x.protocolName + , leader = x.leader + , memberId = x.memberId + , members = fmap joinGroupResponseMemberFromV0 x.members + } + +data LeaveGroupRequest = LeaveGroupRequest + { groupId :: !Text + -- ^ The ID of the group to leave. + , memberId :: !Text + -- ^ The member ID to remove from the group. + } deriving (Show, Eq, Generic) +instance Serializable LeaveGroupRequest + +leaveGroupRequestToV0 :: LeaveGroupRequest -> LeaveGroupRequestV0 +leaveGroupRequestToV0 x = LeaveGroupRequestV0 + { groupId = x.groupId + , memberId = x.memberId + } + +leaveGroupRequestFromV0 :: LeaveGroupRequestV0 -> LeaveGroupRequest +leaveGroupRequestFromV0 x = LeaveGroupRequest + { groupId = x.groupId + , memberId = x.memberId + } + +newtype LeaveGroupResponse = LeaveGroupResponse + { errorCode :: ErrorCode + } deriving (Show, Eq, Generic) +instance Serializable LeaveGroupResponse + +leaveGroupResponseToV0 :: LeaveGroupResponse -> LeaveGroupResponseV0 +leaveGroupResponseToV0 x = LeaveGroupResponseV0 + { errorCode = x.errorCode + } + +leaveGroupResponseFromV0 :: LeaveGroupResponseV0 -> LeaveGroupResponse +leaveGroupResponseFromV0 x = LeaveGroupResponse + { errorCode = x.errorCode + } + +data ListGroupsRequest = ListGroupsRequest + deriving (Show, Eq, Generic) +instance Serializable ListGroupsRequest + +listGroupsRequestToV0 :: ListGroupsRequest -> ListGroupsRequestV0 +listGroupsRequestToV0 _ = ListGroupsRequestV0 + +listGroupsRequestFromV0 :: ListGroupsRequestV0 -> ListGroupsRequest +listGroupsRequestFromV0 _ = ListGroupsRequest + { + } + +data ListGroupsResponse = ListGroupsResponse + { errorCode :: {-# UNPACK #-} !ErrorCode + -- ^ The error code, or 0 if there was no error. + , groups :: !(KaArray ListedGroup) + -- ^ Each group in the response. + } deriving (Show, Eq, Generic) +instance Serializable ListGroupsResponse + +listGroupsResponseToV0 :: ListGroupsResponse -> ListGroupsResponseV0 +listGroupsResponseToV0 x = ListGroupsResponseV0 + { errorCode = x.errorCode + , groups = fmap listedGroupToV0 x.groups + } + +listGroupsResponseFromV0 :: ListGroupsResponseV0 -> ListGroupsResponse +listGroupsResponseFromV0 x = ListGroupsResponse + { errorCode = x.errorCode + , groups = fmap listedGroupFromV0 x.groups + } + +data ListOffsetsRequest = ListOffsetsRequest + { replicaId :: {-# UNPACK #-} !Int32 + -- ^ The broker ID of the requestor, or -1 if this request is being made by + -- a normal consumer. + , topics :: !(KaArray ListOffsetsTopic) + -- ^ Each topic in the request. + } deriving (Show, Eq, Generic) +instance Serializable ListOffsetsRequest + +listOffsetsRequestToV0 :: ListOffsetsRequest -> ListOffsetsRequestV0 +listOffsetsRequestToV0 x = ListOffsetsRequestV0 + { replicaId = x.replicaId + , topics = fmap listOffsetsTopicToV0 x.topics + } +listOffsetsRequestToV1 :: ListOffsetsRequest -> ListOffsetsRequestV1 +listOffsetsRequestToV1 x = ListOffsetsRequestV1 + { replicaId = x.replicaId + , topics = fmap listOffsetsTopicToV1 x.topics + } + +listOffsetsRequestFromV0 :: ListOffsetsRequestV0 -> ListOffsetsRequest +listOffsetsRequestFromV0 x = ListOffsetsRequest + { replicaId = x.replicaId + , topics = fmap listOffsetsTopicFromV0 x.topics + } +listOffsetsRequestFromV1 :: ListOffsetsRequestV1 -> ListOffsetsRequest +listOffsetsRequestFromV1 x = ListOffsetsRequest + { replicaId = x.replicaId + , topics = fmap listOffsetsTopicFromV1 x.topics + } + +newtype ListOffsetsResponse = ListOffsetsResponse + { topics :: (KaArray ListOffsetsTopicResponse) + } deriving (Show, Eq, Generic) +instance Serializable ListOffsetsResponse + +listOffsetsResponseToV0 :: ListOffsetsResponse -> ListOffsetsResponseV0 +listOffsetsResponseToV0 x = ListOffsetsResponseV0 + { topics = fmap listOffsetsTopicResponseToV0 x.topics + } +listOffsetsResponseToV1 :: ListOffsetsResponse -> ListOffsetsResponseV1 +listOffsetsResponseToV1 x = ListOffsetsResponseV1 + { topics = fmap listOffsetsTopicResponseToV1 x.topics + } + +listOffsetsResponseFromV0 :: ListOffsetsResponseV0 -> ListOffsetsResponse +listOffsetsResponseFromV0 x = ListOffsetsResponse + { topics = fmap listOffsetsTopicResponseFromV0 x.topics + } +listOffsetsResponseFromV1 :: ListOffsetsResponseV1 -> ListOffsetsResponse +listOffsetsResponseFromV1 x = ListOffsetsResponse + { topics = fmap listOffsetsTopicResponseFromV1 x.topics + } + +data MetadataRequest = MetadataRequest + { topics :: !(KaArray MetadataRequestTopic) + -- ^ The topics to fetch metadata for. + , allowAutoTopicCreation :: Bool + -- ^ If this is true, the broker may auto-create topics that we requested + -- which do not already exist, if it is configured to do so. + } deriving (Show, Eq, Generic) +instance Serializable MetadataRequest + +metadataRequestToV0 :: MetadataRequest -> MetadataRequestV0 +metadataRequestToV0 x = MetadataRequestV0 + { topics = fmap metadataRequestTopicToV0 x.topics + } +metadataRequestToV1 :: MetadataRequest -> MetadataRequestV1 +metadataRequestToV1 = metadataRequestToV0 +metadataRequestToV2 :: MetadataRequest -> MetadataRequestV2 +metadataRequestToV2 = metadataRequestToV0 +metadataRequestToV3 :: MetadataRequest -> MetadataRequestV3 +metadataRequestToV3 = metadataRequestToV0 +metadataRequestToV4 :: MetadataRequest -> MetadataRequestV4 +metadataRequestToV4 x = MetadataRequestV4 + { topics = fmap metadataRequestTopicToV4 x.topics + , allowAutoTopicCreation = x.allowAutoTopicCreation + } + +metadataRequestFromV0 :: MetadataRequestV0 -> MetadataRequest +metadataRequestFromV0 x = MetadataRequest + { topics = fmap metadataRequestTopicFromV0 x.topics + , allowAutoTopicCreation = True + } +metadataRequestFromV1 :: MetadataRequestV1 -> MetadataRequest +metadataRequestFromV1 = metadataRequestFromV0 +metadataRequestFromV2 :: MetadataRequestV2 -> MetadataRequest +metadataRequestFromV2 = metadataRequestFromV0 +metadataRequestFromV3 :: MetadataRequestV3 -> MetadataRequest +metadataRequestFromV3 = metadataRequestFromV0 +metadataRequestFromV4 :: MetadataRequestV4 -> MetadataRequest +metadataRequestFromV4 x = MetadataRequest + { topics = fmap metadataRequestTopicFromV4 x.topics + , allowAutoTopicCreation = x.allowAutoTopicCreation + } + +data MetadataResponse = MetadataResponse + { brokers :: !(KaArray MetadataResponseBroker) + -- ^ Each broker in the response. + , topics :: !(KaArray MetadataResponseTopic) + -- ^ Each topic in the response. + , controllerId :: {-# UNPACK #-} !Int32 + -- ^ The ID of the controller broker. + , clusterId :: !NullableString + -- ^ The cluster ID that responding broker belongs to. + , throttleTimeMs :: {-# UNPACK #-} !Int32 + -- ^ The duration in milliseconds for which the request was throttled due + -- to a quota violation, or zero if the request did not violate any quota. + } deriving (Show, Eq, Generic) +instance Serializable MetadataResponse + +metadataResponseToV0 :: MetadataResponse -> MetadataResponseV0 +metadataResponseToV0 x = MetadataResponseV0 + { brokers = fmap metadataResponseBrokerToV0 x.brokers + , topics = fmap metadataResponseTopicToV0 x.topics + } +metadataResponseToV1 :: MetadataResponse -> MetadataResponseV1 +metadataResponseToV1 x = MetadataResponseV1 + { brokers = fmap metadataResponseBrokerToV1 x.brokers + , controllerId = x.controllerId + , topics = fmap metadataResponseTopicToV1 x.topics + } +metadataResponseToV2 :: MetadataResponse -> MetadataResponseV2 +metadataResponseToV2 x = MetadataResponseV2 + { brokers = fmap metadataResponseBrokerToV2 x.brokers + , clusterId = x.clusterId + , controllerId = x.controllerId + , topics = fmap metadataResponseTopicToV2 x.topics + } +metadataResponseToV3 :: MetadataResponse -> MetadataResponseV3 +metadataResponseToV3 x = MetadataResponseV3 + { throttleTimeMs = x.throttleTimeMs + , brokers = fmap metadataResponseBrokerToV3 x.brokers + , clusterId = x.clusterId + , controllerId = x.controllerId + , topics = fmap metadataResponseTopicToV3 x.topics + } +metadataResponseToV4 :: MetadataResponse -> MetadataResponseV4 +metadataResponseToV4 = metadataResponseToV3 + +metadataResponseFromV0 :: MetadataResponseV0 -> MetadataResponse +metadataResponseFromV0 x = MetadataResponse + { brokers = fmap metadataResponseBrokerFromV0 x.brokers + , topics = fmap metadataResponseTopicFromV0 x.topics + , controllerId = (-1) + , clusterId = Nothing + , throttleTimeMs = 0 + } +metadataResponseFromV1 :: MetadataResponseV1 -> MetadataResponse +metadataResponseFromV1 x = MetadataResponse + { brokers = fmap metadataResponseBrokerFromV1 x.brokers + , topics = fmap metadataResponseTopicFromV1 x.topics + , controllerId = x.controllerId + , clusterId = Nothing + , throttleTimeMs = 0 + } +metadataResponseFromV2 :: MetadataResponseV2 -> MetadataResponse +metadataResponseFromV2 x = MetadataResponse + { brokers = fmap metadataResponseBrokerFromV2 x.brokers + , topics = fmap metadataResponseTopicFromV2 x.topics + , controllerId = x.controllerId + , clusterId = x.clusterId + , throttleTimeMs = 0 + } +metadataResponseFromV3 :: MetadataResponseV3 -> MetadataResponse +metadataResponseFromV3 x = MetadataResponse + { brokers = fmap metadataResponseBrokerFromV3 x.brokers + , topics = fmap metadataResponseTopicFromV3 x.topics + , controllerId = x.controllerId + , clusterId = x.clusterId + , throttleTimeMs = x.throttleTimeMs + } +metadataResponseFromV4 :: MetadataResponseV4 -> MetadataResponse +metadataResponseFromV4 = metadataResponseFromV3 + +data OffsetCommitRequest = OffsetCommitRequest + { groupId :: !Text + -- ^ The unique group identifier. + , topics :: !(KaArray OffsetCommitRequestTopic) + -- ^ The topics to commit offsets for. + , generationId :: {-# UNPACK #-} !Int32 + -- ^ The generation of the group. + , memberId :: !Text + -- ^ The member ID assigned by the group coordinator. + , retentionTimeMs :: {-# UNPACK #-} !Int64 + -- ^ The time period in ms to retain the offset. + } deriving (Show, Eq, Generic) +instance Serializable OffsetCommitRequest + +offsetCommitRequestToV0 :: OffsetCommitRequest -> OffsetCommitRequestV0 +offsetCommitRequestToV0 x = OffsetCommitRequestV0 + { groupId = x.groupId + , topics = fmap offsetCommitRequestTopicToV0 x.topics + } +offsetCommitRequestToV1 :: OffsetCommitRequest -> OffsetCommitRequestV1 +offsetCommitRequestToV1 x = OffsetCommitRequestV1 + { groupId = x.groupId + , generationId = x.generationId + , memberId = x.memberId + , topics = fmap offsetCommitRequestTopicToV1 x.topics + } +offsetCommitRequestToV2 :: OffsetCommitRequest -> OffsetCommitRequestV2 +offsetCommitRequestToV2 x = OffsetCommitRequestV2 + { groupId = x.groupId + , generationId = x.generationId + , memberId = x.memberId + , retentionTimeMs = x.retentionTimeMs + , topics = fmap offsetCommitRequestTopicToV2 x.topics + } + +offsetCommitRequestFromV0 :: OffsetCommitRequestV0 -> OffsetCommitRequest +offsetCommitRequestFromV0 x = OffsetCommitRequest + { groupId = x.groupId + , topics = fmap offsetCommitRequestTopicFromV0 x.topics + , generationId = (-1) + , memberId = "" + , retentionTimeMs = (-1) + } +offsetCommitRequestFromV1 :: OffsetCommitRequestV1 -> OffsetCommitRequest +offsetCommitRequestFromV1 x = OffsetCommitRequest + { groupId = x.groupId + , topics = fmap offsetCommitRequestTopicFromV1 x.topics + , generationId = x.generationId + , memberId = x.memberId + , retentionTimeMs = (-1) + } +offsetCommitRequestFromV2 :: OffsetCommitRequestV2 -> OffsetCommitRequest +offsetCommitRequestFromV2 x = OffsetCommitRequest + { groupId = x.groupId + , topics = fmap offsetCommitRequestTopicFromV2 x.topics + , generationId = x.generationId + , memberId = x.memberId + , retentionTimeMs = x.retentionTimeMs + } + +newtype OffsetCommitResponse = OffsetCommitResponse + { topics :: (KaArray OffsetCommitResponseTopic) + } deriving (Show, Eq, Generic) +instance Serializable OffsetCommitResponse + +offsetCommitResponseToV0 :: OffsetCommitResponse -> OffsetCommitResponseV0 +offsetCommitResponseToV0 x = OffsetCommitResponseV0 + { topics = fmap offsetCommitResponseTopicToV0 x.topics + } +offsetCommitResponseToV1 :: OffsetCommitResponse -> OffsetCommitResponseV1 +offsetCommitResponseToV1 = offsetCommitResponseToV0 +offsetCommitResponseToV2 :: OffsetCommitResponse -> OffsetCommitResponseV2 +offsetCommitResponseToV2 = offsetCommitResponseToV0 + +offsetCommitResponseFromV0 :: OffsetCommitResponseV0 -> OffsetCommitResponse +offsetCommitResponseFromV0 x = OffsetCommitResponse + { topics = fmap offsetCommitResponseTopicFromV0 x.topics + } +offsetCommitResponseFromV1 :: OffsetCommitResponseV1 -> OffsetCommitResponse +offsetCommitResponseFromV1 = offsetCommitResponseFromV0 +offsetCommitResponseFromV2 :: OffsetCommitResponseV2 -> OffsetCommitResponse +offsetCommitResponseFromV2 = offsetCommitResponseFromV0 + +data OffsetFetchRequest = OffsetFetchRequest + { groupId :: !Text + -- ^ The group to fetch offsets for. + , topics :: !(KaArray OffsetFetchRequestTopic) + -- ^ Each topic we would like to fetch offsets for, or null to fetch + -- offsets for all topics. + } deriving (Show, Eq, Generic) +instance Serializable OffsetFetchRequest + +offsetFetchRequestToV0 :: OffsetFetchRequest -> OffsetFetchRequestV0 +offsetFetchRequestToV0 x = OffsetFetchRequestV0 + { groupId = x.groupId + , topics = fmap offsetFetchRequestTopicToV0 x.topics + } +offsetFetchRequestToV1 :: OffsetFetchRequest -> OffsetFetchRequestV1 +offsetFetchRequestToV1 = offsetFetchRequestToV0 +offsetFetchRequestToV2 :: OffsetFetchRequest -> OffsetFetchRequestV2 +offsetFetchRequestToV2 = offsetFetchRequestToV0 + +offsetFetchRequestFromV0 :: OffsetFetchRequestV0 -> OffsetFetchRequest +offsetFetchRequestFromV0 x = OffsetFetchRequest + { groupId = x.groupId + , topics = fmap offsetFetchRequestTopicFromV0 x.topics + } +offsetFetchRequestFromV1 :: OffsetFetchRequestV1 -> OffsetFetchRequest +offsetFetchRequestFromV1 = offsetFetchRequestFromV0 +offsetFetchRequestFromV2 :: OffsetFetchRequestV2 -> OffsetFetchRequest +offsetFetchRequestFromV2 = offsetFetchRequestFromV0 + +data OffsetFetchResponse = OffsetFetchResponse + { topics :: !(KaArray OffsetFetchResponseTopic) + -- ^ The responses per topic. + , errorCode :: {-# UNPACK #-} !ErrorCode + -- ^ The top-level error code, or 0 if there was no error. + } deriving (Show, Eq, Generic) +instance Serializable OffsetFetchResponse + +offsetFetchResponseToV0 :: OffsetFetchResponse -> OffsetFetchResponseV0 +offsetFetchResponseToV0 x = OffsetFetchResponseV0 + { topics = fmap offsetFetchResponseTopicToV0 x.topics + } +offsetFetchResponseToV1 :: OffsetFetchResponse -> OffsetFetchResponseV1 +offsetFetchResponseToV1 = offsetFetchResponseToV0 +offsetFetchResponseToV2 :: OffsetFetchResponse -> OffsetFetchResponseV2 +offsetFetchResponseToV2 x = OffsetFetchResponseV2 + { topics = fmap offsetFetchResponseTopicToV2 x.topics + , errorCode = x.errorCode + } + +offsetFetchResponseFromV0 :: OffsetFetchResponseV0 -> OffsetFetchResponse +offsetFetchResponseFromV0 x = OffsetFetchResponse + { topics = fmap offsetFetchResponseTopicFromV0 x.topics + , errorCode = 0 + } +offsetFetchResponseFromV1 :: OffsetFetchResponseV1 -> OffsetFetchResponse +offsetFetchResponseFromV1 = offsetFetchResponseFromV0 +offsetFetchResponseFromV2 :: OffsetFetchResponseV2 -> OffsetFetchResponse +offsetFetchResponseFromV2 x = OffsetFetchResponse + { topics = fmap offsetFetchResponseTopicFromV2 x.topics + , errorCode = x.errorCode + } + +data ProduceRequest = ProduceRequest + { acks :: {-# UNPACK #-} !Int16 + -- ^ The number of acknowledgments the producer requires the leader to have + -- received before considering a request complete. Allowed values: 0 for no + -- acknowledgments, 1 for only the leader and -1 for the full ISR. + , timeoutMs :: {-# UNPACK #-} !Int32 + -- ^ The timeout to await a response in milliseconds. + , topicData :: !(KaArray TopicProduceData) + -- ^ Each topic to produce to. + } deriving (Show, Eq, Generic) +instance Serializable ProduceRequest + +produceRequestToV2 :: ProduceRequest -> ProduceRequestV2 +produceRequestToV2 x = ProduceRequestV2 + { acks = x.acks + , timeoutMs = x.timeoutMs + , topicData = fmap topicProduceDataToV2 x.topicData + } + +produceRequestFromV2 :: ProduceRequestV2 -> ProduceRequest +produceRequestFromV2 x = ProduceRequest + { acks = x.acks + , timeoutMs = x.timeoutMs + , topicData = fmap topicProduceDataFromV2 x.topicData + } + +data ProduceResponse = ProduceResponse + { responses :: !(KaArray TopicProduceResponse) + -- ^ Each produce response + , throttleTimeMs :: {-# UNPACK #-} !Int32 + -- ^ The duration in milliseconds for which the request was throttled due + -- to a quota violation, or zero if the request did not violate any quota. + } deriving (Show, Eq, Generic) +instance Serializable ProduceResponse + +produceResponseToV2 :: ProduceResponse -> ProduceResponseV2 +produceResponseToV2 x = ProduceResponseV2 + { responses = fmap topicProduceResponseToV2 x.responses + , throttleTimeMs = x.throttleTimeMs + } + +produceResponseFromV2 :: ProduceResponseV2 -> ProduceResponse +produceResponseFromV2 x = ProduceResponse + { responses = fmap topicProduceResponseFromV2 x.responses + , throttleTimeMs = x.throttleTimeMs + } + +data SyncGroupRequest = SyncGroupRequest + { groupId :: !Text + -- ^ The unique group identifier. + , generationId :: {-# UNPACK #-} !Int32 + -- ^ The generation of the group. + , memberId :: !Text + -- ^ The member ID assigned by the group. + , assignments :: !(KaArray SyncGroupRequestAssignment) + -- ^ Each assignment. + } deriving (Show, Eq, Generic) +instance Serializable SyncGroupRequest + +syncGroupRequestToV0 :: SyncGroupRequest -> SyncGroupRequestV0 +syncGroupRequestToV0 x = SyncGroupRequestV0 + { groupId = x.groupId + , generationId = x.generationId + , memberId = x.memberId + , assignments = fmap syncGroupRequestAssignmentToV0 x.assignments + } + +syncGroupRequestFromV0 :: SyncGroupRequestV0 -> SyncGroupRequest +syncGroupRequestFromV0 x = SyncGroupRequest + { groupId = x.groupId + , generationId = x.generationId + , memberId = x.memberId + , assignments = fmap syncGroupRequestAssignmentFromV0 x.assignments + } + +data SyncGroupResponse = SyncGroupResponse + { errorCode :: {-# UNPACK #-} !ErrorCode + -- ^ The error code, or 0 if there was no error. + , assignment :: !ByteString + -- ^ The member assignment. + } deriving (Show, Eq, Generic) +instance Serializable SyncGroupResponse + +syncGroupResponseToV0 :: SyncGroupResponse -> SyncGroupResponseV0 +syncGroupResponseToV0 x = SyncGroupResponseV0 + { errorCode = x.errorCode + , assignment = x.assignment + } + +syncGroupResponseFromV0 :: SyncGroupResponseV0 -> SyncGroupResponse +syncGroupResponseFromV0 x = SyncGroupResponse + { errorCode = x.errorCode + , assignment = x.assignment + } + +------------------------------------------------------------------------------- + +newtype ApiVersionsResponseEx = ApiVersionsResponseEx ApiVersionsResponse + deriving (Show, Eq) +instance Exception ApiVersionsResponseEx + +catchApiVersionsResponseEx :: IO ApiVersionsResponse -> IO ApiVersionsResponse +catchApiVersionsResponseEx act = act `catch` \(ApiVersionsResponseEx resp) -> pure resp + +newtype CreateTopicsResponseEx = CreateTopicsResponseEx CreateTopicsResponse + deriving (Show, Eq) +instance Exception CreateTopicsResponseEx + +catchCreateTopicsResponseEx :: IO CreateTopicsResponse -> IO CreateTopicsResponse +catchCreateTopicsResponseEx act = act `catch` \(CreateTopicsResponseEx resp) -> pure resp + +newtype DeleteTopicsResponseEx = DeleteTopicsResponseEx DeleteTopicsResponse + deriving (Show, Eq) +instance Exception DeleteTopicsResponseEx + +catchDeleteTopicsResponseEx :: IO DeleteTopicsResponse -> IO DeleteTopicsResponse +catchDeleteTopicsResponseEx act = act `catch` \(DeleteTopicsResponseEx resp) -> pure resp + +newtype DescribeGroupsResponseEx = DescribeGroupsResponseEx DescribeGroupsResponse + deriving (Show, Eq) +instance Exception DescribeGroupsResponseEx + +catchDescribeGroupsResponseEx :: IO DescribeGroupsResponse -> IO DescribeGroupsResponse +catchDescribeGroupsResponseEx act = act `catch` \(DescribeGroupsResponseEx resp) -> pure resp + +newtype FetchResponseEx = FetchResponseEx FetchResponse + deriving (Show, Eq) +instance Exception FetchResponseEx + +catchFetchResponseEx :: IO FetchResponse -> IO FetchResponse +catchFetchResponseEx act = act `catch` \(FetchResponseEx resp) -> pure resp + +newtype FindCoordinatorResponseEx = FindCoordinatorResponseEx FindCoordinatorResponse + deriving (Show, Eq) +instance Exception FindCoordinatorResponseEx + +catchFindCoordinatorResponseEx :: IO FindCoordinatorResponse -> IO FindCoordinatorResponse +catchFindCoordinatorResponseEx act = act `catch` \(FindCoordinatorResponseEx resp) -> pure resp + +newtype HeartbeatResponseEx = HeartbeatResponseEx HeartbeatResponse + deriving (Show, Eq) +instance Exception HeartbeatResponseEx + +catchHeartbeatResponseEx :: IO HeartbeatResponse -> IO HeartbeatResponse +catchHeartbeatResponseEx act = act `catch` \(HeartbeatResponseEx resp) -> pure resp + +newtype JoinGroupResponseEx = JoinGroupResponseEx JoinGroupResponse + deriving (Show, Eq) +instance Exception JoinGroupResponseEx + +catchJoinGroupResponseEx :: IO JoinGroupResponse -> IO JoinGroupResponse +catchJoinGroupResponseEx act = act `catch` \(JoinGroupResponseEx resp) -> pure resp + +newtype LeaveGroupResponseEx = LeaveGroupResponseEx LeaveGroupResponse + deriving (Show, Eq) +instance Exception LeaveGroupResponseEx + +catchLeaveGroupResponseEx :: IO LeaveGroupResponse -> IO LeaveGroupResponse +catchLeaveGroupResponseEx act = act `catch` \(LeaveGroupResponseEx resp) -> pure resp + +newtype ListGroupsResponseEx = ListGroupsResponseEx ListGroupsResponse + deriving (Show, Eq) +instance Exception ListGroupsResponseEx + +catchListGroupsResponseEx :: IO ListGroupsResponse -> IO ListGroupsResponse +catchListGroupsResponseEx act = act `catch` \(ListGroupsResponseEx resp) -> pure resp + +newtype ListOffsetsResponseEx = ListOffsetsResponseEx ListOffsetsResponse + deriving (Show, Eq) +instance Exception ListOffsetsResponseEx + +catchListOffsetsResponseEx :: IO ListOffsetsResponse -> IO ListOffsetsResponse +catchListOffsetsResponseEx act = act `catch` \(ListOffsetsResponseEx resp) -> pure resp + +newtype MetadataResponseEx = MetadataResponseEx MetadataResponse + deriving (Show, Eq) +instance Exception MetadataResponseEx + +catchMetadataResponseEx :: IO MetadataResponse -> IO MetadataResponse +catchMetadataResponseEx act = act `catch` \(MetadataResponseEx resp) -> pure resp + +newtype OffsetCommitResponseEx = OffsetCommitResponseEx OffsetCommitResponse + deriving (Show, Eq) +instance Exception OffsetCommitResponseEx + +catchOffsetCommitResponseEx :: IO OffsetCommitResponse -> IO OffsetCommitResponse +catchOffsetCommitResponseEx act = act `catch` \(OffsetCommitResponseEx resp) -> pure resp + +newtype OffsetFetchResponseEx = OffsetFetchResponseEx OffsetFetchResponse + deriving (Show, Eq) +instance Exception OffsetFetchResponseEx + +catchOffsetFetchResponseEx :: IO OffsetFetchResponse -> IO OffsetFetchResponse +catchOffsetFetchResponseEx act = act `catch` \(OffsetFetchResponseEx resp) -> pure resp + +newtype ProduceResponseEx = ProduceResponseEx ProduceResponse + deriving (Show, Eq) +instance Exception ProduceResponseEx + +catchProduceResponseEx :: IO ProduceResponse -> IO ProduceResponse +catchProduceResponseEx act = act `catch` \(ProduceResponseEx resp) -> pure resp + +newtype SyncGroupResponseEx = SyncGroupResponseEx SyncGroupResponse + deriving (Show, Eq) +instance Exception SyncGroupResponseEx + +catchSyncGroupResponseEx :: IO SyncGroupResponse -> IO SyncGroupResponse +catchSyncGroupResponseEx act = act `catch` \(SyncGroupResponseEx resp) -> pure resp diff --git a/script/kafka_gen.py b/script/kafka_gen.py index c6b46c63a..5cbb7adaa 100755 --- a/script/kafka_gen.py +++ b/script/kafka_gen.py @@ -1,6 +1,8 @@ #!/usr/bin/env python3 # PYTHON_ARGCOMPLETE_OK +# Requires Python >= 3.9 +# # Parse Apache Kafka Message Definitions # # See: https://github.com/apache/kafka/tree/3.5/clients/src/main/resources/common/message @@ -10,14 +12,16 @@ # - handle ignorable # - handle entityType -import argparse from dataclasses import dataclass -import os -import json from glob import glob -import subprocess -import re from typing import List, Optional +import argparse +import itertools +import json +import os +import pathlib +import re +import subprocess import textwrap # If you want to enable tab completion of this script, you must install @@ -70,6 +74,34 @@ "array": "!(CompactKaArray {})", } + +def get_field_default(field_type, default=None): + if default is not None: + if default == "null": + return "Nothing" + if field_type.startswith("int") and int(default) < 0: + return f"({default})" + if default == "false": + return "False" + if default == "true": + return "True" + return default + if field_type.startswith("int"): + return 0 + if field_type == "float": + return 0 + if field_type == "string": + return '""' + if field_type == "bool": + return "False" + if field_type == "bytes": + return '""' + if field_type == "records": + return "Nothing" + if field_type == "array": + return "KaArray (Just V.empty)" + + GLOBAL_API_VERSION_PATCH = (0, 0) API_VERSION_PATCHES = { "ApiVersions": (0, 3), @@ -130,22 +162,38 @@ def format_field_doc(doc=None, indent=4): ) -def format_hs_list(xs, indent=0, prefix=""): +def format_hs_block(xs, indent=0, prefix="", start="[", end="]"): indents = " " * indent indents_with_prefix = indents + (" " * len(prefix)) - result = indents + prefix + "[ " + result = indents + prefix + start + " " result += ("\n" + indents_with_prefix + ", ").join(xs) result += "\n" - result += indents_with_prefix + "]" + result += indents_with_prefix + end return result +def format_hs_list(xs, indent=0, prefix=""): + return format_hs_block(xs, indent=indent, prefix=prefix) + + +def format_hs_data_cons(xs, indent=0, prefix=""): + return format_hs_block(xs, indent=indent, prefix=prefix, start="{", end="}") + + @dataclass class HsDataField: name: str - ty: str + ty: str # haskell type + ver_ty: str # haskell type with version + ka_type: Optional[str] = None + ka_type_arr_of: Optional[str] = None # for ka_type is array + ka_type_nullable: Optional[bool] = None doc: Optional[str] = None is_tagged: bool = False + default: Optional = None + + def format(self): + pass class HsData: @@ -163,8 +211,12 @@ def __init__( self._init_fields(fields) self.version = version self.doc = doc - self._name = name + f"V{version}" - self._cons = cons + f"V{version} " if cons else self._name + if self.version is None: + self._name = name + self._cons = cons if cons else self._name + else: + self._name = name + f"V{version}" + self._cons = cons + f"V{version} " if cons else self._name def _init_fields(self, fields): self.fields = [] @@ -192,7 +244,13 @@ def format(self): # 2. We assume that flexible message always has tagged_fields if self.tagged_fields or self.is_flexible: self.fields.append( - HsDataField("taggedFields", "!TaggedFields", is_tagged=True) + HsDataField( + "taggedFields", + "!TaggedFields", + "!TaggedFields", + ka_type="TaggedFields", + is_tagged=True, + ) ) if len(self.fields) == 0: @@ -201,15 +259,15 @@ def format(self): elif len(self.fields) == 1: data_type = f"newtype {self._name} = {self._cons}" data_fields = "\n , ".join( - f"{f.name} :: {remove_strict(f.ty)}" for f in self.fields + f"{f.name} :: {remove_strict(f.ver_ty)}" for f in self.fields ) data_fields = " { " + data_fields + "\n }" else: data_type = f"data {self._name} = {self._cons}" data_fields = "\n , ".join( - f"{f.name} :: {f.ty}\n{format_field_doc(f.doc)}" + f"{f.name} :: {f.ver_ty}\n{format_field_doc(f.doc)}" if f.doc - else f"{f.name} :: {f.ty}" + else f"{f.name} :: {f.ver_ty}" for f in self.fields ) data_fields = " { " + data_fields + "\n }" @@ -342,13 +400,14 @@ def parse_version(spec): def parse_field(field, api_version=0, flexible=False): - about = field.get("about") # TODO + about = field.get("about") name = RENAMES.get(field["name"], field["name"]) type_type = field["type"] type_name = None type_maps = TYPE_MAPS with_extra_version_suffix = False is_tagged = False + default = field.get("default") # Versions min_field_version, max_field_version = parse_version( @@ -409,6 +468,7 @@ def parse_field(field, api_version=0, flexible=False): # Array type match_array = re.match(r"^\[\](?P.+)$", type_type) + match_name = None if match_array: type_type = "array" match_name = match_array.group("type") @@ -435,23 +495,41 @@ def parse_field(field, api_version=0, flexible=False): ), ) ) - hs_data = HsData( - type_name, data_sub_fields, api_version, is_flexible=flexible + append_hs_datas( + SUB_DATA_TYPES, + HsData( + type_name, + data_sub_fields, + api_version, + is_flexible=flexible, + ), ) - append_hs_datas(SUB_DATA_TYPES, hs_data) data_name = lower_fst(name) + ver_type_name = type_name if with_extra_version_suffix: - _type_name = f"{type_name}V{api_version}" - type_name = DATA_TYPE_RENAMES.get(_type_name, _type_name) + ver_type_name = f"{ver_type_name}V{api_version}" + ver_type_name = DATA_TYPE_RENAMES.get(ver_type_name, ver_type_name) data_type = type_maps[type_type].format(type_name) + ver_data_type = type_maps[type_type].format(ver_type_name) data_field = HsDataField( data_name, data_type, + ver_data_type, + ka_type=type_type, + ka_type_arr_of=match_name, + # Here we ignore the nullableVersions for array type because we + # always use KaArray(nullable) in haskell + # + # e.g. MetadataRequestV0.topics is a non-null array, but in haskell + # we use KaArray instead + ka_type_nullable=True if type_type == "array" else in_null_version, doc=about, is_tagged=is_tagged, + default=default, ) + return data_field @@ -510,12 +588,141 @@ def parse(msg): # ----------------------------------------------------------------------------- -def gen_haskell_header(): +def convert_field_type(f, is_nullable=False, is_flexible=False): + if f.ka_type == "string" and is_nullable: + return "!NullableString" + return f.ty + + +def _convert_field_array(f, ver, label, direction, is_flexible=False): + convertCompact = ( + "kaArrayToCompact" if direction == "To" else "kaArrayFromCompact" + ) + if f.ka_type_arr_of in {*TYPE_MAPS.keys(), "TaggedFields"}: + if is_flexible: + return f"{convertCompact} {label}.{f.name}" + return label + "." + f.name + else: + converter = lower_fst(f.ka_type_arr_of) + direction + "V" + str(ver) + if is_flexible: + return f"fmap {converter} ({convertCompact} {label}.{f.name})" + return f"fmap {converter} {label}.{f.name}" + + +def convert_field_to(f, ver, is_flexible, label): + if f.ka_type == "array": + return _convert_field_array( + f, ver, label, "To", is_flexible=is_flexible + ) + + if f.ka_type in {*TYPE_MAPS.keys(), "TaggedFields"}: + return label + "." + f.name + + return label + "." + f.name + + +def convert_field_from(src_fields, ver, dest, is_flexible, label): + src = next((i for i in src_fields if i.name == dest.name), None) + + if src is not None: + if src.ka_type == "array": + return _convert_field_array( + src, ver, label, "From", is_flexible=is_flexible + ) + return label + "." + src.name + + if dest.ka_type == "TaggedFields": # TODO + return "EmptyTaggedFields" + + return get_field_default(dest.ka_type, dest.default) + + +def format_total_data_types(ds): + totals = [] + sub_data_types = sorted(ds, key=lambda k: k.name) + for k, data_group in itertools.groupby( + sub_data_types, key=lambda k: k.name + ): + data_group = list(data_group) + fs = [] + # preprocess + is_flexible = False + is_nullable = set() + for d in data_group: + if isinstance(d.fields, int): + continue + if d.is_flexible: + is_flexible = True + for field in d.fields: + if field.ka_type_nullable: + is_nullable.add(field.name) + # process + for d in data_group: + if isinstance(d.fields, int): + continue + for field in d.fields: + if field.name not in map(lambda x: x.name, fs): + field.ver_ty = convert_field_type( + field, + is_nullable=(field.name in is_nullable), + is_flexible=is_flexible, + ) + fs.append(field) + totals.append(HsData(k, fs, None, is_flexible=is_flexible).format()) + # --- + convert_to_results = [] + for d in data_group: + result = f"{lower_fst(d.name)}ToV{d.version} :: {d.name} -> {k}V{d.version}" + result += "\n" + if isinstance(d.fields, int): + result += f"{lower_fst(d.name)}ToV{d.version} = {lower_fst(d.name)}ToV{d.fields}" + else: + cs = [ + f"{f.name} = {convert_field_to(f, d.version, d.is_flexible, 'x')}" + for f in d.fields + ] + var = "x" if len(cs) != 0 else "_" + result += ( + f"{lower_fst(d.name)}ToV{d.version} {var} = {k}V{d.version}" + ) + if len(cs) != 0: + result += "\n" + result += format_hs_data_cons(cs, indent=2) + convert_to_results.append(result) + totals.append("\n".join(convert_to_results)) + # --- + convert_from_results = [] + for d in data_group: + result = f"{lower_fst(d.name)}FromV{d.version} :: {k}V{d.version} -> {d.name}" + result += "\n" + if isinstance(d.fields, int): + result += f"{lower_fst(d.name)}FromV{d.version} = {lower_fst(d.name)}FromV{d.fields}" + else: + var = "x" if len(d.fields) != 0 else "_" + result += ( + f"{lower_fst(d.name)}FromV{d.version} {var} = {d.name}" + ) + result += "\n" + cs = [ + f"{f.name} = {convert_field_from(d.fields, d.version, f, d.is_flexible, 'x')}" + for f in fs + ] + result += format_hs_data_cons(cs, indent=2) + convert_from_results.append(result) + totals.append("\n".join(convert_from_results)) + + return "\n\n".join(s for s in totals) + + +# ----------------------------------------------------------------------------- + + +def gen_struct_haskell_header(): return """ ------------------------------------------------------------------------------- -- Autogenerated by kafka message json schema -- --- $ ./script/kafka_gen.py run > hstream-kafka/protocol/Kafka/Protocol/Message/Struct.hs +-- $ ./script/kafka_gen.py run -- -- DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING @@ -525,9 +732,9 @@ def gen_haskell_header(): module Kafka.Protocol.Message.Struct where -import Data.ByteString (ByteString) +import Data.ByteString (ByteString) import Data.Int -import Data.Text (Text) +import Data.Text (Text) import GHC.Generics import Kafka.Protocol.Encoding @@ -536,8 +743,33 @@ def gen_haskell_header(): """.strip() -def gen_splitter(): - return "\n-------------------------------------------------------------------------------\n" +def gen_total_haskell_header(): + return """ +------------------------------------------------------------------------------- +-- Autogenerated by kafka message json schema +-- +-- $ ./script/kafka_gen.py run +-- +-- DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING + +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE DuplicateRecordFields #-} +{-# LANGUAGE OverloadedRecordDot #-} +{-# LANGUAGE TypeFamilies #-} + +module Kafka.Protocol.Message.Total where + +import Control.Exception +import Data.ByteString (ByteString) +import Data.Int +import Data.Text (Text) +import qualified Data.Vector as V +import GHC.Generics + +import Kafka.Protocol.Encoding +import Kafka.Protocol.Error +import Kafka.Protocol.Message.Struct +""".strip() def gen_sub_data_types(): @@ -635,24 +867,52 @@ def gen_api_header_version(): return f"{hs_type}\n{hs_impl}\n{hs_math_other}\n{hs_inline}" +def gen_total_sub_data_types(): + return format_total_data_types(SUB_DATA_TYPES) + + +def gen_total_data_types(): + return format_total_data_types(DATA_TYPES) + + +def gen_total_response_exception(): + def gen(n, resp): + return f""" +newtype {n}Ex = {n}Ex {resp} + deriving (Show, Eq) +instance Exception {n}Ex + +catch{n}Ex :: IO {resp} -> IO {resp} +catch{n}Ex act = act `catch` \\({n}Ex resp) -> pure resp +""".strip() + + results = {} + for d in DATA_TYPES: + if d.name.endswith("Response"): + if d.name not in results: + results[d.name] = gen(d.name, d.name) + + return "\n\n".join(results.values()) + + def gen_struct(): return f""" -{gen_haskell_header()} -\ -{gen_splitter()} -\ +{gen_struct_haskell_header()} + +------------------------------------------------------------------------------- + {gen_sub_data_types()} -\ -{gen_splitter()} -\ + +------------------------------------------------------------------------------- + {gen_data_types()} -\ -{gen_splitter()} -\ + +------------------------------------------------------------------------------- + {gen_services()} -\ -{gen_splitter()} -\ + +------------------------------------------------------------------------------- + {gen_api_keys()} {gen_supported_api_versions()} @@ -661,6 +921,37 @@ def gen_struct(): """.strip() +def gen_total(): + return f""" +{gen_total_haskell_header()} + +------------------------------------------------------------------------------- + +{gen_total_sub_data_types()} + +{gen_total_data_types()} + +------------------------------------------------------------------------------- + +{gen_total_response_exception()} +""" + + +def write_generates(outputs, filepath, stylish=True): + if stylish: + result = subprocess.run( + "stylish-haskell", + input=outputs.encode(), + stdout=subprocess.PIPE, + ) + if result and result.stdout: + with open(filepath, "w") as f: + print(result.stdout.decode().strip(), file=f) + else: + with open(filepath, "w") as f: + print(outputs.strip(), file=f) + + # ----------------------------------------------------------------------------- @@ -704,11 +995,19 @@ def cli_get_json(path): default="./hstream-kafka/message", dest="files", ) - # TODO: since python3.9 there is BooleanOptionalAction available in argparse parser_run.add_argument( - "--no-format", - action="store_true", - help="Don't run stylish-haskell to format the result", + "--gen-dir", + type=pathlib.Path, + help="Directory to generate haskell files. (Default: %(default)s)", + default="./hstream-kafka/protocol/Kafka/Protocol/Message", + dest="gen_dir", + ) + + parser_run.add_argument( + "--stylish", + action=argparse.BooleanOptionalAction, + default=True, + help="Run stylish-haskell to format the result", ) parser_run.add_argument( "--dry-run", @@ -721,15 +1020,19 @@ def cli_get_json(path): if args.sub_command == "run": run_parse(args.files) - outputs = gen_struct() + struct_outputs = gen_struct() + total_outputs = gen_total() + if not args.dry_run: - if not args.no_format: - result = subprocess.run( - "stylish-haskell", - input=outputs.encode(), - stdout=subprocess.PIPE, - ) - if result and result.stdout: - print(result.stdout.decode().strip()) - else: - print(outputs) + write_generates( + struct_outputs, + os.path.join(args.gen_dir, "Struct.hs"), + stylish=args.stylish, + ) + write_generates( + total_outputs, + os.path.join(args.gen_dir, "Total.hs"), + stylish=args.stylish, + ) + else: + parser.print_help()