Skip to content

Commit

Permalink
kafka: using uniform handleFetch (#1678)
Browse files Browse the repository at this point in the history
  • Loading branch information
4eUeP authored Nov 10, 2023
1 parent afc4f44 commit 8d0fab2
Show file tree
Hide file tree
Showing 6 changed files with 190 additions and 96 deletions.
13 changes: 7 additions & 6 deletions hstream-kafka/HStream/Kafka/Server/Handler.hsc
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,13 @@ import qualified Kafka.Protocol.Service as K
#define hsc_cv_handler(key, start, end) \
{ \
for (int i = start; i <= end; i++) { \
hsc_printf("handle%sV%d :: K.RequestContext -> K.%sRequestV%d -> IO " \
hsc_printf("handle%sV%d :: ServerContext -> K.RequestContext -> " \
"K.%sRequestV%d -> IO " \
"K.%sResponseV%d \n", \
#key, i, #key, i, #key, i); \
hsc_printf("handle%sV%d ctx req = K.", #key, i); \
hsc_printf("handle%sV%d sc ctx req = K.", #key, i); \
hsc_lowerfirst(#key); \
hsc_printf("ResponseToV%d <$> handle%s ctx (K.", i, #key); \
hsc_printf("ResponseToV%d <$> handle%s sc ctx (K.", i, #key); \
hsc_lowerfirst(#key); \
hsc_printf("RequestFromV%d req)\n", i); \
} \
Expand All @@ -44,17 +45,19 @@ import qualified Kafka.Protocol.Service as K
} \
hsc_printf("K.hd (K.RPC :: K.RPC K.HStreamKafkaV%d \"", i); \
hsc_lowerfirst(#key); \
hsc_printf("\") handle%sV%d\n", #key, i); \
hsc_printf("\") (handle%sV%d sc)\n", #key, i); \
} \
}

-------------------------------------------------------------------------------

#cv_handler ApiVersions, 0, 3
#cv_handler Fetch, 0, 2

handlers :: ServerContext -> [K.ServiceHandler]
handlers sc =
[ #mk_handler ApiVersions, 0, 3
, #mk_handler Fetch, 0, 2

, K.hd (K.RPC :: K.RPC K.HStreamKafkaV0 "metadata") (handleMetadataV0 sc)
, K.hd (K.RPC :: K.RPC K.HStreamKafkaV1 "metadata") (handleMetadataV1 sc)
Expand All @@ -68,8 +71,6 @@ handlers sc =

, K.hd (K.RPC :: K.RPC K.HStreamKafkaV2 "produce") (handleProduceV2 sc)

, K.hd (K.RPC :: K.RPC K.HStreamKafkaV2 "fetch") (handleFetchV2 sc)

-- Offsets
, K.hd (K.RPC :: K.RPC K.HStreamKafkaV0 "listOffsets") (handleListOffsetsV0 sc)
, K.hd (K.RPC :: K.RPC K.HStreamKafkaV1 "listOffsets") (handleListOffsetsV1 sc)
Expand Down
7 changes: 5 additions & 2 deletions hstream-kafka/HStream/Kafka/Server/Handler/Basic.hs
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,11 @@ import qualified Kafka.Protocol.Service as K
--------------------

handleApiVersions
:: K.RequestContext -> K.ApiVersionsRequest -> IO K.ApiVersionsResponse
handleApiVersions _ _ = do
:: ServerContext
-> K.RequestContext
-> K.ApiVersionsRequest
-> IO K.ApiVersionsResponse
handleApiVersions _ _ _ = do
let apiKeys = K.KaArray
. Just
. (V.map apiVersionV0To)
Expand Down
49 changes: 19 additions & 30 deletions hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{-# LANGUAGE OverloadedRecordDot #-}

module HStream.Kafka.Server.Handler.Consume
( handleFetchV2
( handleFetch
) where

import Control.Exception
Expand Down Expand Up @@ -37,14 +37,14 @@ type RecordTable =
(GV.Growing V.Vector GV.RealWorld K.RecordFormat)

-- NOTE: this behaviour is not the same as kafka broker
handleFetchV2
handleFetch
:: ServerContext -> K.RequestContext
-> K.FetchRequestV2 -> IO K.FetchResponseV2
handleFetchV2 ServerContext{..} _ r = catchFetchV2 $ do
-> K.FetchRequest -> IO K.FetchResponse
handleFetch ServerContext{..} _ r = K.catchFetchResponseEx $ do
-- kafka broker just throw java.lang.RuntimeException if topics is null, here
-- we do the same.
let K.NonNullKaArray topicReqs = r.topics
topics <- V.forM topicReqs $ \K.FetchTopicV2{..} -> do
topics <- V.forM topicReqs $ \K.FetchTopic{..} -> do
orderedParts <- S.listStreamPartitionsOrdered scLDClient (S.transToTopicStreamName topic)
let K.NonNullKaArray partitionReqs = partitions
ps <- V.forM partitionReqs $ \p -> do
Expand All @@ -61,9 +61,9 @@ handleFetchV2 ServerContext{..} _ r = catchFetchV2 $ do
case elsn of
Left pd -> pure pd
Right _ -> error "LogicError: this should not be right"
pure $ K.FetchableTopicResponseV2 topic (K.NonNullKaArray respPartitionDatas)
let resp = K.FetchResponseV2 0{- TODO: throttleTimeMs -} (K.NonNullKaArray respTopics)
throwIO $ RetFetchRespV2 resp
pure $ K.FetchableTopicResponse topic (K.NonNullKaArray respPartitionDatas)
let resp = K.FetchResponse (K.NonNullKaArray respTopics) 0{- TODO: throttleTimeMs -}
throwIO $ K.FetchResponseEx resp

-- New reader
reader <- S.newLDReader scLDClient (fromIntegral numOfReads) Nothing
Expand Down Expand Up @@ -109,7 +109,7 @@ handleFetchV2 ServerContext{..} _ r = catchFetchV2 $ do
Right (_startlsn, _endlsn, hioffset) -> do
mgv <- HT.lookup readRecords logid
case mgv of
Nothing -> pure $ K.PartitionDataV2 p.partition K.NONE hioffset (Just "")
Nothing -> pure $ K.PartitionData p.partition K.NONE hioffset (Just "")
Just gv -> do
v <- GV.unsafeFreeze gv
-- This should not be Nothing, because if we found the key in
Expand All @@ -127,20 +127,9 @@ handleFetchV2 ServerContext{..} _ r = catchFetchV2 $ do
let b = V.foldl (<>) (BB.byteString fstRecordBytes)
(V.map (BB.byteString . K.unCompactBytes . (.recordBytes)) vs)
bs = BS.toStrict $ BB.toLazyByteString b
pure $ K.PartitionDataV2 p.partition K.NONE hioffset (Just bs)
pure $ K.FetchableTopicResponseV2 topic (K.NonNullKaArray respPartitionDatas)
pure $ K.FetchResponseV2 0{- TODO: throttleTimeMs -} (K.NonNullKaArray respTopics)

-------------------------------------------------------------------------------

-- TODO: move to Kafka.Protocol.Message.Struct
newtype RetFetchRespV2 = RetFetchRespV2 K.FetchResponseV2
deriving (Show, Eq)

instance Exception RetFetchRespV2

catchFetchV2 :: IO K.FetchResponseV2 -> IO K.FetchResponseV2
catchFetchV2 act = act `catch` \(RetFetchRespV2 resp) -> pure resp
pure $ K.PartitionData p.partition K.NONE hioffset (Just bs)
pure $ K.FetchableTopicResponse topic (K.NonNullKaArray respPartitionDatas)
pure $ K.FetchResponse (K.NonNullKaArray respTopics) 0{- TODO: throttleTimeMs -}

-------------------------------------------------------------------------------

Expand All @@ -152,7 +141,7 @@ getPartitionLsn
-> K.OffsetManager
-> S.C_LogID -> Int32
-> Int64 -- ^ kafka start offset
-> IO (Either K.PartitionDataV2 (S.LSN, S.LSN, Int64))
-> IO (Either K.PartitionData (S.LSN, S.LSN, Int64))
getPartitionLsn ldclient om logid partition offset = do
m <- K.getLatestOffsetWithLsn om logid
case m of
Expand All @@ -167,19 +156,19 @@ getPartitionLsn ldclient om logid partition offset = do
| offset == highwaterOffset ->
pure $ Right (tailLsn + 1, tailLsn, highwaterOffset)
| offset > highwaterOffset ->
pure $ Left $ errorPartitionResponseV2 partition K.OFFSET_OUT_OF_RANGE
pure $ Left $ errorPartitionResponse partition K.OFFSET_OUT_OF_RANGE
-- ghc is not smart enough to detact my partten matching is complete
| otherwise -> error "This should not be reached (getPartitionLsn)"
Nothing -> do
Log.debug $ "Partition " <> Log.build logid <> " is empty"
if offset == 0
then pure $ Right (S.LSN_MIN, S.LSN_INVALID, 0)
else pure $ Left $ errorPartitionResponseV2 partition K.OFFSET_OUT_OF_RANGE
else pure $ Left $ errorPartitionResponse partition K.OFFSET_OUT_OF_RANGE

errorPartitionResponseV2 :: Int32 -> K.ErrorCode -> K.PartitionDataV2
errorPartitionResponseV2 partitionIndex ec =
K.PartitionDataV2 partitionIndex ec (-1) (Just "")
{-# INLINE errorPartitionResponseV2 #-}
errorPartitionResponse :: Int32 -> K.ErrorCode -> K.PartitionData
errorPartitionResponse partitionIndex ec =
K.PartitionData partitionIndex ec (-1) (Just "")
{-# INLINE errorPartitionResponse #-}

foldWhileM :: Monad m => a -> (a -> m (a, Bool)) -> m a
foldWhileM !a f = do
Expand Down
83 changes: 64 additions & 19 deletions hstream-kafka/protocol/Kafka/Protocol/Message/Struct.hs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ data DescribedGroupV0 = DescribedGroupV0
} deriving (Show, Eq, Generic)
instance Serializable DescribedGroupV0

data FetchPartitionV2 = FetchPartitionV2
data FetchPartitionV0 = FetchPartitionV0
{ partition :: {-# UNPACK #-} !Int32
-- ^ The partition index.
, fetchOffset :: {-# UNPACK #-} !Int64
Expand All @@ -159,17 +159,25 @@ data FetchPartitionV2 = FetchPartitionV2
-- ^ The maximum bytes to fetch from this partition. See KIP-74 for cases
-- where this limit may not be honored.
} deriving (Show, Eq, Generic)
instance Serializable FetchPartitionV2
instance Serializable FetchPartitionV0

data FetchTopicV2 = FetchTopicV2
data FetchTopicV0 = FetchTopicV0
{ topic :: !Text
-- ^ The name of the topic to fetch.
, partitions :: !(KaArray FetchPartitionV2)
, partitions :: !(KaArray FetchPartitionV0)
-- ^ The partitions to fetch.
} deriving (Show, Eq, Generic)
instance Serializable FetchTopicV2
instance Serializable FetchTopicV0

data PartitionDataV2 = PartitionDataV2
type FetchPartitionV1 = FetchPartitionV0

type FetchTopicV1 = FetchTopicV0

type FetchPartitionV2 = FetchPartitionV0

type FetchTopicV2 = FetchTopicV0

data PartitionDataV0 = PartitionDataV0
{ partitionIndex :: {-# UNPACK #-} !Int32
-- ^ The partition index.
, errorCode :: {-# UNPACK #-} !ErrorCode
Expand All @@ -179,15 +187,23 @@ data PartitionDataV2 = PartitionDataV2
, recordBytes :: !NullableBytes
-- ^ The record data.
} deriving (Show, Eq, Generic)
instance Serializable PartitionDataV2
instance Serializable PartitionDataV0

data FetchableTopicResponseV2 = FetchableTopicResponseV2
data FetchableTopicResponseV0 = FetchableTopicResponseV0
{ topic :: !Text
-- ^ The topic name.
, partitions :: !(KaArray PartitionDataV2)
, partitions :: !(KaArray PartitionDataV0)
-- ^ The topic partitions.
} deriving (Show, Eq, Generic)
instance Serializable FetchableTopicResponseV2
instance Serializable FetchableTopicResponseV0

type PartitionDataV1 = PartitionDataV0

type FetchableTopicResponseV1 = FetchableTopicResponseV0

type PartitionDataV2 = PartitionDataV0

type FetchableTopicResponseV2 = FetchableTopicResponseV0

data JoinGroupRequestProtocolV0 = JoinGroupRequestProtocolV0
{ name :: !Text
Expand Down Expand Up @@ -618,27 +634,38 @@ newtype DescribeGroupsResponseV0 = DescribeGroupsResponseV0
} deriving (Show, Eq, Generic)
instance Serializable DescribeGroupsResponseV0

data FetchRequestV2 = FetchRequestV2
data FetchRequestV0 = FetchRequestV0
{ replicaId :: {-# UNPACK #-} !Int32
-- ^ The broker ID of the follower, of -1 if this request is from a
-- consumer.
, maxWaitMs :: {-# UNPACK #-} !Int32
-- ^ The maximum time in milliseconds to wait for the response.
, minBytes :: {-# UNPACK #-} !Int32
-- ^ The minimum bytes to accumulate in the response.
, topics :: !(KaArray FetchTopicV2)
, topics :: !(KaArray FetchTopicV0)
-- ^ The topics to fetch.
} deriving (Show, Eq, Generic)
instance Serializable FetchRequestV2
instance Serializable FetchRequestV0

data FetchResponseV2 = FetchResponseV2
type FetchRequestV1 = FetchRequestV0

type FetchRequestV2 = FetchRequestV0

newtype FetchResponseV0 = FetchResponseV0
{ responses :: (KaArray FetchableTopicResponseV0)
} deriving (Show, Eq, Generic)
instance Serializable FetchResponseV0

data FetchResponseV1 = FetchResponseV1
{ throttleTimeMs :: {-# UNPACK #-} !Int32
-- ^ The duration in milliseconds for which the request was throttled due
-- to a quota violation, or zero if the request did not violate any quota.
, responses :: !(KaArray FetchableTopicResponseV2)
, responses :: !(KaArray FetchableTopicResponseV0)
-- ^ The response topics.
} deriving (Show, Eq, Generic)
instance Serializable FetchResponseV2
instance Serializable FetchResponseV1

type FetchResponseV2 = FetchResponseV1

newtype FindCoordinatorRequestV0 = FindCoordinatorRequestV0
{ key :: Text
Expand Down Expand Up @@ -942,7 +969,8 @@ data HStreamKafkaV0
instance Service HStreamKafkaV0 where
type ServiceName HStreamKafkaV0 = "HStreamKafkaV0"
type ServiceMethods HStreamKafkaV0 =
'[ "listOffsets"
'[ "fetch"
, "listOffsets"
, "metadata"
, "offsetCommit"
, "offsetFetch"
Expand All @@ -958,6 +986,13 @@ instance Service HStreamKafkaV0 where
, "deleteTopics"
]

instance HasMethodImpl HStreamKafkaV0 "fetch" where
type MethodName HStreamKafkaV0 "fetch" = "fetch"
type MethodKey HStreamKafkaV0 "fetch" = 1
type MethodVersion HStreamKafkaV0 "fetch" = 0
type MethodInput HStreamKafkaV0 "fetch" = FetchRequestV0
type MethodOutput HStreamKafkaV0 "fetch" = FetchResponseV0

instance HasMethodImpl HStreamKafkaV0 "listOffsets" where
type MethodName HStreamKafkaV0 "listOffsets" = "listOffsets"
type MethodKey HStreamKafkaV0 "listOffsets" = 2
Expand Down Expand Up @@ -1061,13 +1096,21 @@ data HStreamKafkaV1
instance Service HStreamKafkaV1 where
type ServiceName HStreamKafkaV1 = "HStreamKafkaV1"
type ServiceMethods HStreamKafkaV1 =
'[ "listOffsets"
'[ "fetch"
, "listOffsets"
, "metadata"
, "offsetCommit"
, "offsetFetch"
, "apiVersions"
]

instance HasMethodImpl HStreamKafkaV1 "fetch" where
type MethodName HStreamKafkaV1 "fetch" = "fetch"
type MethodKey HStreamKafkaV1 "fetch" = 1
type MethodVersion HStreamKafkaV1 "fetch" = 1
type MethodInput HStreamKafkaV1 "fetch" = FetchRequestV1
type MethodOutput HStreamKafkaV1 "fetch" = FetchResponseV1

instance HasMethodImpl HStreamKafkaV1 "listOffsets" where
type MethodName HStreamKafkaV1 "listOffsets" = "listOffsets"
type MethodKey HStreamKafkaV1 "listOffsets" = 2
Expand Down Expand Up @@ -1223,7 +1266,7 @@ instance Show ApiKey where
supportedApiVersions :: [ApiVersionV0]
supportedApiVersions =
[ ApiVersionV0 (ApiKey 0) 2 2
, ApiVersionV0 (ApiKey 1) 2 2
, ApiVersionV0 (ApiKey 1) 0 2
, ApiVersionV0 (ApiKey 2) 0 1
, ApiVersionV0 (ApiKey 3) 0 4
, ApiVersionV0 (ApiKey 8) 0 2
Expand All @@ -1242,6 +1285,8 @@ supportedApiVersions =

getHeaderVersion :: ApiKey -> Int16 -> (Int16, Int16)
getHeaderVersion (ApiKey 0) 2 = (1, 0)
getHeaderVersion (ApiKey 1) 0 = (1, 0)
getHeaderVersion (ApiKey 1) 1 = (1, 0)
getHeaderVersion (ApiKey 1) 2 = (1, 0)
getHeaderVersion (ApiKey 2) 0 = (1, 0)
getHeaderVersion (ApiKey 2) 1 = (1, 0)
Expand Down
Loading

0 comments on commit 8d0fab2

Please sign in to comment.