Skip to content

Commit

Permalink
kafka: do authorization on find coordinator (10) (#1778)
Browse files Browse the repository at this point in the history
  • Loading branch information
Commelina authored Mar 13, 2024
1 parent 18948be commit 8daf2e4
Showing 1 changed file with 35 additions and 19 deletions.
54 changes: 35 additions & 19 deletions hstream-kafka/HStream/Kafka/Server/Handler/Basic.hs
Original file line number Diff line number Diff line change
Expand Up @@ -292,26 +292,42 @@ data CoordinatorType
| TRANSACTION
deriving (Enum, Eq)

-- FIXME: This function does not catch any Kafka ErrorCodeException.
-- Is this proper?
handleFindCoordinator :: ServerContext -> K.RequestContext -> K.FindCoordinatorRequest -> IO K.FindCoordinatorResponse
handleFindCoordinator ServerContext{..} _ req = do
handleFindCoordinator ServerContext{..} reqCtx req = do
case toEnum (fromIntegral req.keyType) of
GROUP -> do
A.ServerNode{..} <- lookupKafkaPersist metaHandle gossipContext loadBalanceHashRing scAdvertisedListenersKey (KafkaResGroup req.key)
Log.info $ "findCoordinator for group:" <> Log.buildString' req.key <> ", result:" <> Log.buildString' serverNodeId
return $ K.FindCoordinatorResponse {
errorMessage=Nothing
, nodeId=fromIntegral serverNodeId
, errorCode=0
, throttleTimeMs=0
, port=fromIntegral serverNodePort
, host=serverNodeHost
}
-- [ACL] check [DESCRIBE GROUP]
K.simpleAuthorize (K.toAuthorizableReqCtx reqCtx) authorizer K.Res_GROUP req.key K.AclOp_DESCRIBE >>= \case
True -> do
A.ServerNode{..} <- lookupKafkaPersist metaHandle gossipContext loadBalanceHashRing scAdvertisedListenersKey (KafkaResGroup req.key)
Log.info $ "findCoordinator for group:" <> Log.buildString' req.key <> ", result:" <> Log.buildString' serverNodeId
return $ K.FindCoordinatorResponse {
errorMessage=Nothing
, nodeId=fromIntegral serverNodeId
, errorCode=0
, throttleTimeMs=0
, port=fromIntegral serverNodePort
, host=serverNodeHost
}
-- Note: About kafka's error message, see org.apache.kafka.common.protocol.Errors
False -> return (makeErrorResponse K.GROUP_AUTHORIZATION_FAILED "Group authorization failed.")
_ -> do
return $ K.FindCoordinatorResponse {
errorMessage=Just "KeyType Must be 0(GROUP)"
, nodeId=0
, errorCode=K.COORDINATOR_NOT_AVAILABLE
, throttleTimeMs=0
, port=0
, host=""
}
-- TODO: authz [DESCRIBE TRANSACTION_ID] when this is supported
-- FIXME: Is the error code/message
return $ makeErrorResponse K.COORDINATOR_NOT_AVAILABLE "KeyType Must be 0(GROUP)"
where
-- FIXME: hard-coded constants
-- Note: Kafka returns `nodeId = -1`, `port = -1` and `host=""` on error.
-- See kafka.server.KafkaApis#getCoordinator and
-- org.apache.kafka.common.Node#NO_NODE
makeErrorResponse :: K.ErrorCode -> Text -> K.FindCoordinatorResponse
makeErrorResponse code errMsg = K.FindCoordinatorResponse {
errorMessage = Just errMsg
, nodeId = -1
, errorCode = code
, throttleTimeMs = 0
, port = -1
, host = ""
}

0 comments on commit 8daf2e4

Please sign in to comment.