diff --git a/conf/hstream.yaml b/conf/hstream.yaml
index 3e4090491..a6cea5a8d 100644
--- a/conf/hstream.yaml
+++ b/conf/hstream.yaml
@@ -248,12 +248,19 @@ kafka:
   # gossip-interval: 1000000 # 1 sec
   # probe-interval: 2000000 # 2 sec
   # roundtrip-timeout: 500000 # 0.5 sec
-  #
-  #Kafka options
+
+  # Topic options
   #
   #num-partitions: 1
   #default-replication-factor: 1
+
+  # Internal storage options
+  #
+  #storage:
+  #  fetch-mode: 1 # TODO: currently, only mode 1 is supported
+  #  fetch-reader-timeout: 50 # default timeout of each read in ms, 0 means nonblocking
+  #  fetch-maxlen: 1000 # default max number of records per read
+
   # Configuration for HStream Store
   # The configuration for hstore is **Optional**. When the values are not provided,
   # hstreamdb will use the following configuration as the default configuration.
diff --git a/hstream-kafka/HStream/Kafka/Server/Config.hs b/hstream-kafka/HStream/Kafka/Server/Config.hs
index 1c810b299..43b8e90b0 100644
--- a/hstream-kafka/HStream/Kafka/Server/Config.hs
+++ b/hstream-kafka/HStream/Kafka/Server/Config.hs
@@ -12,6 +12,7 @@ module HStream.Kafka.Server.Config
   , TlsConfig (..)
   , SecurityProtocolMap, defaultProtocolMap
   , advertisedListenersToPB
+  , StorageOptions (..)
   ) where

 import           Control.Exception (throwIO)
diff --git a/hstream-kafka/HStream/Kafka/Server/Config/FromJson.hs b/hstream-kafka/HStream/Kafka/Server/Config/FromJson.hs
index 1412b691a..befaffa8a 100644
--- a/hstream-kafka/HStream/Kafka/Server/Config/FromJson.hs
+++ b/hstream-kafka/HStream/Kafka/Server/Config/FromJson.hs
@@ -83,13 +83,6 @@ parseJSONToOptions CliOptions{..} obj = do
   joinWorkerConcurrency <- clusterCfgObj .:? "join-worker-concurrency" .!= joinWorkerConcurrency defaultGossipOpts
   let _gossipOpts = GossipOpts {..}

-  -- Store Config
-  storeCfgObj <- obj .:? "hstore" .!= mempty
-  storeLogLevel <- readWithErrLog "store log-level" <$> storeCfgObj .:? "log-level" .!= "info"
-
-  let !_ldConfigPath = cliStoreConfigPath
-  let !_ldLogLevel = fromMaybe storeLogLevel cliLdLogLevel
-
   -- TLS config
   nodeEnableTls  <- nodeCfgObj .:? "enable-tls" .!= False
   nodeTlsKeyPath <- nodeCfgObj .:? "tls-key-path"
@@ -111,7 +104,19 @@ parseJSONToOptions CliOptions{..} obj = do
   nodeEnableSaslAuth <- nodeCfgObj .:? "enable-sasl" .!= False
   let !_enableSaslAuth = cliEnableSaslAuth || nodeEnableSaslAuth

-  return ServerOpts {..}
+  -- Store Config
+  storeCfgObj <- obj .:? "hstore" .!= mempty
+  storeLogLevel <- readWithErrLog "store log-level" <$> storeCfgObj .:? "log-level" .!= "info"
+  let !_ldConfigPath = cliStoreConfigPath
+  let !_ldLogLevel = fromMaybe storeLogLevel cliLdLogLevel
+
+  -- Storage config
+  storageCfg <- nodeCfgObj .:? "storage" .!= mempty
+  fetchReaderTimeout <- storageCfg .:? "fetch-reader-timeout" .!= 50
+  fetchMaxLen <- storageCfg .:? "fetch-maxlen" .!= 1000
+  let _storage = StorageOptions{..}
+
+  return ServerOpts{..}

 -------------------------------------------------------------------------------
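
Note: the defaulting behaviour of the new `storage` block can be checked in isolation. The sketch below is a standalone miniature (not project code) that mirrors the `parseJSONToOptions` fragment above using plain aeson: each key is optional and falls back to the patch's defaults (50 ms, 1000 records), including when the whole `storage` object is absent.

    {-# LANGUAGE OverloadedStrings #-}
    import Data.Aeson
    import Data.Aeson.Types (Parser, parseEither)

    data StorageOptions = StorageOptions
      { fetchReaderTimeout :: Int  -- milliseconds
      , fetchMaxLen        :: Int  -- records per read
      } deriving (Show, Eq)

    -- Absent keys (or an absent "storage" object) fall back to defaults.
    parseStorage :: Object -> Parser StorageOptions
    parseStorage nodeCfgObj = do
      storageCfg <- nodeCfgObj .:? "storage" .!= mempty
      StorageOptions
        <$> storageCfg .:? "fetch-reader-timeout" .!= 50
        <*> storageCfg .:? "fetch-maxlen"         .!= 1000

    main :: IO ()
    main = do
      print $ parseEither parseStorage mempty
      -- Right (StorageOptions {fetchReaderTimeout = 50, fetchMaxLen = 1000})
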
diff --git a/hstream-kafka/HStream/Kafka/Server/Config/Types.hs b/hstream-kafka/HStream/Kafka/Server/Config/Types.hs
index e4ebc5276..66838911b 100644
--- a/hstream-kafka/HStream/Kafka/Server/Config/Types.hs
+++ b/hstream-kafka/HStream/Kafka/Server/Config/Types.hs
@@ -9,6 +9,7 @@ module HStream.Kafka.Server.Config.Types
   , ListenersSecurityProtocolMap
   , TlsConfig (..)
   , SecurityProtocolMap, defaultProtocolMap
+  , StorageOptions (..)

     -- * Helpers
   , advertisedListenersToPB
@@ -68,15 +69,17 @@ data ServerOpts = ServerOpts
   , _partitionNums :: !Int
   , _maxRecordSize :: !Int
   , _seedNodes :: ![(ByteString, Int)]
-
-  , _ldLogLevel :: !LDLogLevel
-  , _ldConfigPath :: !CBytes
-
-  , _compression :: !Compression
+  , _disableAutoCreateTopic :: !Bool
   , _enableSaslAuth :: !Bool
   , _kafkaBrokerConfigs :: !KC.KafkaBrokerConfigs
+
+    -- Store Options
+  , _storage :: !StorageOptions
+  , _compression :: !Compression
+  , _ldLogLevel :: !LDLogLevel
+  , _ldConfigPath :: !CBytes
   } deriving (Show, Eq)

 -------------------------------------------------------------------------------
@@ -186,3 +189,8 @@ parseMetaStoreAddr t =
       | s == "file" -> FileAddr . Text.unpack $ ip
       | otherwise -> errorWithoutStackTrace $ "Invalid meta store address, unsupported scheme: " <> show s
     Left eMsg -> errorWithoutStackTrace eMsg
+
+data StorageOptions = StorageOptions
+  { fetchReaderTimeout :: Int
+  , fetchMaxLen :: Int
+  } deriving (Show, Eq)
diff --git a/hstream-kafka/HStream/Kafka/Server/Handler.hsc b/hstream-kafka/HStream/Kafka/Server/Handler.hsc
index 02584c384..10231906e 100644
--- a/hstream-kafka/HStream/Kafka/Server/Handler.hsc
+++ b/hstream-kafka/HStream/Kafka/Server/Handler.hsc
@@ -57,7 +57,7 @@ import qualified Kafka.Protocol.Service as K

 #cv_handler ApiVersions, 0, 3
 #cv_handler Produce, 0, 2
-#cv_handler Fetch, 0, 2
+#cv_handler Fetch, 0, 3
 #cv_handler DescribeConfigs, 0, 0
 #cv_handler SaslHandshake, 0, 1

@@ -69,7 +69,7 @@ handlers sc =
     -- Write
   , #mk_handler Produce, 0, 2
     -- Read
-  , #mk_handler Fetch, 0, 2
+  , #mk_handler Fetch, 0, 3

   , K.hd (K.RPC :: K.RPC K.HStreamKafkaV0 "metadata") (handleMetadataV0 sc)
   , K.hd (K.RPC :: K.RPC K.HStreamKafkaV1 "metadata") (handleMetadataV1 sc)
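
Note: the reshuffled `ServerOpts` groups the store-related knobs together, and the fetch handler in the next file reads them through GHC's record-dot syntax (`serverOpts._storage.fetchReaderTimeout`). A standalone miniature of that access pattern, with hypothetical stand-in types and assuming GHC >= 9.2:

    {-# LANGUAGE OverloadedRecordDot #-}

    data StorageOptions = StorageOptions
      { fetchReaderTimeout :: Int
      , fetchMaxLen        :: Int
      } deriving (Show, Eq)

    -- Stand-in for the real ServerOpts, which has many more fields.
    newtype ServerOpts = ServerOpts { _storage :: StorageOptions }

    main :: IO ()
    main = do
      let serverOpts = ServerOpts (StorageOptions 50 1000)
      print serverOpts._storage.fetchReaderTimeout  -- 50
      print serverOpts._storage.fetchMaxLen         -- 1000
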
diff --git a/hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs b/hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs
index 0c5a01363..a469a5e76 100644
--- a/hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs
+++ b/hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs
@@ -6,6 +6,7 @@ module HStream.Kafka.Server.Handler.Consume

 import           Control.Exception
 import           Control.Monad
+import           Data.ByteString (ByteString)
 import qualified Data.ByteString as BS
 import qualified Data.ByteString.Builder as BB
 import           Data.Either (isRight)
@@ -25,6 +26,8 @@ import HStream.Kafka.Metrics.ConsumeStats (readLatencySnd,
                                            topicTotalSendBytes,
                                            topicTotalSendMessages,
                                            totalConsumeRequest)
+import           HStream.Kafka.Server.Config (ServerOpts (..),
+                                              StorageOptions (..))
 import           HStream.Kafka.Server.Types (ServerContext (..))
 import qualified HStream.Logger as Log
 import qualified HStream.Store as S
@@ -52,11 +55,12 @@ handleFetch ServerContext{..} _ r = K.catchFetchResponseEx $ do
   -- kafka broker just throw java.lang.RuntimeException if topics is null, here
   -- we do the same.
   let K.NonNullKaArray topicReqs = r.topics
-  topics <- V.forM topicReqs $ \K.FetchTopic{..} -> do
-    orderedParts <- S.listStreamPartitionsOrdered scLDClient (S.transToTopicStreamName topic)
-    let K.NonNullKaArray partitionReqs = partitions
-    ps <- V.forM partitionReqs $ \p -> do
-      P.withLabel totalConsumeRequest (topic, T.pack . show $ p.partition) $
+  topics <- V.forM topicReqs $ \topic{- K.FetchTopic -} -> do
+    orderedParts <- S.listStreamPartitionsOrdered scLDClient
+                      (S.transToTopicStreamName topic.topic)
+    let K.NonNullKaArray partitionReqs = topic.partitions
+    ps <- V.forM partitionReqs $ \p{- K.FetchPartition -} -> do
+      P.withLabel totalConsumeRequest (topic.topic, T.pack . show $ p.partition) $
         \counter -> void $ P.addCounter counter 1
       let m_logid = orderedParts V.!? fromIntegral p.partition
       case m_logid of
@@ -69,10 +73,10 @@ handleFetch ServerContext{..} _ r = K.catchFetchResponseEx $ do
           elsn <- getPartitionLsn scLDClient scOffsetManager logid p.partition p.fetchOffset
           pure (logid, elsn, p)
-    pure (topic, ps)
+    pure (topic.topic, ps)

   let numOfReads = V.sum $
-        V.map (V.length . (V.filter $ \(_, x, _) -> isRight x) . snd) topics
+        V.map (V.length . V.filter (\(_, x, _) -> isRight x) . snd) topics
   when (numOfReads < 1) $ do
     respTopics <- V.forM topics $ \(topic, partitions) -> do
       respPartitionDatas <- V.forM partitions $ \(_, elsn, _) -> do
@@ -97,24 +101,22 @@ handleFetch ServerContext{..} _ r = K.catchFetchResponseEx $ do
         S.readerStartReading reader logid startlsn S.LSN_MAX

   -- Read records from storage
-  if r.minBytes <= 0 || r.maxWaitMs <= 0
-     then S.readerSetTimeout reader 0 -- nonblocking
-     else S.readerSetTimeout reader r.maxWaitMs
-  S.readerSetWaitOnlyWhenNoData reader
-  (_, records) <- foldWhileM (0, []) $ \(size, acc) -> do
-    -- TODO: If we can record latency at a more granular level, such as for
-    -- a consumer group, or a stream?
-    rs <- P.observeDuration readLatencySnd $ S.readerRead reader 100
-    if null rs
-       then pure ((size, acc), False)
-       else do let size' = size + (sum $ map (K.recordBytesSize . (.recordPayload)) rs)
-                   acc' = acc <> rs
-               if size' >= fromIntegral r.minBytes
-                  then pure ((size', acc'), False)
-                  else pure ((size', acc'), True)
+  --
+  -- TODO:
+  --   - dynamically change reader settings according to the client request
+  --     (e.g. maxWaitMs, minBytes...)
+  --   - handle maxBytes
+  --
+  -- FIXME: Do not use setWaitOnlyWhenNoData if you mostly care about
+  -- throughput
+  --
+  -- Mode1
+  records <- readMode1 reader

   -- Process read records
   -- TODO: what if client send two same topic but with different partitions?
+  --
+  -- {logid: [RecordFormat]}
   readRecords <- HT.initialize numOfReads :: IO RecordTable
   forM_ records $ \record -> do
     recordFormat <- K.runGet @K.RecordFormat record.recordPayload
@@ -156,6 +158,47 @@
          pure $ K.FetchableTopicResponse topic (K.NonNullKaArray respPartitionDatas)
   pure $ K.FetchResponse (K.NonNullKaArray respTopics) 0{- TODO: throttleTimeMs -}
+  where
+    readMode0 :: S.LDReader -> IO [S.DataRecord ByteString]
+    readMode0 reader = do
+      if r.minBytes <= 0 || r.maxWaitMs <= 0
+         then S.readerSetTimeout reader 0 -- nonblocking
+         else S.readerSetTimeout reader r.maxWaitMs
+      S.readerSetWaitOnlyWhenNoData reader
+      (_, records) <- foldWhileM (0, []) $ \(size, acc) -> do
+        rs <- P.observeDuration readLatencySnd $ S.readerRead reader 100
+        if null rs
+           then pure ((size, acc), False)
+           else do let size' = size + sum (map (K.recordBytesSize . (.recordPayload)) rs)
+                       acc' = acc <> rs
+                   if size' >= fromIntegral r.minBytes
+                      then pure ((size', acc'), False)
+                      else pure ((size', acc'), True)
+      pure records
+
+    readMode1 :: S.LDReader -> IO [S.DataRecord ByteString]
+    readMode1 reader = do
+      let storageOpts = serverOpts._storage
+          defTimeout = fromIntegral storageOpts.fetchReaderTimeout
+
+      if r.minBytes <= 0 || r.maxWaitMs <= 0 -- respond immediately
+         then do S.readerSetTimeout reader 0 -- nonblocking
+                 S.readerRead reader storageOpts.fetchMaxLen
+         else
+           if r.maxWaitMs > defTimeout
+              then do
+                S.readerSetTimeout reader defTimeout
+                rs1 <- S.readerRead reader storageOpts.fetchMaxLen
+                let size = sum (map (K.recordBytesSize . (.recordPayload)) rs1)
+                if size >= fromIntegral r.minBytes
+                   then pure rs1
+                   else do S.readerSetTimeout reader (r.maxWaitMs - defTimeout)
+                           rs2 <- S.readerRead reader storageOpts.fetchMaxLen
+                           pure $ rs1 <> rs2
+              else do
+                S.readerSetTimeout reader r.maxWaitMs
+                S.readerRead reader storageOpts.fetchMaxLen

 -------------------------------------------------------------------------------

 -- Return tuple of (startLsn, tailLsn, highwaterOffset)
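
Note: the branching in `readMode1` is the heart of this patch, so here is a pure, runnable model of just its wait-budget decision (local stand-in names, not project code). The configured `fetch-reader-timeout` bounds the first read; only if `minBytes` is not yet satisfied does a second read spend the remaining `maxWaitMs`.

    -- Pure model of readMode1's wait strategy (illustrative stand-in).
    data Plan
      = Immediate          -- nonblocking single read
      | OneRead Int        -- one read with the given timeout (ms)
      | TwoReads Int Int   -- first/second timeouts; the second read happens
                           -- only when the first does not satisfy minBytes
      deriving Show

    plan :: Int -> Int -> Int -> Plan
    plan minBytes maxWaitMs defTimeout
      | minBytes <= 0 || maxWaitMs <= 0 = Immediate
      | maxWaitMs > defTimeout          = TwoReads defTimeout (maxWaitMs - defTimeout)
      | otherwise                       = OneRead maxWaitMs

    main :: IO ()
    main = do
      print (plan 1 500 50)  -- TwoReads 50 450
      print (plan 1 20 50)   -- OneRead 20
      print (plan 0 500 50)  -- Immediate
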
diff --git a/hstream-kafka/protocol/Kafka/Protocol/Message/Struct.hs b/hstream-kafka/protocol/Kafka/Protocol/Message/Struct.hs
index 20fb2cd88..fef0234f7 100644
--- a/hstream-kafka/protocol/Kafka/Protocol/Message/Struct.hs
+++ b/hstream-kafka/protocol/Kafka/Protocol/Message/Struct.hs
@@ -218,6 +218,10 @@
 type FetchPartitionV2 = FetchPartitionV0

 type FetchTopicV2 = FetchTopicV0

+type FetchPartitionV3 = FetchPartitionV0
+
+type FetchTopicV3 = FetchTopicV0
+
 data PartitionDataV0 = PartitionDataV0
   { partitionIndex :: {-# UNPACK #-} !Int32
     -- ^ The partition index.
@@ -246,6 +250,10 @@
 type PartitionDataV2 = PartitionDataV0

 type FetchableTopicResponseV2 = FetchableTopicResponseV0

+type PartitionDataV3 = PartitionDataV0
+
+type FetchableTopicResponseV3 = FetchableTopicResponseV0
+
 data JoinGroupRequestProtocolV0 = JoinGroupRequestProtocolV0
   { name :: !Text
     -- ^ The protocol name.
@@ -736,6 +744,22 @@ type FetchRequestV1 = FetchRequestV0

 type FetchRequestV2 = FetchRequestV0

+data FetchRequestV3 = FetchRequestV3
+  { replicaId :: {-# UNPACK #-} !Int32
+    -- ^ The broker ID of the follower, or -1 if this request is from a
+    -- consumer.
+  , maxWaitMs :: {-# UNPACK #-} !Int32
+    -- ^ The maximum time in milliseconds to wait for the response.
+  , minBytes :: {-# UNPACK #-} !Int32
+    -- ^ The minimum bytes to accumulate in the response.
+  , maxBytes :: {-# UNPACK #-} !Int32
+    -- ^ The maximum bytes to fetch. See KIP-74 for cases where this limit may
+    -- not be honored.
+  , topics :: !(KaArray FetchTopicV0)
+    -- ^ The topics to fetch.
+  } deriving (Show, Eq, Generic)
+instance Serializable FetchRequestV3
+
 newtype FetchResponseV0 = FetchResponseV0
   { responses :: (KaArray FetchableTopicResponseV0)
   } deriving (Show, Eq, Generic)
@@ -752,6 +776,8 @@ instance Serializable FetchResponseV1

 type FetchResponseV2 = FetchResponseV1

+type FetchResponseV3 = FetchResponseV1
+
 newtype FindCoordinatorRequestV0 = FindCoordinatorRequestV0
   { key :: Text
   } deriving (Show, Eq, Generic)
@@ -1390,10 +1416,18 @@ data HStreamKafkaV3

 instance Service HStreamKafkaV3 where
   type ServiceName HStreamKafkaV3 = "HStreamKafkaV3"
   type ServiceMethods HStreamKafkaV3 =
-    '[ "metadata"
+    '[ "fetch"
+     , "metadata"
      , "apiVersions"
      ]

+instance HasMethodImpl HStreamKafkaV3 "fetch" where
+  type MethodName HStreamKafkaV3 "fetch" = "fetch"
+  type MethodKey HStreamKafkaV3 "fetch" = 1
+  type MethodVersion HStreamKafkaV3 "fetch" = 3
+  type MethodInput HStreamKafkaV3 "fetch" = FetchRequestV3
+  type MethodOutput HStreamKafkaV3 "fetch" = FetchResponseV3
+
 instance HasMethodImpl HStreamKafkaV3 "metadata" where
   type MethodName HStreamKafkaV3 "metadata" = "metadata"
   type MethodKey HStreamKafkaV3 "metadata" = 3
@@ -1453,7 +1487,7 @@ instance Show ApiKey where

 supportedApiVersions :: [ApiVersionV0]
 supportedApiVersions =
   [ ApiVersionV0 (ApiKey 0) 0 2
-  , ApiVersionV0 (ApiKey 1) 0 2
+  , ApiVersionV0 (ApiKey 1) 0 3
   , ApiVersionV0 (ApiKey 2) 0 1
   , ApiVersionV0 (ApiKey 3) 0 4
   , ApiVersionV0 (ApiKey 8) 0 2
@@ -1480,6 +1514,7 @@ getHeaderVersion (ApiKey 0) 2 = (1, 0)
 getHeaderVersion (ApiKey 1) 0 = (1, 0)
 getHeaderVersion (ApiKey 1) 1 = (1, 0)
 getHeaderVersion (ApiKey 1) 2 = (1, 0)
+getHeaderVersion (ApiKey 1) 3 = (1, 0)
 getHeaderVersion (ApiKey 2) 0 = (1, 0)
 getHeaderVersion (ApiKey 2) 1 = (1, 0)
 getHeaderVersion (ApiKey 3) 0 = (1, 0)
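
Note: on the wire, the Generic-derived `Serializable` instance presumably encodes record fields in declaration order, so Fetch v3 inserts `maxBytes` between `minBytes` and the topics array. A hand-rolled sketch of just the fixed-size v3 prefix (an assumed layout; the real instance is derived, and the topics array is omitted here), using Kafka's big-endian int32 encoding:

    import qualified Data.ByteString.Builder as BB
    import           Data.Int (Int32)

    -- replicaId, maxWaitMs, minBytes, maxBytes, in declaration order.
    fetchV3Prefix :: Int32 -> Int32 -> Int32 -> Int32 -> BB.Builder
    fetchV3Prefix replicaId maxWaitMs minBytes maxBytes =
         BB.int32BE replicaId
      <> BB.int32BE maxWaitMs
      <> BB.int32BE minBytes
      <> BB.int32BE maxBytes  -- the field KIP-74 added in v3

    main :: IO ()
    main = print $ BB.toLazyByteString $
      fetchV3Prefix (-1) 500 1 52428800  -- a consumer fetch with a 50 MiB cap
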
diff --git a/hstream-kafka/protocol/Kafka/Protocol/Message/Total.hs b/hstream-kafka/protocol/Kafka/Protocol/Message/Total.hs
index ad48125e2..a1ed3d5de 100644
--- a/hstream-kafka/protocol/Kafka/Protocol/Message/Total.hs
+++ b/hstream-kafka/protocol/Kafka/Protocol/Message/Total.hs
@@ -370,6 +370,8 @@ fetchPartitionToV1 :: FetchPartition -> FetchPartitionV1
 fetchPartitionToV1 = fetchPartitionToV0
 fetchPartitionToV2 :: FetchPartition -> FetchPartitionV2
 fetchPartitionToV2 = fetchPartitionToV0
+fetchPartitionToV3 :: FetchPartition -> FetchPartitionV3
+fetchPartitionToV3 = fetchPartitionToV0

 fetchPartitionFromV0 :: FetchPartitionV0 -> FetchPartition
 fetchPartitionFromV0 x = FetchPartition
@@ -381,6 +383,8 @@ fetchPartitionFromV1 :: FetchPartitionV1 -> FetchPartition
 fetchPartitionFromV1 = fetchPartitionFromV0
 fetchPartitionFromV2 :: FetchPartitionV2 -> FetchPartition
 fetchPartitionFromV2 = fetchPartitionFromV0
+fetchPartitionFromV3 :: FetchPartitionV3 -> FetchPartition
+fetchPartitionFromV3 = fetchPartitionFromV0

 data FetchTopic = FetchTopic
   { topic :: !Text
@@ -399,6 +403,8 @@ fetchTopicToV1 :: FetchTopic -> FetchTopicV1
 fetchTopicToV1 = fetchTopicToV0
 fetchTopicToV2 :: FetchTopic -> FetchTopicV2
 fetchTopicToV2 = fetchTopicToV0
+fetchTopicToV3 :: FetchTopic -> FetchTopicV3
+fetchTopicToV3 = fetchTopicToV0

 fetchTopicFromV0 :: FetchTopicV0 -> FetchTopic
 fetchTopicFromV0 x = FetchTopic
@@ -409,6 +415,8 @@ fetchTopicFromV1 :: FetchTopicV1 -> FetchTopic
 fetchTopicFromV1 = fetchTopicFromV0
 fetchTopicFromV2 :: FetchTopicV2 -> FetchTopic
 fetchTopicFromV2 = fetchTopicFromV0
+fetchTopicFromV3 :: FetchTopicV3 -> FetchTopic
+fetchTopicFromV3 = fetchTopicFromV0

 data FetchableTopicResponse = FetchableTopicResponse
   { topic :: !Text
@@ -427,6 +435,8 @@
 fetchableTopicResponseToV1 :: FetchableTopicResponse -> FetchableTopicResponseV1
 fetchableTopicResponseToV1 = fetchableTopicResponseToV0
 fetchableTopicResponseToV2 :: FetchableTopicResponse -> FetchableTopicResponseV2
 fetchableTopicResponseToV2 = fetchableTopicResponseToV0
+fetchableTopicResponseToV3 :: FetchableTopicResponse -> FetchableTopicResponseV3
+fetchableTopicResponseToV3 = fetchableTopicResponseToV0

 fetchableTopicResponseFromV0 :: FetchableTopicResponseV0 -> FetchableTopicResponse
 fetchableTopicResponseFromV0 x = FetchableTopicResponse
@@ -437,6 +447,8 @@ fetchableTopicResponseFromV1 :: FetchableTopicResponseV1 -> FetchableTopicResponse
 fetchableTopicResponseFromV1 = fetchableTopicResponseFromV0
 fetchableTopicResponseFromV2 :: FetchableTopicResponseV2 -> FetchableTopicResponse
 fetchableTopicResponseFromV2 = fetchableTopicResponseFromV0
+fetchableTopicResponseFromV3 :: FetchableTopicResponseV3 -> FetchableTopicResponse
+fetchableTopicResponseFromV3 = fetchableTopicResponseFromV0

 data FinalizedFeatureKey = FinalizedFeatureKey
   { name :: !CompactString
@@ -1100,6 +1112,8 @@ partitionDataToV1 :: PartitionData -> PartitionDataV1
 partitionDataToV1 = partitionDataToV0
 partitionDataToV2 :: PartitionData -> PartitionDataV2
 partitionDataToV2 = partitionDataToV0
+partitionDataToV3 :: PartitionData -> PartitionDataV3
+partitionDataToV3 = partitionDataToV0

 partitionDataFromV0 :: PartitionDataV0 -> PartitionData
 partitionDataFromV0 x = PartitionData
@@ -1112,6 +1126,8 @@ partitionDataFromV1 :: PartitionDataV1 -> PartitionData
 partitionDataFromV1 = partitionDataFromV0
 partitionDataFromV2 :: PartitionDataV2 -> PartitionData
 partitionDataFromV2 = partitionDataFromV0
+partitionDataFromV3 :: PartitionDataV3 -> PartitionData
+partitionDataFromV3 = partitionDataFromV0

 data PartitionProduceData = PartitionProduceData
   { index :: {-# UNPACK #-} !Int32
@@ -1541,6 +1557,9 @@ data FetchRequest = FetchRequest
     -- ^ The minimum bytes to accumulate in the response.
   , topics :: !(KaArray FetchTopic)
     -- ^ The topics to fetch.
+  , maxBytes :: {-# UNPACK #-} !Int32
+    -- ^ The maximum bytes to fetch. See KIP-74 for cases where this limit may
+    -- not be honored.
   } deriving (Show, Eq, Generic)
 instance Serializable FetchRequest
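
Note: in the "total" `FetchRequest` the new `maxBytes` field is appended after `topics` rather than kept in wire order; only the versioned `FetchRequestV3` has to match the wire layout. When upgrading a v0-v2 request, `maxBytes` is filled with 2147483647, which is `maxBound :: Int32`, i.e. effectively unlimited. A runnable miniature of that convention (hypothetical two-field types), matching `fetchRequestFromV0` below:

    import Data.Int (Int32)

    newtype ReqV0 = ReqV0 { v0MinBytes :: Int32 }
    data Req = Req { minBytes :: Int32, maxBytes :: Int32 } deriving Show

    -- Older versions lack maxBytes, so the upgrade fills in a permissive
    -- default instead of failing.
    fromV0 :: ReqV0 -> Req
    fromV0 x = Req { minBytes = v0MinBytes x
                   , maxBytes = maxBound  -- 2147483647
                   }

    main :: IO ()
    main = print (fromV0 (ReqV0 1))  -- Req {minBytes = 1, maxBytes = 2147483647}
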
@@ -1555,6 +1574,14 @@ fetchRequestToV1 :: FetchRequest -> FetchRequestV1
 fetchRequestToV1 = fetchRequestToV0
 fetchRequestToV2 :: FetchRequest -> FetchRequestV2
 fetchRequestToV2 = fetchRequestToV0
+fetchRequestToV3 :: FetchRequest -> FetchRequestV3
+fetchRequestToV3 x = FetchRequestV3
+  { replicaId = x.replicaId
+  , maxWaitMs = x.maxWaitMs
+  , minBytes = x.minBytes
+  , maxBytes = x.maxBytes
+  , topics = fmap fetchTopicToV3 x.topics
+  }

 fetchRequestFromV0 :: FetchRequestV0 -> FetchRequest
 fetchRequestFromV0 x = FetchRequest
@@ -1562,11 +1589,20 @@ fetchRequestFromV0 x = FetchRequest
   , maxWaitMs = x.maxWaitMs
   , minBytes = x.minBytes
   , topics = fmap fetchTopicFromV0 x.topics
+  , maxBytes = 2147483647
   }
 fetchRequestFromV1 :: FetchRequestV1 -> FetchRequest
 fetchRequestFromV1 = fetchRequestFromV0
 fetchRequestFromV2 :: FetchRequestV2 -> FetchRequest
 fetchRequestFromV2 = fetchRequestFromV0
+fetchRequestFromV3 :: FetchRequestV3 -> FetchRequest
+fetchRequestFromV3 x = FetchRequest
+  { replicaId = x.replicaId
+  , maxWaitMs = x.maxWaitMs
+  , minBytes = x.minBytes
+  , topics = fmap fetchTopicFromV3 x.topics
+  , maxBytes = x.maxBytes
+  }

 data FetchResponse = FetchResponse
   { responses :: !(KaArray FetchableTopicResponse)
@@ -1588,6 +1624,8 @@ fetchResponseToV1 x = FetchResponseV1
   }
 fetchResponseToV2 :: FetchResponse -> FetchResponseV2
 fetchResponseToV2 = fetchResponseToV1
+fetchResponseToV3 :: FetchResponse -> FetchResponseV3
+fetchResponseToV3 = fetchResponseToV1

 fetchResponseFromV0 :: FetchResponseV0 -> FetchResponse
 fetchResponseFromV0 x = FetchResponse
@@ -1601,6 +1639,8 @@ fetchResponseFromV1 x = FetchResponse
   }
 fetchResponseFromV2 :: FetchResponseV2 -> FetchResponse
 fetchResponseFromV2 = fetchResponseFromV1
+fetchResponseFromV3 :: FetchResponseV3 -> FetchResponse
+fetchResponseFromV3 = fetchResponseFromV1

 newtype FindCoordinatorRequest = FindCoordinatorRequest
   { key :: Text
diff --git a/script/kafka_gen.py b/script/kafka_gen.py
index ba9ac5316..d2c7444e0 100755
--- a/script/kafka_gen.py
+++ b/script/kafka_gen.py
@@ -110,7 +110,7 @@ def get_field_default(field_type, default=None):
     "ApiVersions": (0, 3),
     "Metadata": (0, 4),
     "Produce": (0, 2),
-    "Fetch": (0, 2),
+    "Fetch": (0, 3),
     "OffsetFetch": (0, 2),
    "OffsetCommit": (0, 2),
     "ListOffsets": (0, 1),
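
Note: the range in `kafka_gen.py` drives code generation, while `supportedApiVersions` in Struct.hs is what the broker advertises at runtime, so the two must stay in sync. A small standalone check of that invariant (local stand-in types, not project code):

    -- Stand-ins for the generator table and the runtime ApiVersions reply.
    data ApiVersionRange = ApiVersionRange
      { apiKey :: Int, minVer :: Int, maxVer :: Int } deriving (Show, Eq)

    genRanges :: [(String, (Int, Int))]
    genRanges = [("Fetch", (0, 3))]  -- mirrors kafka_gen.py

    advertised :: [(String, ApiVersionRange)]
    advertised = [("Fetch", ApiVersionRange 1 0 3)]  -- mirrors supportedApiVersions

    consistent :: Bool
    consistent = and
      [ (lo, hi) == (minVer v, maxVer v)
      | (name, (lo, hi)) <- genRanges
      , Just v <- [lookup name advertised]
      ]

    main :: IO ()
    main = print consistent  -- True
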