From 8d0fab20d9f9bce7769162da4e273c044b3de8f1 Mon Sep 17 00:00:00 2001 From: mu <59917266+4eUeP@users.noreply.github.com> Date: Thu, 9 Nov 2023 20:44:09 -0800 Subject: [PATCH 1/3] kafka: using uniform handleFetch (#1678) --- .../HStream/Kafka/Server/Handler.hsc | 13 +- .../HStream/Kafka/Server/Handler/Basic.hs | 7 +- .../HStream/Kafka/Server/Handler/Consume.hs | 49 +++---- .../protocol/Kafka/Protocol/Message/Struct.hs | 83 +++++++++--- .../protocol/Kafka/Protocol/Message/Total.hs | 125 +++++++++++++----- script/kafka_gen.py | 9 +- 6 files changed, 190 insertions(+), 96 deletions(-) diff --git a/hstream-kafka/HStream/Kafka/Server/Handler.hsc b/hstream-kafka/HStream/Kafka/Server/Handler.hsc index 719aaf131..5c514e449 100644 --- a/hstream-kafka/HStream/Kafka/Server/Handler.hsc +++ b/hstream-kafka/HStream/Kafka/Server/Handler.hsc @@ -25,12 +25,13 @@ import qualified Kafka.Protocol.Service as K #define hsc_cv_handler(key, start, end) \ { \ for (int i = start; i <= end; i++) { \ - hsc_printf("handle%sV%d :: K.RequestContext -> K.%sRequestV%d -> IO " \ + hsc_printf("handle%sV%d :: ServerContext -> K.RequestContext -> " \ + "K.%sRequestV%d -> IO " \ "K.%sResponseV%d \n", \ #key, i, #key, i, #key, i); \ - hsc_printf("handle%sV%d ctx req = K.", #key, i); \ + hsc_printf("handle%sV%d sc ctx req = K.", #key, i); \ hsc_lowerfirst(#key); \ - hsc_printf("ResponseToV%d <$> handle%s ctx (K.", i, #key); \ + hsc_printf("ResponseToV%d <$> handle%s sc ctx (K.", i, #key); \ hsc_lowerfirst(#key); \ hsc_printf("RequestFromV%d req)\n", i); \ } \ @@ -44,17 +45,19 @@ import qualified Kafka.Protocol.Service as K } \ hsc_printf("K.hd (K.RPC :: K.RPC K.HStreamKafkaV%d \"", i); \ hsc_lowerfirst(#key); \ - hsc_printf("\") handle%sV%d\n", #key, i); \ + hsc_printf("\") (handle%sV%d sc)\n", #key, i); \ } \ } ------------------------------------------------------------------------------- #cv_handler ApiVersions, 0, 3 +#cv_handler Fetch, 0, 2 handlers :: ServerContext -> [K.ServiceHandler] handlers sc = [ #mk_handler ApiVersions, 0, 3 + , #mk_handler Fetch, 0, 2 , K.hd (K.RPC :: K.RPC K.HStreamKafkaV0 "metadata") (handleMetadataV0 sc) , K.hd (K.RPC :: K.RPC K.HStreamKafkaV1 "metadata") (handleMetadataV1 sc) @@ -68,8 +71,6 @@ handlers sc = , K.hd (K.RPC :: K.RPC K.HStreamKafkaV2 "produce") (handleProduceV2 sc) - , K.hd (K.RPC :: K.RPC K.HStreamKafkaV2 "fetch") (handleFetchV2 sc) - -- Offsets , K.hd (K.RPC :: K.RPC K.HStreamKafkaV0 "listOffsets") (handleListOffsetsV0 sc) , K.hd (K.RPC :: K.RPC K.HStreamKafkaV1 "listOffsets") (handleListOffsetsV1 sc) diff --git a/hstream-kafka/HStream/Kafka/Server/Handler/Basic.hs b/hstream-kafka/HStream/Kafka/Server/Handler/Basic.hs index 4f289e4eb..966c1bcda 100644 --- a/hstream-kafka/HStream/Kafka/Server/Handler/Basic.hs +++ b/hstream-kafka/HStream/Kafka/Server/Handler/Basic.hs @@ -38,8 +38,11 @@ import qualified Kafka.Protocol.Service as K -------------------- handleApiVersions - :: K.RequestContext -> K.ApiVersionsRequest -> IO K.ApiVersionsResponse -handleApiVersions _ _ = do + :: ServerContext + -> K.RequestContext + -> K.ApiVersionsRequest + -> IO K.ApiVersionsResponse +handleApiVersions _ _ _ = do let apiKeys = K.KaArray . Just . (V.map apiVersionV0To) diff --git a/hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs b/hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs index 6e42f3252..df064f666 100644 --- a/hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs +++ b/hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs @@ -1,7 +1,7 @@ {-# LANGUAGE OverloadedRecordDot #-} module HStream.Kafka.Server.Handler.Consume - ( handleFetchV2 + ( handleFetch ) where import Control.Exception @@ -37,14 +37,14 @@ type RecordTable = (GV.Growing V.Vector GV.RealWorld K.RecordFormat) -- NOTE: this behaviour is not the same as kafka broker -handleFetchV2 +handleFetch :: ServerContext -> K.RequestContext - -> K.FetchRequestV2 -> IO K.FetchResponseV2 -handleFetchV2 ServerContext{..} _ r = catchFetchV2 $ do + -> K.FetchRequest -> IO K.FetchResponse +handleFetch ServerContext{..} _ r = K.catchFetchResponseEx $ do -- kafka broker just throw java.lang.RuntimeException if topics is null, here -- we do the same. let K.NonNullKaArray topicReqs = r.topics - topics <- V.forM topicReqs $ \K.FetchTopicV2{..} -> do + topics <- V.forM topicReqs $ \K.FetchTopic{..} -> do orderedParts <- S.listStreamPartitionsOrdered scLDClient (S.transToTopicStreamName topic) let K.NonNullKaArray partitionReqs = partitions ps <- V.forM partitionReqs $ \p -> do @@ -61,9 +61,9 @@ handleFetchV2 ServerContext{..} _ r = catchFetchV2 $ do case elsn of Left pd -> pure pd Right _ -> error "LogicError: this should not be right" - pure $ K.FetchableTopicResponseV2 topic (K.NonNullKaArray respPartitionDatas) - let resp = K.FetchResponseV2 0{- TODO: throttleTimeMs -} (K.NonNullKaArray respTopics) - throwIO $ RetFetchRespV2 resp + pure $ K.FetchableTopicResponse topic (K.NonNullKaArray respPartitionDatas) + let resp = K.FetchResponse (K.NonNullKaArray respTopics) 0{- TODO: throttleTimeMs -} + throwIO $ K.FetchResponseEx resp -- New reader reader <- S.newLDReader scLDClient (fromIntegral numOfReads) Nothing @@ -109,7 +109,7 @@ handleFetchV2 ServerContext{..} _ r = catchFetchV2 $ do Right (_startlsn, _endlsn, hioffset) -> do mgv <- HT.lookup readRecords logid case mgv of - Nothing -> pure $ K.PartitionDataV2 p.partition K.NONE hioffset (Just "") + Nothing -> pure $ K.PartitionData p.partition K.NONE hioffset (Just "") Just gv -> do v <- GV.unsafeFreeze gv -- This should not be Nothing, because if we found the key in @@ -127,20 +127,9 @@ handleFetchV2 ServerContext{..} _ r = catchFetchV2 $ do let b = V.foldl (<>) (BB.byteString fstRecordBytes) (V.map (BB.byteString . K.unCompactBytes . (.recordBytes)) vs) bs = BS.toStrict $ BB.toLazyByteString b - pure $ K.PartitionDataV2 p.partition K.NONE hioffset (Just bs) - pure $ K.FetchableTopicResponseV2 topic (K.NonNullKaArray respPartitionDatas) - pure $ K.FetchResponseV2 0{- TODO: throttleTimeMs -} (K.NonNullKaArray respTopics) - -------------------------------------------------------------------------------- - --- TODO: move to Kafka.Protocol.Message.Struct -newtype RetFetchRespV2 = RetFetchRespV2 K.FetchResponseV2 - deriving (Show, Eq) - -instance Exception RetFetchRespV2 - -catchFetchV2 :: IO K.FetchResponseV2 -> IO K.FetchResponseV2 -catchFetchV2 act = act `catch` \(RetFetchRespV2 resp) -> pure resp + pure $ K.PartitionData p.partition K.NONE hioffset (Just bs) + pure $ K.FetchableTopicResponse topic (K.NonNullKaArray respPartitionDatas) + pure $ K.FetchResponse (K.NonNullKaArray respTopics) 0{- TODO: throttleTimeMs -} ------------------------------------------------------------------------------- @@ -152,7 +141,7 @@ getPartitionLsn -> K.OffsetManager -> S.C_LogID -> Int32 -> Int64 -- ^ kafka start offset - -> IO (Either K.PartitionDataV2 (S.LSN, S.LSN, Int64)) + -> IO (Either K.PartitionData (S.LSN, S.LSN, Int64)) getPartitionLsn ldclient om logid partition offset = do m <- K.getLatestOffsetWithLsn om logid case m of @@ -167,19 +156,19 @@ getPartitionLsn ldclient om logid partition offset = do | offset == highwaterOffset -> pure $ Right (tailLsn + 1, tailLsn, highwaterOffset) | offset > highwaterOffset -> - pure $ Left $ errorPartitionResponseV2 partition K.OFFSET_OUT_OF_RANGE + pure $ Left $ errorPartitionResponse partition K.OFFSET_OUT_OF_RANGE -- ghc is not smart enough to detact my partten matching is complete | otherwise -> error "This should not be reached (getPartitionLsn)" Nothing -> do Log.debug $ "Partition " <> Log.build logid <> " is empty" if offset == 0 then pure $ Right (S.LSN_MIN, S.LSN_INVALID, 0) - else pure $ Left $ errorPartitionResponseV2 partition K.OFFSET_OUT_OF_RANGE + else pure $ Left $ errorPartitionResponse partition K.OFFSET_OUT_OF_RANGE -errorPartitionResponseV2 :: Int32 -> K.ErrorCode -> K.PartitionDataV2 -errorPartitionResponseV2 partitionIndex ec = - K.PartitionDataV2 partitionIndex ec (-1) (Just "") -{-# INLINE errorPartitionResponseV2 #-} +errorPartitionResponse :: Int32 -> K.ErrorCode -> K.PartitionData +errorPartitionResponse partitionIndex ec = + K.PartitionData partitionIndex ec (-1) (Just "") +{-# INLINE errorPartitionResponse #-} foldWhileM :: Monad m => a -> (a -> m (a, Bool)) -> m a foldWhileM !a f = do diff --git a/hstream-kafka/protocol/Kafka/Protocol/Message/Struct.hs b/hstream-kafka/protocol/Kafka/Protocol/Message/Struct.hs index 895001e90..ab33035a4 100644 --- a/hstream-kafka/protocol/Kafka/Protocol/Message/Struct.hs +++ b/hstream-kafka/protocol/Kafka/Protocol/Message/Struct.hs @@ -150,7 +150,7 @@ data DescribedGroupV0 = DescribedGroupV0 } deriving (Show, Eq, Generic) instance Serializable DescribedGroupV0 -data FetchPartitionV2 = FetchPartitionV2 +data FetchPartitionV0 = FetchPartitionV0 { partition :: {-# UNPACK #-} !Int32 -- ^ The partition index. , fetchOffset :: {-# UNPACK #-} !Int64 @@ -159,17 +159,25 @@ data FetchPartitionV2 = FetchPartitionV2 -- ^ The maximum bytes to fetch from this partition. See KIP-74 for cases -- where this limit may not be honored. } deriving (Show, Eq, Generic) -instance Serializable FetchPartitionV2 +instance Serializable FetchPartitionV0 -data FetchTopicV2 = FetchTopicV2 +data FetchTopicV0 = FetchTopicV0 { topic :: !Text -- ^ The name of the topic to fetch. - , partitions :: !(KaArray FetchPartitionV2) + , partitions :: !(KaArray FetchPartitionV0) -- ^ The partitions to fetch. } deriving (Show, Eq, Generic) -instance Serializable FetchTopicV2 +instance Serializable FetchTopicV0 -data PartitionDataV2 = PartitionDataV2 +type FetchPartitionV1 = FetchPartitionV0 + +type FetchTopicV1 = FetchTopicV0 + +type FetchPartitionV2 = FetchPartitionV0 + +type FetchTopicV2 = FetchTopicV0 + +data PartitionDataV0 = PartitionDataV0 { partitionIndex :: {-# UNPACK #-} !Int32 -- ^ The partition index. , errorCode :: {-# UNPACK #-} !ErrorCode @@ -179,15 +187,23 @@ data PartitionDataV2 = PartitionDataV2 , recordBytes :: !NullableBytes -- ^ The record data. } deriving (Show, Eq, Generic) -instance Serializable PartitionDataV2 +instance Serializable PartitionDataV0 -data FetchableTopicResponseV2 = FetchableTopicResponseV2 +data FetchableTopicResponseV0 = FetchableTopicResponseV0 { topic :: !Text -- ^ The topic name. - , partitions :: !(KaArray PartitionDataV2) + , partitions :: !(KaArray PartitionDataV0) -- ^ The topic partitions. } deriving (Show, Eq, Generic) -instance Serializable FetchableTopicResponseV2 +instance Serializable FetchableTopicResponseV0 + +type PartitionDataV1 = PartitionDataV0 + +type FetchableTopicResponseV1 = FetchableTopicResponseV0 + +type PartitionDataV2 = PartitionDataV0 + +type FetchableTopicResponseV2 = FetchableTopicResponseV0 data JoinGroupRequestProtocolV0 = JoinGroupRequestProtocolV0 { name :: !Text @@ -618,7 +634,7 @@ newtype DescribeGroupsResponseV0 = DescribeGroupsResponseV0 } deriving (Show, Eq, Generic) instance Serializable DescribeGroupsResponseV0 -data FetchRequestV2 = FetchRequestV2 +data FetchRequestV0 = FetchRequestV0 { replicaId :: {-# UNPACK #-} !Int32 -- ^ The broker ID of the follower, of -1 if this request is from a -- consumer. @@ -626,19 +642,30 @@ data FetchRequestV2 = FetchRequestV2 -- ^ The maximum time in milliseconds to wait for the response. , minBytes :: {-# UNPACK #-} !Int32 -- ^ The minimum bytes to accumulate in the response. - , topics :: !(KaArray FetchTopicV2) + , topics :: !(KaArray FetchTopicV0) -- ^ The topics to fetch. } deriving (Show, Eq, Generic) -instance Serializable FetchRequestV2 +instance Serializable FetchRequestV0 -data FetchResponseV2 = FetchResponseV2 +type FetchRequestV1 = FetchRequestV0 + +type FetchRequestV2 = FetchRequestV0 + +newtype FetchResponseV0 = FetchResponseV0 + { responses :: (KaArray FetchableTopicResponseV0) + } deriving (Show, Eq, Generic) +instance Serializable FetchResponseV0 + +data FetchResponseV1 = FetchResponseV1 { throttleTimeMs :: {-# UNPACK #-} !Int32 -- ^ The duration in milliseconds for which the request was throttled due -- to a quota violation, or zero if the request did not violate any quota. - , responses :: !(KaArray FetchableTopicResponseV2) + , responses :: !(KaArray FetchableTopicResponseV0) -- ^ The response topics. } deriving (Show, Eq, Generic) -instance Serializable FetchResponseV2 +instance Serializable FetchResponseV1 + +type FetchResponseV2 = FetchResponseV1 newtype FindCoordinatorRequestV0 = FindCoordinatorRequestV0 { key :: Text @@ -942,7 +969,8 @@ data HStreamKafkaV0 instance Service HStreamKafkaV0 where type ServiceName HStreamKafkaV0 = "HStreamKafkaV0" type ServiceMethods HStreamKafkaV0 = - '[ "listOffsets" + '[ "fetch" + , "listOffsets" , "metadata" , "offsetCommit" , "offsetFetch" @@ -958,6 +986,13 @@ instance Service HStreamKafkaV0 where , "deleteTopics" ] +instance HasMethodImpl HStreamKafkaV0 "fetch" where + type MethodName HStreamKafkaV0 "fetch" = "fetch" + type MethodKey HStreamKafkaV0 "fetch" = 1 + type MethodVersion HStreamKafkaV0 "fetch" = 0 + type MethodInput HStreamKafkaV0 "fetch" = FetchRequestV0 + type MethodOutput HStreamKafkaV0 "fetch" = FetchResponseV0 + instance HasMethodImpl HStreamKafkaV0 "listOffsets" where type MethodName HStreamKafkaV0 "listOffsets" = "listOffsets" type MethodKey HStreamKafkaV0 "listOffsets" = 2 @@ -1061,13 +1096,21 @@ data HStreamKafkaV1 instance Service HStreamKafkaV1 where type ServiceName HStreamKafkaV1 = "HStreamKafkaV1" type ServiceMethods HStreamKafkaV1 = - '[ "listOffsets" + '[ "fetch" + , "listOffsets" , "metadata" , "offsetCommit" , "offsetFetch" , "apiVersions" ] +instance HasMethodImpl HStreamKafkaV1 "fetch" where + type MethodName HStreamKafkaV1 "fetch" = "fetch" + type MethodKey HStreamKafkaV1 "fetch" = 1 + type MethodVersion HStreamKafkaV1 "fetch" = 1 + type MethodInput HStreamKafkaV1 "fetch" = FetchRequestV1 + type MethodOutput HStreamKafkaV1 "fetch" = FetchResponseV1 + instance HasMethodImpl HStreamKafkaV1 "listOffsets" where type MethodName HStreamKafkaV1 "listOffsets" = "listOffsets" type MethodKey HStreamKafkaV1 "listOffsets" = 2 @@ -1223,7 +1266,7 @@ instance Show ApiKey where supportedApiVersions :: [ApiVersionV0] supportedApiVersions = [ ApiVersionV0 (ApiKey 0) 2 2 - , ApiVersionV0 (ApiKey 1) 2 2 + , ApiVersionV0 (ApiKey 1) 0 2 , ApiVersionV0 (ApiKey 2) 0 1 , ApiVersionV0 (ApiKey 3) 0 4 , ApiVersionV0 (ApiKey 8) 0 2 @@ -1242,6 +1285,8 @@ supportedApiVersions = getHeaderVersion :: ApiKey -> Int16 -> (Int16, Int16) getHeaderVersion (ApiKey 0) 2 = (1, 0) +getHeaderVersion (ApiKey 1) 0 = (1, 0) +getHeaderVersion (ApiKey 1) 1 = (1, 0) getHeaderVersion (ApiKey 1) 2 = (1, 0) getHeaderVersion (ApiKey 2) 0 = (1, 0) getHeaderVersion (ApiKey 2) 1 = (1, 0) diff --git a/hstream-kafka/protocol/Kafka/Protocol/Message/Total.hs b/hstream-kafka/protocol/Kafka/Protocol/Message/Total.hs index c411d7ef1..714b64b82 100644 --- a/hstream-kafka/protocol/Kafka/Protocol/Message/Total.hs +++ b/hstream-kafka/protocol/Kafka/Protocol/Message/Total.hs @@ -269,19 +269,27 @@ data FetchPartition = FetchPartition } deriving (Show, Eq, Generic) instance Serializable FetchPartition -fetchPartitionToV2 :: FetchPartition -> FetchPartitionV2 -fetchPartitionToV2 x = FetchPartitionV2 +fetchPartitionToV0 :: FetchPartition -> FetchPartitionV0 +fetchPartitionToV0 x = FetchPartitionV0 { partition = x.partition , fetchOffset = x.fetchOffset , partitionMaxBytes = x.partitionMaxBytes } +fetchPartitionToV1 :: FetchPartition -> FetchPartitionV1 +fetchPartitionToV1 = fetchPartitionToV0 +fetchPartitionToV2 :: FetchPartition -> FetchPartitionV2 +fetchPartitionToV2 = fetchPartitionToV0 -fetchPartitionFromV2 :: FetchPartitionV2 -> FetchPartition -fetchPartitionFromV2 x = FetchPartition +fetchPartitionFromV0 :: FetchPartitionV0 -> FetchPartition +fetchPartitionFromV0 x = FetchPartition { partition = x.partition , fetchOffset = x.fetchOffset , partitionMaxBytes = x.partitionMaxBytes } +fetchPartitionFromV1 :: FetchPartitionV1 -> FetchPartition +fetchPartitionFromV1 = fetchPartitionFromV0 +fetchPartitionFromV2 :: FetchPartitionV2 -> FetchPartition +fetchPartitionFromV2 = fetchPartitionFromV0 data FetchTopic = FetchTopic { topic :: !Text @@ -291,17 +299,25 @@ data FetchTopic = FetchTopic } deriving (Show, Eq, Generic) instance Serializable FetchTopic -fetchTopicToV2 :: FetchTopic -> FetchTopicV2 -fetchTopicToV2 x = FetchTopicV2 +fetchTopicToV0 :: FetchTopic -> FetchTopicV0 +fetchTopicToV0 x = FetchTopicV0 { topic = x.topic - , partitions = fmap fetchPartitionToV2 x.partitions + , partitions = fmap fetchPartitionToV0 x.partitions } +fetchTopicToV1 :: FetchTopic -> FetchTopicV1 +fetchTopicToV1 = fetchTopicToV0 +fetchTopicToV2 :: FetchTopic -> FetchTopicV2 +fetchTopicToV2 = fetchTopicToV0 -fetchTopicFromV2 :: FetchTopicV2 -> FetchTopic -fetchTopicFromV2 x = FetchTopic +fetchTopicFromV0 :: FetchTopicV0 -> FetchTopic +fetchTopicFromV0 x = FetchTopic { topic = x.topic - , partitions = fmap fetchPartitionFromV2 x.partitions + , partitions = fmap fetchPartitionFromV0 x.partitions } +fetchTopicFromV1 :: FetchTopicV1 -> FetchTopic +fetchTopicFromV1 = fetchTopicFromV0 +fetchTopicFromV2 :: FetchTopicV2 -> FetchTopic +fetchTopicFromV2 = fetchTopicFromV0 data FetchableTopicResponse = FetchableTopicResponse { topic :: !Text @@ -311,17 +327,25 @@ data FetchableTopicResponse = FetchableTopicResponse } deriving (Show, Eq, Generic) instance Serializable FetchableTopicResponse -fetchableTopicResponseToV2 :: FetchableTopicResponse -> FetchableTopicResponseV2 -fetchableTopicResponseToV2 x = FetchableTopicResponseV2 +fetchableTopicResponseToV0 :: FetchableTopicResponse -> FetchableTopicResponseV0 +fetchableTopicResponseToV0 x = FetchableTopicResponseV0 { topic = x.topic - , partitions = fmap partitionDataToV2 x.partitions + , partitions = fmap partitionDataToV0 x.partitions } +fetchableTopicResponseToV1 :: FetchableTopicResponse -> FetchableTopicResponseV1 +fetchableTopicResponseToV1 = fetchableTopicResponseToV0 +fetchableTopicResponseToV2 :: FetchableTopicResponse -> FetchableTopicResponseV2 +fetchableTopicResponseToV2 = fetchableTopicResponseToV0 -fetchableTopicResponseFromV2 :: FetchableTopicResponseV2 -> FetchableTopicResponse -fetchableTopicResponseFromV2 x = FetchableTopicResponse +fetchableTopicResponseFromV0 :: FetchableTopicResponseV0 -> FetchableTopicResponse +fetchableTopicResponseFromV0 x = FetchableTopicResponse { topic = x.topic - , partitions = fmap partitionDataFromV2 x.partitions + , partitions = fmap partitionDataFromV0 x.partitions } +fetchableTopicResponseFromV1 :: FetchableTopicResponseV1 -> FetchableTopicResponse +fetchableTopicResponseFromV1 = fetchableTopicResponseFromV0 +fetchableTopicResponseFromV2 :: FetchableTopicResponseV2 -> FetchableTopicResponse +fetchableTopicResponseFromV2 = fetchableTopicResponseFromV0 data FinalizedFeatureKey = FinalizedFeatureKey { name :: !CompactString @@ -974,21 +998,29 @@ data PartitionData = PartitionData } deriving (Show, Eq, Generic) instance Serializable PartitionData -partitionDataToV2 :: PartitionData -> PartitionDataV2 -partitionDataToV2 x = PartitionDataV2 +partitionDataToV0 :: PartitionData -> PartitionDataV0 +partitionDataToV0 x = PartitionDataV0 { partitionIndex = x.partitionIndex , errorCode = x.errorCode , highWatermark = x.highWatermark , recordBytes = x.recordBytes } +partitionDataToV1 :: PartitionData -> PartitionDataV1 +partitionDataToV1 = partitionDataToV0 +partitionDataToV2 :: PartitionData -> PartitionDataV2 +partitionDataToV2 = partitionDataToV0 -partitionDataFromV2 :: PartitionDataV2 -> PartitionData -partitionDataFromV2 x = PartitionData +partitionDataFromV0 :: PartitionDataV0 -> PartitionData +partitionDataFromV0 x = PartitionData { partitionIndex = x.partitionIndex , errorCode = x.errorCode , highWatermark = x.highWatermark , recordBytes = x.recordBytes } +partitionDataFromV1 :: PartitionDataV1 -> PartitionData +partitionDataFromV1 = partitionDataFromV0 +partitionDataFromV2 :: PartitionDataV2 -> PartitionData +partitionDataFromV2 = partitionDataFromV0 data PartitionProduceData = PartitionProduceData { index :: {-# UNPACK #-} !Int32 @@ -1338,42 +1370,63 @@ data FetchRequest = FetchRequest } deriving (Show, Eq, Generic) instance Serializable FetchRequest -fetchRequestToV2 :: FetchRequest -> FetchRequestV2 -fetchRequestToV2 x = FetchRequestV2 +fetchRequestToV0 :: FetchRequest -> FetchRequestV0 +fetchRequestToV0 x = FetchRequestV0 { replicaId = x.replicaId , maxWaitMs = x.maxWaitMs , minBytes = x.minBytes - , topics = fmap fetchTopicToV2 x.topics + , topics = fmap fetchTopicToV0 x.topics } +fetchRequestToV1 :: FetchRequest -> FetchRequestV1 +fetchRequestToV1 = fetchRequestToV0 +fetchRequestToV2 :: FetchRequest -> FetchRequestV2 +fetchRequestToV2 = fetchRequestToV0 -fetchRequestFromV2 :: FetchRequestV2 -> FetchRequest -fetchRequestFromV2 x = FetchRequest +fetchRequestFromV0 :: FetchRequestV0 -> FetchRequest +fetchRequestFromV0 x = FetchRequest { replicaId = x.replicaId , maxWaitMs = x.maxWaitMs , minBytes = x.minBytes - , topics = fmap fetchTopicFromV2 x.topics + , topics = fmap fetchTopicFromV0 x.topics } +fetchRequestFromV1 :: FetchRequestV1 -> FetchRequest +fetchRequestFromV1 = fetchRequestFromV0 +fetchRequestFromV2 :: FetchRequestV2 -> FetchRequest +fetchRequestFromV2 = fetchRequestFromV0 data FetchResponse = FetchResponse - { throttleTimeMs :: {-# UNPACK #-} !Int32 + { responses :: !(KaArray FetchableTopicResponse) + -- ^ The response topics. + , throttleTimeMs :: {-# UNPACK #-} !Int32 -- ^ The duration in milliseconds for which the request was throttled due -- to a quota violation, or zero if the request did not violate any quota. - , responses :: !(KaArray FetchableTopicResponse) - -- ^ The response topics. } deriving (Show, Eq, Generic) instance Serializable FetchResponse -fetchResponseToV2 :: FetchResponse -> FetchResponseV2 -fetchResponseToV2 x = FetchResponseV2 +fetchResponseToV0 :: FetchResponse -> FetchResponseV0 +fetchResponseToV0 x = FetchResponseV0 + { responses = fmap fetchableTopicResponseToV0 x.responses + } +fetchResponseToV1 :: FetchResponse -> FetchResponseV1 +fetchResponseToV1 x = FetchResponseV1 { throttleTimeMs = x.throttleTimeMs - , responses = fmap fetchableTopicResponseToV2 x.responses + , responses = fmap fetchableTopicResponseToV1 x.responses } +fetchResponseToV2 :: FetchResponse -> FetchResponseV2 +fetchResponseToV2 = fetchResponseToV1 -fetchResponseFromV2 :: FetchResponseV2 -> FetchResponse -fetchResponseFromV2 x = FetchResponse - { throttleTimeMs = x.throttleTimeMs - , responses = fmap fetchableTopicResponseFromV2 x.responses +fetchResponseFromV0 :: FetchResponseV0 -> FetchResponse +fetchResponseFromV0 x = FetchResponse + { responses = fmap fetchableTopicResponseFromV0 x.responses + , throttleTimeMs = 0 + } +fetchResponseFromV1 :: FetchResponseV1 -> FetchResponse +fetchResponseFromV1 x = FetchResponse + { responses = fmap fetchableTopicResponseFromV1 x.responses + , throttleTimeMs = x.throttleTimeMs } +fetchResponseFromV2 :: FetchResponseV2 -> FetchResponse +fetchResponseFromV2 = fetchResponseFromV1 newtype FindCoordinatorRequest = FindCoordinatorRequest { key :: Text diff --git a/script/kafka_gen.py b/script/kafka_gen.py index 5cbb7adaa..3048c05f7 100755 --- a/script/kafka_gen.py +++ b/script/kafka_gen.py @@ -79,8 +79,11 @@ def get_field_default(field_type, default=None): if default is not None: if default == "null": return "Nothing" - if field_type.startswith("int") and int(default) < 0: - return f"({default})" + if field_type.startswith("int"): + if default.startswith("0x"): + return f"{int(default, 16)}" + if int(default) < 0: + return f"({default})" if default == "false": return "False" if default == "true": @@ -107,7 +110,7 @@ def get_field_default(field_type, default=None): "ApiVersions": (0, 3), "Metadata": (0, 4), "Produce": (2, 2), - "Fetch": (2, 2), + "Fetch": (0, 2), "OffsetFetch": (0, 2), "OffsetCommit": (0, 2), "ListOffsets": (0, 1), From aae088b07160ef110d457a6d06001005591b93c3 Mon Sep 17 00:00:00 2001 From: daleiz <30970925+daleiz@users.noreply.github.com> Date: Fri, 10 Nov 2023 13:29:25 +0800 Subject: [PATCH 2/3] Fix lookupNodePersist: wait for cluster epoch to sync before executing a new assignment (#1676) --- common/server/HStream/Common/Server/Lookup.hs | 34 +++++++++---------- 1 file changed, 16 insertions(+), 18 deletions(-) diff --git a/common/server/HStream/Common/Server/Lookup.hs b/common/server/HStream/Common/Server/Lookup.hs index 98c9f19b1..e274e40df 100644 --- a/common/server/HStream/Common/Server/Lookup.hs +++ b/common/server/HStream/Common/Server/Lookup.hs @@ -64,24 +64,22 @@ lookupNodePersist metaHandle gossipContext loadBalanceHashRing case find ((nodeId == ) . A.serverNodeId) serverList of Just theNode -> return theNode Nothing -> do - (epoch', hashRing) <- readTVarIO loadBalanceHashRing - if epoch' > epoch - then do - theNode' <- getResNode hashRing key advertisedListenersKey - try (M.updateMeta @TaskAllocation metaId - (TaskAllocation epoch' (A.serverNodeId theNode')) - (Just version) metaHandle) >>= \case - Left (e :: SomeException) -> do - -- TODO: add a retry limit here - Log.warning $ "lookupNodePersist exception: " <> Log.buildString' e - <> ", retry..." - lookupNodePersist metaHandle gossipContext loadBalanceHashRing - key metaId advertisedListenersKey - Right () -> return theNode' - else do - let errmsg = "the server has not yet synced with the latest member list" - Log.warning $ "lookupNodePersist: " <> Log.buildString errmsg - throwIO $ HE.ResourceAllocationException errmsg + (epoch', hashRing) <- atomically $ do + (epoch', hashRing) <- readTVar loadBalanceHashRing + if epoch' > epoch + then pure (epoch', hashRing) + else retry + theNode' <- getResNode hashRing key advertisedListenersKey + try (M.updateMeta @TaskAllocation metaId + (TaskAllocation epoch' (A.serverNodeId theNode')) + (Just version) metaHandle) >>= \case + Left (e :: SomeException) -> do + -- TODO: add a retry limit here + Log.warning $ "lookupNodePersist exception: " <> Log.buildString' e + <> ", retry..." + lookupNodePersist metaHandle gossipContext loadBalanceHashRing + key metaId advertisedListenersKey + Right () -> return theNode' data KafkaResource = KafkaResTopic Text From ff7b2b639eceadce3e6970293f1158fb529fdfa6 Mon Sep 17 00:00:00 2001 From: mu <59917266+4eUeP@users.noreply.github.com> Date: Fri, 10 Nov 2023 01:22:41 -0800 Subject: [PATCH 3/3] Dockerfile: minor tweaks (#1679) --- docker/Dockerfile | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docker/Dockerfile b/docker/Dockerfile index 37c387895..d0fce03df 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -112,6 +112,7 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ libevent-dev \ libevent-openssl-2.1-7 \ libgoogle-glog-dev \ + libgsasl7 \ libmysqlclient-dev \ libpython3.10 \ librdkafka++1 \ @@ -166,6 +167,7 @@ RUN mkdir -p /etc/bash_completion.d && \ grep -wq '^source /etc/profile.d/bash_completion.sh' /etc/bash.bashrc || echo 'source /etc/profile.d/bash_completion.sh' >> /etc/bash.bashrc && \ /usr/local/bin/hadmin --bash-completion-script /usr/local/bin/hadmin > /etc/bash_completion.d/hadmin && \ /usr/local/bin/hadmin-store --bash-completion-script /usr/local/bin/hadmin-store > /etc/bash_completion.d/hadmin-store && \ + /usr/local/bin/hstream-kafka-cli --bash-completion-script /usr/local/bin/hstream-kafka-cli > /etc/bash_completion.d/hstream-kafka-cli && \ /usr/local/bin/hstream --bash-completion-script /usr/local/bin/hstream > /etc/bash_completion.d/hstream EXPOSE 6560 6570