From 1448e6c7d45d6387e53826d101282277d16fe698 Mon Sep 17 00:00:00 2001 From: Commelina Date: Thu, 9 Nov 2023 14:55:30 +0800 Subject: [PATCH] kafka: implement basic SASL auth --- hstream-kafka/HStream/Kafka/Network.hs | 87 +++++++++++---- .../HStream/Kafka/Server/Config/FromCli.hs | 6 + .../HStream/Kafka/Server/Config/FromJson.hs | 4 + .../HStream/Kafka/Server/Config/Types.hs | 5 + .../HStream/Kafka/Server/Handler.hsc | 28 ++++- .../HStream/Kafka/Server/Handler/Security.hs | 104 ++++++++++++++++++ hstream-kafka/hstream-kafka.cabal | 2 + .../message/SaslAuthenticateRequest.json | 29 +++++ .../message/SaslAuthenticateResponse.json | 34 ++++++ .../message/SaslHandshakeRequest.json | 31 ++++++ .../message/SaslHandshakeResponse.json | 32 ++++++ .../protocol/Kafka/Protocol/Message/Struct.hs | 64 +++++++++++ .../protocol/Kafka/Protocol/Message/Total.hs | 97 ++++++++++++++++ .../protocol/Kafka/Protocol/Service.hs | 11 +- hstream/app/kafka-server.hs | 12 +- script/kafka_gen.py | 1 + 16 files changed, 515 insertions(+), 32 deletions(-) create mode 100644 hstream-kafka/HStream/Kafka/Server/Handler/Security.hs create mode 100644 hstream-kafka/message/SaslAuthenticateRequest.json create mode 100644 hstream-kafka/message/SaslAuthenticateResponse.json create mode 100644 hstream-kafka/message/SaslHandshakeRequest.json create mode 100644 hstream-kafka/message/SaslHandshakeResponse.json diff --git a/hstream-kafka/HStream/Kafka/Network.hs b/hstream-kafka/HStream/Kafka/Network.hs index 7abe2c8ae..d87a040da 100644 --- a/hstream-kafka/HStream/Kafka/Network.hs +++ b/hstream-kafka/HStream/Kafka/Network.hs @@ -7,6 +7,7 @@ module HStream.Kafka.Network ( -- * Server ServerOptions (..) + , SaslOptions (..) , defaultServerOpts , runServer -- * Client @@ -18,24 +19,26 @@ module HStream.Kafka.Network ) where import Control.Concurrent -import qualified Control.Exception as E +import qualified Control.Exception as E import Control.Monad -import Data.ByteString (ByteString) -import qualified Data.ByteString as BS -import qualified Data.ByteString.Lazy as BSL +import Data.ByteString (ByteString) +import qualified Data.ByteString as BS +import qualified Data.ByteString.Lazy as BSL import Data.Int -import Data.List (find, intersperse) -import Data.Maybe (fromMaybe, isJust, - isNothing) -import qualified Network.Socket as N -import qualified Network.Socket.ByteString as N -import qualified Network.Socket.ByteString.Lazy as NL -import Numeric (showHex, showInt) +import Data.List (find, intersperse) +import Data.Maybe (fromMaybe, isJust, + isNothing) +import qualified Network.Socket as N +import qualified Network.Socket.ByteString as N +import qualified Network.Socket.ByteString.Lazy as NL +import Numeric (showHex, showInt) -import HStream.Kafka.Common.OffsetManager (initOffsetReader) -import HStream.Kafka.Server.Types (ServerContext (..)) -import qualified HStream.Logger as Log +import HStream.Kafka.Common.KafkaException (ErrorCodeException (..)) +import HStream.Kafka.Common.OffsetManager (initOffsetReader) +import HStream.Kafka.Server.Types (ServerContext (..)) +import qualified HStream.Logger as Log import Kafka.Protocol.Encoding +import qualified Kafka.Protocol.Error as K import Kafka.Protocol.Message import Kafka.Protocol.Service @@ -45,11 +48,15 @@ import Kafka.Protocol.Service -- TODO data SslOptions +-- TODO +data SaslOptions = SaslOptions + data ServerOptions = ServerOptions - { serverHost :: !String - , serverPort :: !Int - , serverSslOptions :: !(Maybe SslOptions) - , serverOnStarted :: !(Maybe (IO ())) + { serverHost :: !String + , serverPort :: !Int + , serverSslOptions :: !(Maybe SslOptions) + , serverSaslOptions :: !(Maybe SaslOptions) + , serverOnStarted :: !(Maybe (IO ())) } defaultServerOpts :: ServerOptions @@ -57,6 +64,7 @@ defaultServerOpts = ServerOptions { serverHost = "0.0.0.0" , serverPort = 9092 , serverSslOptions = Nothing + , serverSaslOptions = Nothing , serverOnStarted = Nothing } @@ -66,34 +74,66 @@ runServer :: ServerOptions -> ServerContext -> (ServerContext -> [ServiceHandler]) + -> (ServerContext -> [ServiceHandler]) -> IO () -runServer opts sc mkHandlers = +runServer opts sc mkPreAuthedHandlers mkAuthedHandlers = startTCPServer opts $ \(s, peer) -> do -- Since the Reader is thread-unsafe, for each connection we create a new -- Reader. om <- initOffsetReader $ scOffsetManager sc let sc' = sc{scOffsetManager = om} i <- N.recv s 1024 - talk (peer, (mkHandlers sc')) i Nothing s + -- Decide if we require SASL authentication + case (serverSaslOptions opts) of + Nothing -> talk (peer, mkAuthedHandlers sc') i Nothing s + _ -> do i' <- authTalk (peer, (mkPreAuthedHandlers sc')) i Nothing s + talk (peer, mkAuthedHandlers sc') i' Nothing s where + authTalk _ "" _ _ = pure mempty -- client exit + authTalk !(peer, hds) i m_more s = do + reqBsResult <- case m_more of + Nothing -> runParser @ByteString get i + Just mf -> mf i + case reqBsResult of + Done "" reqBs -> do authed <- newEmptyMVar + respBs <- runHandler peer hds reqBs authed + NL.sendAll s respBs + tryTakeMVar authed >>= \case + Just True -> N.recv s 1024 + Just False -> E.throwIO $ ErrorCodeException K.SASL_AUTHENTICATION_FAILED + Nothing -> do msg <- N.recv s 1024 + authTalk (peer, hds) msg Nothing s + Done l reqBs -> do authed <- newEmptyMVar + respBs <- runHandler peer hds reqBs authed + NL.sendAll s respBs + tryTakeMVar authed >>= \case + Just True -> return l + Just False -> E.throwIO $ ErrorCodeException K.SASL_AUTHENTICATION_FAILED + Nothing -> authTalk (peer, hds) l Nothing s + More f -> do msg <- N.recv s 1024 + authTalk (peer, hds) msg (Just f) s + Fail _ err -> E.throwIO $ DecodeError $ "Fail, " <> err + talk _ "" _ _ = pure () -- client exit talk !(peer, hds) i m_more s = do reqBsResult <- case m_more of Nothing -> runParser @ByteString get i Just mf -> mf i case reqBsResult of - Done "" reqBs -> do respBs <- runHandler peer hds reqBs + Done "" reqBs -> do respBs <- runHandler peer hds reqBs undefined -- FIXME: unused 'authed' as 'undefined'. Better way? NL.sendAll s respBs msg <- N.recv s 1024 talk (peer, hds) msg Nothing s - Done l reqBs -> do respBs <- runHandler peer hds reqBs + Done l reqBs -> do respBs <- runHandler peer hds reqBs undefined -- FIXME: unused 'authed' as 'undefined'. Better way? NL.sendAll s respBs talk (peer, hds) l Nothing s More f -> do msg <- N.recv s 1024 talk (peer, hds) msg (Just f) s Fail _ err -> E.throwIO $ DecodeError $ "Fail, " <> err - runHandler peer handlers reqBs = do + -- 'authed :: MVar Bool'. Empty at the beginning and is only used by SASL auth. + -- FIXME: better way? + runHandler peer handlers reqBs authed = do headerResult <- runParser @RequestHeader get reqBs case headerResult of Done l RequestHeader{..} -> do @@ -112,6 +152,7 @@ runServer opts sc mkHandlers = RequestContext { clientId = requestClientId , clientHost = showSockAddrHost peer + , clientAuthDone = authed } resp <- rpcHandler' reqContext req Log.debug $ "Server response: " <> Log.buildString' resp diff --git a/hstream-kafka/HStream/Kafka/Server/Config/FromCli.hs b/hstream-kafka/HStream/Kafka/Server/Config/FromCli.hs index 589d66353..cee5bd12d 100644 --- a/hstream-kafka/HStream/Kafka/Server/Config/FromCli.hs +++ b/hstream-kafka/HStream/Kafka/Server/Config/FromCli.hs @@ -94,6 +94,8 @@ cliOptionsParser = do cliStoreCompression <- optional storeCompressionParser + cliEnableSaslAuth <- enableSaslAuthParser + return CliOptions{..} ------------------------------------------------------------------------------- @@ -249,6 +251,10 @@ storeConfigPathParser = strOption <> metavar "PATH" <> value "/data/store/logdevice.conf" <> help "Storage config path" +enableSaslAuthParser :: O.Parser Bool +enableSaslAuthParser = flag False True + $ long "enable-sasl" + <> help "Enable SASL authentication" ------------------------------------------------------------------------------- parserOpt :: (Text -> Either String a) -> O.Mod O.OptionFields a -> O.Parser a diff --git a/hstream-kafka/HStream/Kafka/Server/Config/FromJson.hs b/hstream-kafka/HStream/Kafka/Server/Config/FromJson.hs index 2a6c997ef..e2b23f3f7 100644 --- a/hstream-kafka/HStream/Kafka/Server/Config/FromJson.hs +++ b/hstream-kafka/HStream/Kafka/Server/Config/FromJson.hs @@ -96,6 +96,10 @@ parseJSONToOptions CliOptions{..} obj = do let !_securityProtocolMap = defaultProtocolMap tlsConfig let !_listenersSecurityProtocolMap = Map.union cliListenersSecurityProtocolMap nodeListenersSecurityProtocolMap + -- SASL config + nodeEnableSaslAuth <- nodeCfgObj .:? "enable-sasl" .!= False + let !_enableSaslAuth = cliEnableSaslAuth || nodeEnableSaslAuth + return ServerOpts {..} ------------------------------------------------------------------------------- diff --git a/hstream-kafka/HStream/Kafka/Server/Config/Types.hs b/hstream-kafka/HStream/Kafka/Server/Config/Types.hs index 5ebbf7d1a..aa8fb49cb 100644 --- a/hstream-kafka/HStream/Kafka/Server/Config/Types.hs +++ b/hstream-kafka/HStream/Kafka/Server/Config/Types.hs @@ -71,6 +71,8 @@ data ServerOpts = ServerOpts , _ldConfigPath :: !CBytes , _compression :: !Compression + + , _enableSaslAuth :: !Bool } deriving (Show, Eq) ------------------------------------------------------------------------------- @@ -117,6 +119,9 @@ data CliOptions = CliOptions -- Internal options , cliStoreCompression :: !(Maybe Compression) + + -- SASL Authentication + , cliEnableSaslAuth :: !Bool } deriving Show ------------------------------------------------------------------------------- diff --git a/hstream-kafka/HStream/Kafka/Server/Handler.hsc b/hstream-kafka/HStream/Kafka/Server/Handler.hsc index 5c514e449..78e3e4b52 100644 --- a/hstream-kafka/HStream/Kafka/Server/Handler.hsc +++ b/hstream-kafka/HStream/Kafka/Server/Handler.hsc @@ -1,17 +1,21 @@ {-# LANGUAGE CPP #-} {-# LANGUAGE DataKinds #-} -module HStream.Kafka.Server.Handler (handlers) where +module HStream.Kafka.Server.Handler + ( handlers + , unAuthedHandlers + ) where import HStream.Kafka.Server.Handler.Basic import HStream.Kafka.Server.Handler.Consume import HStream.Kafka.Server.Handler.Group import HStream.Kafka.Server.Handler.Offset import HStream.Kafka.Server.Handler.Produce +import HStream.Kafka.Server.Handler.Security import HStream.Kafka.Server.Handler.Topic -import HStream.Kafka.Server.Types (ServerContext (..)) -import qualified Kafka.Protocol.Message as K -import qualified Kafka.Protocol.Service as K +import HStream.Kafka.Server.Types (ServerContext (..)) +import qualified Kafka.Protocol.Message as K +import qualified Kafka.Protocol.Service as K ------------------------------------------------------------------------------- @@ -54,6 +58,9 @@ import qualified Kafka.Protocol.Service as K #cv_handler ApiVersions, 0, 3 #cv_handler Fetch, 0, 2 +#cv_handler SaslHandshake, 0, 1 +#cv_handler SaslAuthenticate, 0, 0 + handlers :: ServerContext -> [K.ServiceHandler] handlers sc = [ #mk_handler ApiVersions, 0, 3 @@ -91,4 +98,17 @@ handlers sc = , K.hd (K.RPC :: K.RPC K.HStreamKafkaV0 "heartbeat") (handleHeartbeatV0 sc) , K.hd (K.RPC :: K.RPC K.HStreamKafkaV0 "listGroups") (handleListGroupsV0 sc) , K.hd (K.RPC :: K.RPC K.HStreamKafkaV0 "describeGroups") (handleDescribeGroupsV0 sc) + + , K.hd (K.RPC :: K.RPC K.HStreamKafkaV0 "saslHandshake") (handleAfterAuthSaslHandshakeV0 sc) + , K.hd (K.RPC :: K.RPC K.HStreamKafkaV1 "saslHandshake") (handleAfterAuthSaslHandshakeV1 sc) + + , K.hd (K.RPC :: K.RPC K.HStreamKafkaV0 "saslAuthenticate") (handleAfterAuthSaslAuthenticateV0 sc) + ] + +unAuthedHandlers :: ServerContext -> [K.ServiceHandler] +unAuthedHandlers sc = + [ #mk_handler ApiVersions, 0, 3 + + , #mk_handler SaslHandshake, 0, 1 + , #mk_handler SaslAuthenticate, 0, 0 ] diff --git a/hstream-kafka/HStream/Kafka/Server/Handler/Security.hs b/hstream-kafka/HStream/Kafka/Server/Handler/Security.hs new file mode 100644 index 000000000..753803f2d --- /dev/null +++ b/hstream-kafka/HStream/Kafka/Server/Handler/Security.hs @@ -0,0 +1,104 @@ +module HStream.Kafka.Server.Handler.Security + ( handleSaslHandshake + , handleSaslAuthenticate + + , handleAfterAuthSaslHandshakeV0 + , handleAfterAuthSaslHandshakeV1 + , handleAfterAuthSaslAuthenticateV0 + ) where + +import Control.Concurrent.MVar +import Control.Monad.IO.Class (liftIO) +import Data.ByteString (ByteString) +import qualified Data.Text as T +import qualified Data.Text.Encoding as T +import qualified Data.Vector as V +import Network.Protocol.SASL.GNU + +import HStream.Kafka.Server.Types (ServerContext (..)) +import qualified HStream.Logger as Log +import qualified Kafka.Protocol.Encoding as K +import qualified Kafka.Protocol.Error as K +import qualified Kafka.Protocol.Message as K +import qualified Kafka.Protocol.Service as K + +------------------------------------------------------------------------------- +saslPlainCallback :: Property -> Session Progress +saslPlainCallback PropertyAuthID = do + authID <- getProperty PropertyAuthID + liftIO . Log.debug $ "SASL PLAIN invoke callback with PropertyAuthID. I got " <> Log.buildString' authID + return Complete +saslPlainCallback PropertyPassword = do + password <- getProperty PropertyPassword + liftIO . Log.debug $ "SASL PLAIN invoke callback with PropertyPassword. I got" <> Log.buildString' password + return Complete +saslPlainCallback PropertyAuthzID = do + authzID <- getProperty PropertyAuthzID + liftIO . Log.debug $ "SASL PLAIN invoke callback with PropertyAuthzID. I got" <> Log.buildString' authzID + return Complete +saslPlainCallback ValidateSimple = do + liftIO . Log.debug $ "SASL PLAIN invoke callback with ValidateSimple..." + authID <- getProperty PropertyAuthID + password <- getProperty PropertyPassword + if authID == Just "admin" && password == Just "passwd" -- FIXME: do actual check + then return Complete + else throw AuthenticationError +saslPlainCallback prop = do + liftIO . Log.warning $ "SASL PLAIN invoke callback with " <> Log.buildString' prop <> ". But I do not know how to handle it..." + return Complete + +saslPlainSession :: ByteString -> Session ByteString +saslPlainSession input = do + mechanism <- mechanismName + liftIO . Log.debug $ "SASL: I am using " <> Log.buildString' mechanism + liftIO . Log.debug $ "SASL PLAIN: I got C: " <> Log.build input + (serverMsg, prog) <- step input + case prog of + Complete -> do + liftIO . Log.debug $ "SASL PLAIN: Complete. S: " <> Log.build serverMsg + return serverMsg + NeedsMore -> do + liftIO . Log.warning $ "SASL PLAIN: I need more... But why? S: " <> Log.build serverMsg + throw AuthenticationError + +saslPlain :: ByteString -> SASL (Either Error ByteString) +saslPlain input = do + setCallback saslPlainCallback + runServer (Mechanism "PLAIN") (saslPlainSession input) + +------------------------------------------------------------------------------- +handleSaslHandshake :: ServerContext -> K.RequestContext -> K.SaslHandshakeRequest -> IO K.SaslHandshakeResponse +handleSaslHandshake _ _ K.SaslHandshakeRequest{..} = do + let reqMechanism = Mechanism (T.encodeUtf8 mechanism) + isMechSupported <- runSASL (serverSupports reqMechanism) + if isMechSupported then do + Log.debug $ "SASL: client requests " <> Log.buildString' mechanism + return $ K.SaslHandshakeResponse K.NONE (K.KaArray $ Just (V.singleton "PLAIN")) + else do + Log.warning $ "SASL: client requests " <> Log.buildString' mechanism <> ", but I do not support it..." + return $ K.SaslHandshakeResponse K.UNSUPPORTED_SASL_MECHANISM (K.KaArray $ Just (V.singleton "PLAIN")) + +handleSaslAuthenticate :: ServerContext -> K.RequestContext -> K.SaslAuthenticateRequest -> IO K.SaslAuthenticateResponse +handleSaslAuthenticate _ reqCtx K.SaslAuthenticateRequest{..} = do + respBytes_e <- runSASL (saslPlain authBytes) + case respBytes_e of + Left err -> do + Log.warning $ "SASL: auth failed, " <> Log.buildString' err + putMVar (K.clientAuthDone reqCtx) False + return $ K.SaslAuthenticateResponse K.SASL_AUTHENTICATION_FAILED (Just . T.pack $ show err) mempty + Right respBytes -> do + putMVar (K.clientAuthDone reqCtx) True + return $ K.SaslAuthenticateResponse K.NONE mempty respBytes + +------------------------------------------------------------------------------- +handleAfterAuthSaslHandshakeV0 :: ServerContext -> K.RequestContext -> K.SaslHandshakeRequestV0 -> IO K.SaslHandshakeResponseV0 +handleAfterAuthSaslHandshakeV0 _ _ _ = return $ K.SaslHandshakeResponseV0 K.ILLEGAL_SASL_STATE (K.KaArray Nothing) + +handleAfterAuthSaslHandshakeV1 :: ServerContext -> K.RequestContext -> K.SaslHandshakeRequestV1 -> IO K.SaslHandshakeResponseV1 +handleAfterAuthSaslHandshakeV1 = handleAfterAuthSaslHandshakeV0 + +handleAfterAuthSaslAuthenticateV0 :: ServerContext -> K.RequestContext -> K.SaslAuthenticateRequestV0 -> IO K.SaslAuthenticateResponseV0 +handleAfterAuthSaslAuthenticateV0 _ _ _ = + return $ K.SaslAuthenticateResponseV0 K.ILLEGAL_SASL_STATE + (Just "SaslAuthenticate request received after successful authentication") + mempty diff --git a/hstream-kafka/hstream-kafka.cabal b/hstream-kafka/hstream-kafka.cabal index 327ada5b4..5b0288008 100644 --- a/hstream-kafka/hstream-kafka.cabal +++ b/hstream-kafka/hstream-kafka.cabal @@ -130,6 +130,7 @@ library HStream.Kafka.Server.Handler.Group HStream.Kafka.Server.Handler.Offset HStream.Kafka.Server.Handler.Produce + HStream.Kafka.Server.Handler.Security HStream.Kafka.Server.Handler.Topic cxx-sources: cbits/hs_kafka_client.cpp @@ -145,6 +146,7 @@ library , containers , directory , foreign + , gsasl >=0.3.0 , hashable , hashtables , haskeline diff --git a/hstream-kafka/message/SaslAuthenticateRequest.json b/hstream-kafka/message/SaslAuthenticateRequest.json new file mode 100644 index 000000000..3f5558b81 --- /dev/null +++ b/hstream-kafka/message/SaslAuthenticateRequest.json @@ -0,0 +1,29 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 36, + "type": "request", + "listeners": ["zkBroker", "broker", "controller"], + "name": "SaslAuthenticateRequest", + // Version 1 is the same as version 0. + // Version 2 adds flexible version support + "validVersions": "0-2", + "flexibleVersions": "2+", + "fields": [ + { "name": "AuthBytes", "type": "bytes", "versions": "0+", + "about": "The SASL authentication bytes from the client, as defined by the SASL mechanism." } + ] +} diff --git a/hstream-kafka/message/SaslAuthenticateResponse.json b/hstream-kafka/message/SaslAuthenticateResponse.json new file mode 100644 index 000000000..0e26a51e8 --- /dev/null +++ b/hstream-kafka/message/SaslAuthenticateResponse.json @@ -0,0 +1,34 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 36, + "type": "response", + "name": "SaslAuthenticateResponse", + // Version 1 adds the session lifetime. + // Version 2 adds flexible version support + "validVersions": "0-2", + "flexibleVersions": "2+", + "fields": [ + { "name": "ErrorCode", "type": "int16", "versions": "0+", + "about": "The error code, or 0 if there was no error." }, + { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", + "about": "The error message, or null if there was no error." }, + { "name": "AuthBytes", "type": "bytes", "versions": "0+", + "about": "The SASL authentication bytes from the server, as defined by the SASL mechanism." }, + { "name": "SessionLifetimeMs", "type": "int64", "versions": "1+", "default": "0", "ignorable": true, + "about": "The SASL authentication bytes from the server, as defined by the SASL mechanism." } + ] +} diff --git a/hstream-kafka/message/SaslHandshakeRequest.json b/hstream-kafka/message/SaslHandshakeRequest.json new file mode 100644 index 000000000..a370a80df --- /dev/null +++ b/hstream-kafka/message/SaslHandshakeRequest.json @@ -0,0 +1,31 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 17, + "type": "request", + "listeners": ["zkBroker", "broker", "controller"], + "name": "SaslHandshakeRequest", + // Version 1 supports SASL_AUTHENTICATE. + // NOTE: Version cannot be easily bumped due to incorrect + // client negotiation for clients <= 2.4. + // See https://issues.apache.org/jira/browse/KAFKA-9577 + "validVersions": "0-1", + "flexibleVersions": "none", + "fields": [ + { "name": "Mechanism", "type": "string", "versions": "0+", + "about": "The SASL mechanism chosen by the client." } + ] +} diff --git a/hstream-kafka/message/SaslHandshakeResponse.json b/hstream-kafka/message/SaslHandshakeResponse.json new file mode 100644 index 000000000..a1567c669 --- /dev/null +++ b/hstream-kafka/message/SaslHandshakeResponse.json @@ -0,0 +1,32 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 17, + "type": "response", + "name": "SaslHandshakeResponse", + // Version 1 is the same as version 0. + // NOTE: Version cannot be easily bumped due to incorrect + // client negotiation for clients <= 2.4. + // See https://issues.apache.org/jira/browse/KAFKA-9577 + "validVersions": "0-1", + "flexibleVersions": "none", + "fields": [ + { "name": "ErrorCode", "type": "int16", "versions": "0+", + "about": "The error code, or 0 if there was no error." }, + { "name": "Mechanisms", "type": "[]string", "versions": "0+", + "about": "The mechanisms enabled in the server." } + ] +} diff --git a/hstream-kafka/protocol/Kafka/Protocol/Message/Struct.hs b/hstream-kafka/protocol/Kafka/Protocol/Message/Struct.hs index ab33035a4..9cf147835 100644 --- a/hstream-kafka/protocol/Kafka/Protocol/Message/Struct.hs +++ b/hstream-kafka/protocol/Kafka/Protocol/Message/Struct.hs @@ -942,6 +942,39 @@ data ProduceResponseV2 = ProduceResponseV2 } deriving (Show, Eq, Generic) instance Serializable ProduceResponseV2 +newtype SaslAuthenticateRequestV0 = SaslAuthenticateRequestV0 + { authBytes :: ByteString + } deriving (Show, Eq, Generic) +instance Serializable SaslAuthenticateRequestV0 + +data SaslAuthenticateResponseV0 = SaslAuthenticateResponseV0 + { errorCode :: {-# UNPACK #-} !ErrorCode + -- ^ The error code, or 0 if there was no error. + , errorMessage :: !NullableString + -- ^ The error message, or null if there was no error. + , authBytes :: !ByteString + -- ^ The SASL authentication bytes from the server, as defined by the SASL + -- mechanism. + } deriving (Show, Eq, Generic) +instance Serializable SaslAuthenticateResponseV0 + +newtype SaslHandshakeRequestV0 = SaslHandshakeRequestV0 + { mechanism :: Text + } deriving (Show, Eq, Generic) +instance Serializable SaslHandshakeRequestV0 + +type SaslHandshakeRequestV1 = SaslHandshakeRequestV0 + +data SaslHandshakeResponseV0 = SaslHandshakeResponseV0 + { errorCode :: {-# UNPACK #-} !ErrorCode + -- ^ The error code, or 0 if there was no error. + , mechanisms :: !(KaArray Text) + -- ^ The mechanisms enabled in the server. + } deriving (Show, Eq, Generic) +instance Serializable SaslHandshakeResponseV0 + +type SaslHandshakeResponseV1 = SaslHandshakeResponseV0 + data SyncGroupRequestV0 = SyncGroupRequestV0 { groupId :: !Text -- ^ The unique group identifier. @@ -981,9 +1014,11 @@ instance Service HStreamKafkaV0 where , "syncGroup" , "describeGroups" , "listGroups" + , "saslHandshake" , "apiVersions" , "createTopics" , "deleteTopics" + , "saslAuthenticate" ] instance HasMethodImpl HStreamKafkaV0 "fetch" where @@ -1070,6 +1105,13 @@ instance HasMethodImpl HStreamKafkaV0 "listGroups" where type MethodInput HStreamKafkaV0 "listGroups" = ListGroupsRequestV0 type MethodOutput HStreamKafkaV0 "listGroups" = ListGroupsResponseV0 +instance HasMethodImpl HStreamKafkaV0 "saslHandshake" where + type MethodName HStreamKafkaV0 "saslHandshake" = "saslHandshake" + type MethodKey HStreamKafkaV0 "saslHandshake" = 17 + type MethodVersion HStreamKafkaV0 "saslHandshake" = 0 + type MethodInput HStreamKafkaV0 "saslHandshake" = SaslHandshakeRequestV0 + type MethodOutput HStreamKafkaV0 "saslHandshake" = SaslHandshakeResponseV0 + instance HasMethodImpl HStreamKafkaV0 "apiVersions" where type MethodName HStreamKafkaV0 "apiVersions" = "apiVersions" type MethodKey HStreamKafkaV0 "apiVersions" = 18 @@ -1091,6 +1133,13 @@ instance HasMethodImpl HStreamKafkaV0 "deleteTopics" where type MethodInput HStreamKafkaV0 "deleteTopics" = DeleteTopicsRequestV0 type MethodOutput HStreamKafkaV0 "deleteTopics" = DeleteTopicsResponseV0 +instance HasMethodImpl HStreamKafkaV0 "saslAuthenticate" where + type MethodName HStreamKafkaV0 "saslAuthenticate" = "saslAuthenticate" + type MethodKey HStreamKafkaV0 "saslAuthenticate" = 36 + type MethodVersion HStreamKafkaV0 "saslAuthenticate" = 0 + type MethodInput HStreamKafkaV0 "saslAuthenticate" = SaslAuthenticateRequestV0 + type MethodOutput HStreamKafkaV0 "saslAuthenticate" = SaslAuthenticateResponseV0 + data HStreamKafkaV1 instance Service HStreamKafkaV1 where @@ -1101,6 +1150,7 @@ instance Service HStreamKafkaV1 where , "metadata" , "offsetCommit" , "offsetFetch" + , "saslHandshake" , "apiVersions" ] @@ -1139,6 +1189,13 @@ instance HasMethodImpl HStreamKafkaV1 "offsetFetch" where type MethodInput HStreamKafkaV1 "offsetFetch" = OffsetFetchRequestV1 type MethodOutput HStreamKafkaV1 "offsetFetch" = OffsetFetchResponseV1 +instance HasMethodImpl HStreamKafkaV1 "saslHandshake" where + type MethodName HStreamKafkaV1 "saslHandshake" = "saslHandshake" + type MethodKey HStreamKafkaV1 "saslHandshake" = 17 + type MethodVersion HStreamKafkaV1 "saslHandshake" = 1 + type MethodInput HStreamKafkaV1 "saslHandshake" = SaslHandshakeRequestV1 + type MethodOutput HStreamKafkaV1 "saslHandshake" = SaslHandshakeResponseV1 + instance HasMethodImpl HStreamKafkaV1 "apiVersions" where type MethodName HStreamKafkaV1 "apiVersions" = "apiVersions" type MethodKey HStreamKafkaV1 "apiVersions" = 18 @@ -1258,9 +1315,11 @@ instance Show ApiKey where show (ApiKey 14) = "SyncGroup(14)" show (ApiKey 15) = "DescribeGroups(15)" show (ApiKey 16) = "ListGroups(16)" + show (ApiKey 17) = "SaslHandshake(17)" show (ApiKey 18) = "ApiVersions(18)" show (ApiKey 19) = "CreateTopics(19)" show (ApiKey 20) = "DeleteTopics(20)" + show (ApiKey 36) = "SaslAuthenticate(36)" show (ApiKey n) = "Unknown " <> show n supportedApiVersions :: [ApiVersionV0] @@ -1278,9 +1337,11 @@ supportedApiVersions = , ApiVersionV0 (ApiKey 14) 0 0 , ApiVersionV0 (ApiKey 15) 0 0 , ApiVersionV0 (ApiKey 16) 0 0 + , ApiVersionV0 (ApiKey 17) 0 1 , ApiVersionV0 (ApiKey 18) 0 3 , ApiVersionV0 (ApiKey 19) 0 0 , ApiVersionV0 (ApiKey 20) 0 0 + , ApiVersionV0 (ApiKey 36) 0 0 ] getHeaderVersion :: ApiKey -> Int16 -> (Int16, Int16) @@ -1308,11 +1369,14 @@ getHeaderVersion (ApiKey 13) 0 = (1, 0) getHeaderVersion (ApiKey 14) 0 = (1, 0) getHeaderVersion (ApiKey 15) 0 = (1, 0) getHeaderVersion (ApiKey 16) 0 = (1, 0) +getHeaderVersion (ApiKey 17) 0 = (1, 0) +getHeaderVersion (ApiKey 17) 1 = (1, 0) getHeaderVersion (ApiKey 18) 0 = (1, 0) getHeaderVersion (ApiKey 18) 1 = (1, 0) getHeaderVersion (ApiKey 18) 2 = (1, 0) getHeaderVersion (ApiKey 18) 3 = (2, 0) getHeaderVersion (ApiKey 19) 0 = (1, 0) getHeaderVersion (ApiKey 20) 0 = (1, 0) +getHeaderVersion (ApiKey 36) 0 = (1, 0) getHeaderVersion k v = error $ "Unknown " <> show k <> " v" <> show v {-# INLINE getHeaderVersion #-} diff --git a/hstream-kafka/protocol/Kafka/Protocol/Message/Total.hs b/hstream-kafka/protocol/Kafka/Protocol/Message/Total.hs index 714b64b82..5f85d9406 100644 --- a/hstream-kafka/protocol/Kafka/Protocol/Message/Total.hs +++ b/hstream-kafka/protocol/Kafka/Protocol/Message/Total.hs @@ -2013,6 +2013,89 @@ produceResponseFromV2 x = ProduceResponse , throttleTimeMs = x.throttleTimeMs } +newtype SaslAuthenticateRequest = SaslAuthenticateRequest + { authBytes :: ByteString + } deriving (Show, Eq, Generic) +instance Serializable SaslAuthenticateRequest + +saslAuthenticateRequestToV0 :: SaslAuthenticateRequest -> SaslAuthenticateRequestV0 +saslAuthenticateRequestToV0 x = SaslAuthenticateRequestV0 + { authBytes = x.authBytes + } + +saslAuthenticateRequestFromV0 :: SaslAuthenticateRequestV0 -> SaslAuthenticateRequest +saslAuthenticateRequestFromV0 x = SaslAuthenticateRequest + { authBytes = x.authBytes + } + +data SaslAuthenticateResponse = SaslAuthenticateResponse + { errorCode :: {-# UNPACK #-} !ErrorCode + -- ^ The error code, or 0 if there was no error. + , errorMessage :: !NullableString + -- ^ The error message, or null if there was no error. + , authBytes :: !ByteString + -- ^ The SASL authentication bytes from the server, as defined by the SASL + -- mechanism. + } deriving (Show, Eq, Generic) +instance Serializable SaslAuthenticateResponse + +saslAuthenticateResponseToV0 :: SaslAuthenticateResponse -> SaslAuthenticateResponseV0 +saslAuthenticateResponseToV0 x = SaslAuthenticateResponseV0 + { errorCode = x.errorCode + , errorMessage = x.errorMessage + , authBytes = x.authBytes + } + +saslAuthenticateResponseFromV0 :: SaslAuthenticateResponseV0 -> SaslAuthenticateResponse +saslAuthenticateResponseFromV0 x = SaslAuthenticateResponse + { errorCode = x.errorCode + , errorMessage = x.errorMessage + , authBytes = x.authBytes + } + +newtype SaslHandshakeRequest = SaslHandshakeRequest + { mechanism :: Text + } deriving (Show, Eq, Generic) +instance Serializable SaslHandshakeRequest + +saslHandshakeRequestToV0 :: SaslHandshakeRequest -> SaslHandshakeRequestV0 +saslHandshakeRequestToV0 x = SaslHandshakeRequestV0 + { mechanism = x.mechanism + } +saslHandshakeRequestToV1 :: SaslHandshakeRequest -> SaslHandshakeRequestV1 +saslHandshakeRequestToV1 = saslHandshakeRequestToV0 + +saslHandshakeRequestFromV0 :: SaslHandshakeRequestV0 -> SaslHandshakeRequest +saslHandshakeRequestFromV0 x = SaslHandshakeRequest + { mechanism = x.mechanism + } +saslHandshakeRequestFromV1 :: SaslHandshakeRequestV1 -> SaslHandshakeRequest +saslHandshakeRequestFromV1 = saslHandshakeRequestFromV0 + +data SaslHandshakeResponse = SaslHandshakeResponse + { errorCode :: {-# UNPACK #-} !ErrorCode + -- ^ The error code, or 0 if there was no error. + , mechanisms :: !(KaArray Text) + -- ^ The mechanisms enabled in the server. + } deriving (Show, Eq, Generic) +instance Serializable SaslHandshakeResponse + +saslHandshakeResponseToV0 :: SaslHandshakeResponse -> SaslHandshakeResponseV0 +saslHandshakeResponseToV0 x = SaslHandshakeResponseV0 + { errorCode = x.errorCode + , mechanisms = x.mechanisms + } +saslHandshakeResponseToV1 :: SaslHandshakeResponse -> SaslHandshakeResponseV1 +saslHandshakeResponseToV1 = saslHandshakeResponseToV0 + +saslHandshakeResponseFromV0 :: SaslHandshakeResponseV0 -> SaslHandshakeResponse +saslHandshakeResponseFromV0 x = SaslHandshakeResponse + { errorCode = x.errorCode + , mechanisms = x.mechanisms + } +saslHandshakeResponseFromV1 :: SaslHandshakeResponseV1 -> SaslHandshakeResponse +saslHandshakeResponseFromV1 = saslHandshakeResponseFromV0 + data SyncGroupRequest = SyncGroupRequest { groupId :: !Text -- ^ The unique group identifier. @@ -2168,6 +2251,20 @@ instance Exception ProduceResponseEx catchProduceResponseEx :: IO ProduceResponse -> IO ProduceResponse catchProduceResponseEx act = act `catch` \(ProduceResponseEx resp) -> pure resp +newtype SaslAuthenticateResponseEx = SaslAuthenticateResponseEx SaslAuthenticateResponse + deriving (Show, Eq) +instance Exception SaslAuthenticateResponseEx + +catchSaslAuthenticateResponseEx :: IO SaslAuthenticateResponse -> IO SaslAuthenticateResponse +catchSaslAuthenticateResponseEx act = act `catch` \(SaslAuthenticateResponseEx resp) -> pure resp + +newtype SaslHandshakeResponseEx = SaslHandshakeResponseEx SaslHandshakeResponse + deriving (Show, Eq) +instance Exception SaslHandshakeResponseEx + +catchSaslHandshakeResponseEx :: IO SaslHandshakeResponse -> IO SaslHandshakeResponse +catchSaslHandshakeResponseEx act = act `catch` \(SaslHandshakeResponseEx resp) -> pure resp + newtype SyncGroupResponseEx = SyncGroupResponseEx SyncGroupResponse deriving (Show, Eq) instance Exception SyncGroupResponseEx diff --git a/hstream-kafka/protocol/Kafka/Protocol/Service.hs b/hstream-kafka/protocol/Kafka/Protocol/Service.hs index 2f8e88079..2d2e1edbc 100644 --- a/hstream-kafka/protocol/Kafka/Protocol/Service.hs +++ b/hstream-kafka/protocol/Kafka/Protocol/Service.hs @@ -21,6 +21,7 @@ module Kafka.Protocol.Service import Data.Int import Data.Kind (Constraint, Type) import Data.Proxy (Proxy (..)) +import GHC.MVar (MVar) import GHC.TypeLits import Kafka.Protocol.Encoding (NullableString, Serializable) @@ -28,9 +29,13 @@ import Kafka.Protocol.Encoding (NullableString, Serializable) ------------------------------------------------------------------------------- data RequestContext = RequestContext - { clientId :: !(Maybe NullableString) - , clientHost :: !String - } deriving (Show, Eq) + { clientId :: !(Maybe NullableString) + , clientHost :: !String + , clientAuthDone :: MVar Bool + -- Only used during SASL authentication. + -- Init with Empty. Then True: success, False: fail. + -- FIXME: better mechanism to notify auth result? + } deriving (Eq) type UnaryHandler i o = RequestContext -> i -> IO o diff --git a/hstream/app/kafka-server.hs b/hstream/app/kafka-server.hs index 90049e681..aa01c0396 100644 --- a/hstream/app/kafka-server.hs +++ b/hstream/app/kafka-server.hs @@ -105,9 +105,11 @@ app config@ServerOpts{..} = do void . forkIO $ updateHashRing gossipContext (loadBalanceHashRing serverContext) -- TODO: support tls (_tlsConfig) + -- TODO: support SASL options let netOpts = K.defaultServerOpts { K.serverHost = T.unpack $ decodeUtf8 _serverHost , K.serverPort = fromIntegral _serverPort + , K.serverSaslOptions = if _enableSaslAuth then Just K.SaslOptions else Nothing } Async.withAsync (serve serverContext netOpts) $ \a -> do -- start gossip @@ -157,8 +159,9 @@ serve sc@ServerContext{..} netOpts = do let netOpts' = netOpts{ K.serverOnStarted = Just serverOnStarted} Log.info $ "Starting" <> if isJust (K.serverSslOptions netOpts') then " secure " else " insecure " + <> (if isJust (K.serverSaslOptions netOpts') then "SASL " else "") <> "kafka server..." - K.runServer netOpts' sc K.handlers + K.runServer netOpts' sc K.unAuthedHandlers K.handlers serveListeners :: ServerContext @@ -179,17 +182,22 @@ serveListeners sc netOpts -- newSslOpts <- mapM readTlsPemFile $ -- join ((`Map.lookup` securityMap) =<< Map.lookup key listenerSecurityMap) let newSslOpts = Nothing + + -- TODO: sasl + let newSaslOpts = Nothing let netOpts' = netOpts{ K.serverPort = fromIntegral listenerPort , K.serverOnStarted = Just listenerOnStarted , K.serverSslOptions = newSslOpts + , K.serverSaslOptions = newSaslOpts } Log.info $ "Starting" <> (if isJust (K.serverSslOptions netOpts') then " secure " else " insecure ") + <> (if isJust (K.serverSaslOptions netOpts') then "SASL " else "") <> "advertised listener: " <> Log.build key <> ":" <> Log.build listenerAddress <> ":" <> Log.build listenerPort - K.runServer netOpts' sc' K.handlers + K.runServer netOpts' sc' K.unAuthedHandlers K.handlers ------------------------------------------------------------------------------- diff --git a/script/kafka_gen.py b/script/kafka_gen.py index 3048c05f7..e757d1445 100755 --- a/script/kafka_gen.py +++ b/script/kafka_gen.py @@ -114,6 +114,7 @@ def get_field_default(field_type, default=None): "OffsetFetch": (0, 2), "OffsetCommit": (0, 2), "ListOffsets": (0, 1), + "SaslHandshake": (0, 1), } # -----------------------------------------------------------------------------