Skip to content

Commit

Permalink
polish group
Browse files Browse the repository at this point in the history
  • Loading branch information
YangKian committed Nov 7, 2023
1 parent 3e5ff55 commit 97daad5
Showing 1 changed file with 13 additions and 16 deletions.
29 changes: 13 additions & 16 deletions hstream-kafka/HStream/Kafka/Client/Cli.hs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import qualified Data.ByteString as BS
import qualified Data.ByteString.Char8 as BC
import HStream.Base.Table as Table
import qualified HStream.Kafka.Client.Api as KA
import HStream.Utils (splitOn)
import HStream.Utils (newRandomText, splitOn)
import qualified Kafka.Protocol.Encoding as K
import qualified Kafka.Protocol.Error as K
import qualified Kafka.Protocol.Message as K
Expand Down Expand Up @@ -238,22 +238,22 @@ handleTopicDelete Options{..} cmdopts = do

data GroupCommand
= GroupCommandList
| GroupCommandInfo Text
| GroupCommandShow Text
deriving (Show)

groupIdParser :: Parser Text
groupIdParser =
O.strOption (O.long "id" <> O.metavar "Text" <> O.help "Group id")
O.strArgument (O.metavar "Text" <> O.help "Group id")

groupCommandParser :: Parser GroupCommand
groupCommandParser = hsubparser
( O.command "list" (O.info (pure GroupCommandList) (O.progDesc "Get all consumer groups"))
<> O.command "info" (O.info (GroupCommandInfo <$> groupIdParser) (O.progDesc "topic info"))
<> O.command "show" (O.info (GroupCommandShow <$> groupIdParser) (O.progDesc "Show topic info"))
)

handleGroupCommand :: Options -> GroupCommand -> IO ()
handleGroupCommand opts GroupCommandList = handleGroupList opts
handleGroupCommand opts (GroupCommandInfo n) = handleGroupInfo opts n
handleGroupCommand opts (GroupCommandShow n) = handleGroupShow opts n

handleGroupList :: Options -> IO ()
handleGroupList Options{..} = do
Expand All @@ -270,8 +270,8 @@ handleGroupList Options{..} = do
stats = (\s -> ($ s) <$> lenses) <$> (V.toList groups)
putStrLn $ simpleShowTable (map (, 30, Table.left) titles) stats

handleGroupInfo :: Options -> Text -> IO ()
handleGroupInfo Options{..} name = do
handleGroupShow :: Options -> Text -> IO ()
handleGroupShow Options{..} name = do
let req = K.DescribeGroupsRequestV0 (K.KaArray $ Just $ V.singleton name)
correlationId <- getCorrelationId
resp <- KA.withSendAndRecv host port (KA.describeGroups correlationId req)
Expand All @@ -283,11 +283,9 @@ handleGroupInfo Options{..} name = do
let emit idt s = putStrLn $ replicate idt ' ' <> s
members = K.unNonNullKaArray $ group.members
emit 0 . formatWith [yellow] $ "\9678 " <> Text.unpack group.groupId
-- Use show here because all the following can be an empty text, which will
-- be printed as `""` by show
emit 2 $ "GroupState: " <> show group.groupState
emit 2 $ "ProtocolType: " <> show group.protocolType
emit 2 $ "ProtocolData: " <> show group.protocolData
emit 2 $ "GroupState: " <> Text.unpack group.groupState
emit 2 $ "ProtocolType: " <> Text.unpack group.protocolType
emit 2 $ "ProtocolData: " <> Text.unpack group.protocolData
if V.null members
then emit 2 $ "Members: None"
else do
Expand Down Expand Up @@ -440,7 +438,7 @@ data ConsumeCommandOpts = ConsumeCommandOpts

consumeCommandParser :: Parser ConsumeCommandOpts
consumeCommandParser = ConsumeCommandOpts
<$> strOption (long "group-id" <> metavar "Text" <> help "Group id")
<$> strOption (long "group-id" <> short 'g' <> metavar "Text" <> value Text.empty <> help "Group id")
<*> some (strOption (long "topic" <> short 't' <> metavar "Text" <> help "Topic name"))
<*> optional ( flag' OffsetResetEarliest (long "earliest" <> help "Reset offset to earliest")
<|> flag' OffsetResetLatest (long "latest" <> help "Reset offset to latest, default")
Expand All @@ -455,14 +453,13 @@ handleConsumeCommand Options{..} cmdopts = do
let topics = filter (not . Text.null) cmdopts.topics
when (null topics) $
errorWithoutStackTrace "Topic name is required"
when (Text.null cmdopts.groupId) $
errorWithoutStackTrace "Group id is required"
groupId <- if Text.null cmdopts.groupId then newRandomText 10 else return cmdopts.groupId
-- First check topic exists because we do not support auto topic creation yet.
-- Or you will get a coredump.
forM_ topics $ describeTopic host port

let brokers = encodeUtf8 $ Text.pack (host <> ":" <> show port)
groupIdBs = encodeUtf8 cmdopts.groupId :: ByteString
groupIdBs = encodeUtf8 groupId :: ByteString
offsetResetBs = case cmdopts.offsetReset of
Nothing -> "latest"
Just OffsetResetEarliest -> "earliest"
Expand Down

0 comments on commit 97daad5

Please sign in to comment.