Skip to content

Commit

Permalink
kafka: using uniform handleApiVersions (#1675)
Browse files Browse the repository at this point in the history
  • Loading branch information
4eUeP authored Nov 9, 2023
1 parent c5870d9 commit 8369261
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,48 @@ import HStream.Kafka.Server.Types (ServerContext (..))
import qualified Kafka.Protocol.Message as K
import qualified Kafka.Protocol.Service as K

-------------------------------------------------------------------------------

#define hsc_lowerfirst(x) \
{ \
const char* s = (x); \
hsc_putchar(hsc_tolower(*s)); \
hsc_printf("%s", ++s); \
}

#define hsc_cv_handler(key, start, end) \
{ \
for (int i = start; i <= end; i++) { \
hsc_printf("handle%sV%d :: K.RequestContext -> K.%sRequestV%d -> IO " \
"K.%sResponseV%d \n", \
#key, i, #key, i, #key, i); \
hsc_printf("handle%sV%d ctx req = K.", #key, i); \
hsc_lowerfirst(#key); \
hsc_printf("ResponseToV%d <$> handle%s ctx (K.", i, #key); \
hsc_lowerfirst(#key); \
hsc_printf("RequestFromV%d req)\n", i); \
} \
}

#define hsc_mk_handler(key, start, end) \
{ \
for (int i = start; i <= end; i++) { \
if (i != start) { \
hsc_printf(" , "); \
} \
hsc_printf("K.hd (K.RPC :: K.RPC K.HStreamKafkaV%d \"", i); \
hsc_lowerfirst(#key); \
hsc_printf("\") handle%sV%d\n", #key, i); \
} \
}

-------------------------------------------------------------------------------

#cv_handler ApiVersions, 0, 3

handlers :: ServerContext -> [K.ServiceHandler]
handlers sc =
[ K.hd (K.RPC :: K.RPC K.HStreamKafkaV0 "apiVersions") handleApiversionsV0
, K.hd (K.RPC :: K.RPC K.HStreamKafkaV1 "apiVersions") handleApiversionsV1
, K.hd (K.RPC :: K.RPC K.HStreamKafkaV2 "apiVersions") handleApiversionsV2
, K.hd (K.RPC :: K.RPC K.HStreamKafkaV3 "apiVersions") handleApiversionsV3
[ #mk_handler ApiVersions, 0, 3

, K.hd (K.RPC :: K.RPC K.HStreamKafkaV0 "metadata") (handleMetadataV0 sc)
, K.hd (K.RPC :: K.RPC K.HStreamKafkaV1 "metadata") (handleMetadataV1 sc)
Expand Down
42 changes: 12 additions & 30 deletions hstream-kafka/HStream/Kafka/Server/Handler/Basic.hs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,7 @@

module HStream.Kafka.Server.Handler.Basic
( -- 18: ApiVersions
handleApiversionsV0
, handleApiversionsV1
, handleApiversionsV2
, handleApiversionsV3
handleApiVersions
-- 3: Metadata
, handleMetadataV0
, handleMetadataV1
Expand Down Expand Up @@ -39,32 +36,17 @@ import qualified Kafka.Protocol.Service as K
--------------------
-- 18: ApiVersions
--------------------
handleApiversionsV0
:: K.RequestContext -> K.ApiVersionsRequestV0 -> IO K.ApiVersionsResponseV0
handleApiversionsV0 _ _ = do
let apiKeys = K.KaArray $ Just $ V.fromList K.supportedApiVersions
pure $ K.ApiVersionsResponseV0 K.NONE apiKeys

handleApiversionsV1
:: K.RequestContext -> K.ApiVersionsRequestV1 -> IO K.ApiVersionsResponseV1
handleApiversionsV1 _ _ = do
let apiKeys = K.KaArray $ Just $ V.fromList K.supportedApiVersions
pure $ K.ApiVersionsResponseV1 K.NONE apiKeys 0{- throttle_time_ms -}

handleApiversionsV2
:: K.RequestContext -> K.ApiVersionsRequestV2 -> IO K.ApiVersionsResponseV2
handleApiversionsV2 = handleApiversionsV1

handleApiversionsV3
:: K.RequestContext -> K.ApiVersionsRequestV3 -> IO K.ApiVersionsResponseV3
handleApiversionsV3 _ req = do
let apiKeys = K.CompactKaArray

handleApiVersions
:: K.RequestContext -> K.ApiVersionsRequest -> IO K.ApiVersionsResponse
handleApiVersions _ _ = do
let apiKeys = K.KaArray
. Just
. (V.map apiVersionV0ToV3)
. (V.map apiVersionV0To)
. V.fromList
$ K.supportedApiVersions
pure $ K.ApiVersionsResponseV3 K.NONE apiKeys 0{- throttle_time_ms -}
K.EmptyTaggedFields
pure $ K.ApiVersionsResponse K.NONE apiKeys 0{- throttle_time_ms -}
K.EmptyTaggedFields

--------------------
-- 3: Metadata
Expand Down Expand Up @@ -216,6 +198,6 @@ handleMetadataV4 ctx@ServerContext{..} _ req = do

-------------------------------------------------------------------------------

apiVersionV0ToV3 :: K.ApiVersionV0 -> K.ApiVersionV3
apiVersionV0ToV3 K.ApiVersionV0{..} =
let taggedFields = K.EmptyTaggedFields in K.ApiVersionV3{..}
apiVersionV0To :: K.ApiVersionV0 -> K.ApiVersion
apiVersionV0To K.ApiVersionV0{..} =
let taggedFields = K.EmptyTaggedFields in K.ApiVersion{..}
1 change: 1 addition & 0 deletions hstream-kafka/hstream-kafka.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ library
cxx-options: -std=c++17
extra-libraries: rdkafka++
hs-source-dirs: .
build-tool-depends: hsc2hs:hsc2hs
build-depends:
, attoparsec
, base >=4.11 && <5
Expand Down
13 changes: 13 additions & 0 deletions hstream-kafka/tests/blackbox/test_basic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import pytest
from kafka.protocol.admin import ApiVersionRequest, ApiVersionResponse

from common import send_req


def test_send_api_versions(kafka_port, hstream_kafka_port):
for v in range(0, 2):
req = ApiVersionRequest[v]()
kafka_resp = send_req(kafka_port, req)
hstream_kafka_resp = send_req(hstream_kafka_port, req)
assert isinstance(kafka_resp, ApiVersionResponse[v])
assert isinstance(hstream_kafka_resp, ApiVersionResponse[v])

0 comments on commit 8369261

Please sign in to comment.