From 83692613609fed88e0972fafc968c1dbfdfd053c Mon Sep 17 00:00:00 2001 From: mu <59917266+4eUeP@users.noreply.github.com> Date: Wed, 8 Nov 2023 22:54:01 -0800 Subject: [PATCH] kafka: using uniform handleApiVersions (#1675) --- .../Kafka/Server/{Handler.hs => Handler.hsc} | 44 +++++++++++++++++-- .../HStream/Kafka/Server/Handler/Basic.hs | 42 +++++------------- hstream-kafka/hstream-kafka.cabal | 1 + hstream-kafka/tests/blackbox/test_basic.py | 13 ++++++ 4 files changed, 66 insertions(+), 34 deletions(-) rename hstream-kafka/HStream/Kafka/Server/{Handler.hs => Handler.hsc} (52%) create mode 100644 hstream-kafka/tests/blackbox/test_basic.py diff --git a/hstream-kafka/HStream/Kafka/Server/Handler.hs b/hstream-kafka/HStream/Kafka/Server/Handler.hsc similarity index 52% rename from hstream-kafka/HStream/Kafka/Server/Handler.hs rename to hstream-kafka/HStream/Kafka/Server/Handler.hsc index f1dc0fdab..719aaf131 100644 --- a/hstream-kafka/HStream/Kafka/Server/Handler.hs +++ b/hstream-kafka/HStream/Kafka/Server/Handler.hsc @@ -13,12 +13,48 @@ import HStream.Kafka.Server.Types (ServerContext (..)) import qualified Kafka.Protocol.Message as K import qualified Kafka.Protocol.Service as K +------------------------------------------------------------------------------- + +#define hsc_lowerfirst(x) \ + { \ + const char* s = (x); \ + hsc_putchar(hsc_tolower(*s)); \ + hsc_printf("%s", ++s); \ + } + +#define hsc_cv_handler(key, start, end) \ + { \ + for (int i = start; i <= end; i++) { \ + hsc_printf("handle%sV%d :: K.RequestContext -> K.%sRequestV%d -> IO " \ + "K.%sResponseV%d \n", \ + #key, i, #key, i, #key, i); \ + hsc_printf("handle%sV%d ctx req = K.", #key, i); \ + hsc_lowerfirst(#key); \ + hsc_printf("ResponseToV%d <$> handle%s ctx (K.", i, #key); \ + hsc_lowerfirst(#key); \ + hsc_printf("RequestFromV%d req)\n", i); \ + } \ + } + +#define hsc_mk_handler(key, start, end) \ + { \ + for (int i = start; i <= end; i++) { \ + if (i != start) { \ + hsc_printf(" , "); \ + } \ + hsc_printf("K.hd (K.RPC :: K.RPC K.HStreamKafkaV%d \"", i); \ + hsc_lowerfirst(#key); \ + hsc_printf("\") handle%sV%d\n", #key, i); \ + } \ + } + +------------------------------------------------------------------------------- + +#cv_handler ApiVersions, 0, 3 + handlers :: ServerContext -> [K.ServiceHandler] handlers sc = - [ K.hd (K.RPC :: K.RPC K.HStreamKafkaV0 "apiVersions") handleApiversionsV0 - , K.hd (K.RPC :: K.RPC K.HStreamKafkaV1 "apiVersions") handleApiversionsV1 - , K.hd (K.RPC :: K.RPC K.HStreamKafkaV2 "apiVersions") handleApiversionsV2 - , K.hd (K.RPC :: K.RPC K.HStreamKafkaV3 "apiVersions") handleApiversionsV3 + [ #mk_handler ApiVersions, 0, 3 , K.hd (K.RPC :: K.RPC K.HStreamKafkaV0 "metadata") (handleMetadataV0 sc) , K.hd (K.RPC :: K.RPC K.HStreamKafkaV1 "metadata") (handleMetadataV1 sc) diff --git a/hstream-kafka/HStream/Kafka/Server/Handler/Basic.hs b/hstream-kafka/HStream/Kafka/Server/Handler/Basic.hs index 5e3d739ae..4f289e4eb 100644 --- a/hstream-kafka/HStream/Kafka/Server/Handler/Basic.hs +++ b/hstream-kafka/HStream/Kafka/Server/Handler/Basic.hs @@ -3,10 +3,7 @@ module HStream.Kafka.Server.Handler.Basic ( -- 18: ApiVersions - handleApiversionsV0 - , handleApiversionsV1 - , handleApiversionsV2 - , handleApiversionsV3 + handleApiVersions -- 3: Metadata , handleMetadataV0 , handleMetadataV1 @@ -39,32 +36,17 @@ import qualified Kafka.Protocol.Service as K -------------------- -- 18: ApiVersions -------------------- -handleApiversionsV0 - :: K.RequestContext -> K.ApiVersionsRequestV0 -> IO K.ApiVersionsResponseV0 -handleApiversionsV0 _ _ = do - let apiKeys = K.KaArray $ Just $ V.fromList K.supportedApiVersions - pure $ K.ApiVersionsResponseV0 K.NONE apiKeys - -handleApiversionsV1 - :: K.RequestContext -> K.ApiVersionsRequestV1 -> IO K.ApiVersionsResponseV1 -handleApiversionsV1 _ _ = do - let apiKeys = K.KaArray $ Just $ V.fromList K.supportedApiVersions - pure $ K.ApiVersionsResponseV1 K.NONE apiKeys 0{- throttle_time_ms -} - -handleApiversionsV2 - :: K.RequestContext -> K.ApiVersionsRequestV2 -> IO K.ApiVersionsResponseV2 -handleApiversionsV2 = handleApiversionsV1 - -handleApiversionsV3 - :: K.RequestContext -> K.ApiVersionsRequestV3 -> IO K.ApiVersionsResponseV3 -handleApiversionsV3 _ req = do - let apiKeys = K.CompactKaArray + +handleApiVersions + :: K.RequestContext -> K.ApiVersionsRequest -> IO K.ApiVersionsResponse +handleApiVersions _ _ = do + let apiKeys = K.KaArray . Just - . (V.map apiVersionV0ToV3) + . (V.map apiVersionV0To) . V.fromList $ K.supportedApiVersions - pure $ K.ApiVersionsResponseV3 K.NONE apiKeys 0{- throttle_time_ms -} - K.EmptyTaggedFields + pure $ K.ApiVersionsResponse K.NONE apiKeys 0{- throttle_time_ms -} + K.EmptyTaggedFields -------------------- -- 3: Metadata @@ -216,6 +198,6 @@ handleMetadataV4 ctx@ServerContext{..} _ req = do ------------------------------------------------------------------------------- -apiVersionV0ToV3 :: K.ApiVersionV0 -> K.ApiVersionV3 -apiVersionV0ToV3 K.ApiVersionV0{..} = - let taggedFields = K.EmptyTaggedFields in K.ApiVersionV3{..} +apiVersionV0To :: K.ApiVersionV0 -> K.ApiVersion +apiVersionV0To K.ApiVersionV0{..} = + let taggedFields = K.EmptyTaggedFields in K.ApiVersion{..} diff --git a/hstream-kafka/hstream-kafka.cabal b/hstream-kafka/hstream-kafka.cabal index 61b5930aa..327ada5b4 100644 --- a/hstream-kafka/hstream-kafka.cabal +++ b/hstream-kafka/hstream-kafka.cabal @@ -136,6 +136,7 @@ library cxx-options: -std=c++17 extra-libraries: rdkafka++ hs-source-dirs: . + build-tool-depends: hsc2hs:hsc2hs build-depends: , attoparsec , base >=4.11 && <5 diff --git a/hstream-kafka/tests/blackbox/test_basic.py b/hstream-kafka/tests/blackbox/test_basic.py new file mode 100644 index 000000000..886412d3d --- /dev/null +++ b/hstream-kafka/tests/blackbox/test_basic.py @@ -0,0 +1,13 @@ +import pytest +from kafka.protocol.admin import ApiVersionRequest, ApiVersionResponse + +from common import send_req + + +def test_send_api_versions(kafka_port, hstream_kafka_port): + for v in range(0, 2): + req = ApiVersionRequest[v]() + kafka_resp = send_req(kafka_port, req) + hstream_kafka_resp = send_req(hstream_kafka_port, req) + assert isinstance(kafka_resp, ApiVersionResponse[v]) + assert isinstance(hstream_kafka_resp, ApiVersionResponse[v])