kafka-cli: polish kafka cli (#1672)
YangKian authored Nov 9, 2023
1 parent 8369261 commit afc4f44
Showing 4 changed files with 101 additions and 75 deletions.
10 changes: 10 additions & 0 deletions common/hstream/HStream/Utils/Common.hs
@@ -2,11 +2,13 @@ module HStream.Utils.Common
( maybeToEither
, newRandomText
, limitedMapConcurrently
, splitOn
) where

import Control.Concurrent.Async (mapConcurrently)
import Control.Concurrent.QSem (QSem, newQSem, signalQSem, waitQSem)
import Control.Exception (bracket_)
import qualified Data.ByteString as BS
import Data.Text (Text)
import qualified Data.Text as Text
import System.Random
@@ -24,3 +26,11 @@ limitedMapConcurrently maxConcurrency f inputs = do
where
limited :: QSem -> IO c -> IO c
limited sem = bracket_ (waitQSem sem) (signalQSem sem)
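For orientation only (the body of limitedMapConcurrently is collapsed in this hunk): a hypothetical sketch of how a QSem-bounded concurrent map is typically assembled from this `limited` helper; the actual implementation may differ.

-- Hypothetical sketch, not the collapsed body itself:
-- limitedMapConcurrently :: Int -> (a -> IO b) -> [a] -> IO [b]
-- limitedMapConcurrently maxConcurrency f inputs = do
--   sem <- newQSem maxConcurrency
--   mapConcurrently (limited sem . f) inputs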

-- Break a ByteString into pieces separated by the first ByteString argument, consuming the delimiter
splitOn :: BS.ByteString -> BS.ByteString -> [BS.ByteString]
splitOn "" = error "delimiter shouldn't be empty."
splitOn delimiter = go
where
go s = let (pre, post) = BS.breakSubstring delimiter s
in pre : if BS.null post then [] else go (BS.drop (BS.length delimiter) post)
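For illustration (assuming OverloadedStrings for the ByteString literals), doctest-style examples of how this splitOn behaves:

-- >>> splitOn "@" "key1@value"
-- ["key1","value"]
-- >>> splitOn "@" "no-separator"
-- ["no-separator"]
-- >>> splitOn "@" "a@@b"
-- ["a","","b"]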
136 changes: 80 additions & 56 deletions hstream-kafka/HStream/Kafka/Client/Cli.hs
@@ -1,7 +1,5 @@
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE InterruptibleFFI #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MagicHash #-}
{-# LANGUAGE OverloadedRecordDot #-}
{-# LANGUAGE UnliftedFFITypes #-}

@@ -20,6 +18,7 @@ import Control.Exception (finally)
import Control.Monad
import Control.Monad.IO.Class (liftIO)
import Data.ByteString (ByteString)
import qualified Data.ByteString as BS
import Data.Char (toUpper)
import Data.Int
import Data.IORef
@@ -40,6 +39,7 @@ import System.IO.Unsafe (unsafePerformIO)

import HStream.Base.Table as Table
import qualified HStream.Kafka.Client.Api as KA
import HStream.Utils (newRandomText, splitOn)
import qualified Kafka.Protocol.Encoding as K
import qualified Kafka.Protocol.Error as K
import qualified Kafka.Protocol.Message as K
@@ -127,18 +127,18 @@ data TopicCommandDeleteOpts = TopicCommandDeleteOpts
topicCommandParser :: Parser TopicCommand
topicCommandParser = hsubparser
( O.command "list" (O.info (pure TopicCommandList) (O.progDesc "Get all topics"))
<> O.command "info" (O.info (TopicCommandInfo <$> topicNameParser) (O.progDesc "topic info"))
<> O.command "describe" (O.info (TopicCommandInfo <$> topicNameParser) (O.progDesc "List details of given topic"))
<> O.command "create" (O.info (TopicCommandCreate <$> createTopicsRequestParserV0) (O.progDesc "Create a topic"))
<> O.command "delete" (O.info (TopicCommandDelete <$> topicDeleteOptsParser) (O.progDesc "delete a topic"))
<> O.command "delete" (O.info (TopicCommandDelete <$> topicDeleteOptsParser) (O.progDesc "Delete a topic"))
)

topicNameParser :: Parser Text
topicNameParser =
O.strOption (O.long "name" <> O.metavar "Text" <> O.help "Topic name")
O.strArgument (O.metavar "TopicName" <> O.help "Topic name")

topicNameParser' :: Parser (Either () Text)
topicNameParser' =
Right <$> O.strOption (O.long "name" <> O.metavar "Text" <> O.help "Topic name")
Right <$> O.strArgument (O.metavar "TopicName" <> O.help "Topic name")
<|> flag' (Left ()) (O.long "all" <> O.help "All topics")

topicDeleteOptsParser :: Parser TopicCommandDeleteOpts
@@ -148,7 +148,7 @@ topicDeleteOptsParser = TopicCommandDeleteOpts

handleTopicCommand :: Options -> TopicCommand -> IO ()
handleTopicCommand opts TopicCommandList = handleTopicList opts
handleTopicCommand opts (TopicCommandInfo n) = handleTopicInfo opts n
handleTopicCommand opts (TopicCommandInfo n) = handleTopicDescribe opts n
handleTopicCommand opts (TopicCommandCreate n) = handleTopicCreate opts n
handleTopicCommand opts (TopicCommandDelete o) = handleTopicDelete opts o

@@ -157,17 +157,21 @@ handleTopicList Options{..} = do
let req = K.MetadataRequestV0 (K.KaArray Nothing)
correlationId <- getCorrelationId
resp <- KA.withSendAndRecv host port (KA.metadata correlationId req)
let titles = ["Name", "ErrorCode", "IsInternal"]
K.NonNullKaArray topics = resp.topics
lenses = [ Text.unpack . (.name)
, show . (.errorCode)
, show . (.isInternal)
]
stats = (\s -> ($ s) <$> lenses) <$> (V.toList topics)
putStrLn $ simpleShowTable (map (, 30, Table.left) titles) stats

handleTopicInfo :: Options -> Text -> IO ()
handleTopicInfo Options{..} name = do
let K.NonNullKaArray topics = resp.topics
failed = V.filter (\t -> t.errorCode /= K.NONE) topics
if V.null failed
then do
let titles = ["Name", "IsInternal"]
lenses = [ Text.unpack . (.name)
, show . (.isInternal)
]
stats = (\s -> ($ s) <$> lenses) <$> (V.toList topics)
putStrLn $ simpleShowTable (map (, 30, Table.left) titles) stats
else do
putStrLn $ "List topic error: " <> show ((.errorCode) . V.head $ failed)

handleTopicDescribe :: Options -> Text -> IO ()
handleTopicDescribe Options{..} name = do
tp <- describeTopic host port name
let titles = ["Name", "IsInternal", "Partition", "LeaderId"]
lenses = [ const (Text.unpack tp.name)
@@ -179,14 +183,24 @@ handleTopicInfo Options{..} name = do
putStrLn $ simpleShowTable (map (, 30, Table.left) titles) stats

handleTopicCreate :: Options -> K.CreateTopicsRequestV0 -> IO ()
handleTopicCreate Options{..} req = do
handleTopicCreate Options{..} req@K.CreateTopicsRequestV0{..} = do
correlationId <- getCorrelationId
K.CreateTopicsResponseV0 (K.KaArray (Just rets)) <-
KA.withSendAndRecv host port (KA.createTopics correlationId req)
V.forM_ rets $ \ret -> do
when (ret.errorCode /= K.NONE) $
putStrLn $ "Create topic " <> show ret.name <> " failed: " <> show ret.errorCode
putStrLn "DONE"
case V.toList rets of
[K.CreatableTopicResultV0{..}]
| errorCode /= K.NONE -> putStrLn $ "Create topic " <> show name <> " failed: " <> show errorCode
| otherwise -> showTopic topics
_ -> putStrLn $ "UnexpectedError: create topic " <> " receive " <> show rets
where
showTopic topic = do
let titles = ["Name", "Partitions", "Replication-factor"]
lenses = [ Text.unpack . (.name)
, show . (.numPartitions)
, show . (.replicationFactor)
]
stats = (\s -> ($ s) <$> lenses) <$> V.toList (K.unNonNullKaArray topic)
putStrLn $ simpleShowTable (map (, 30, Table.left) titles) stats

handleTopicDelete :: Options -> TopicCommandDeleteOpts -> IO ()
handleTopicDelete Options{..} cmdopts = do
@@ -223,22 +237,22 @@ handleTopicDelete Options{..} cmdopts = do

data GroupCommand
= GroupCommandList
| GroupCommandInfo Text
| GroupCommandShow Text
deriving (Show)

groupIdParser :: Parser Text
groupIdParser =
O.strOption (O.long "id" <> O.metavar "Text" <> O.help "Group id")
O.strArgument (O.metavar "Text" <> O.help "Group id")

groupCommandParser :: Parser GroupCommand
groupCommandParser = hsubparser
( O.command "list" (O.info (pure GroupCommandList) (O.progDesc "Get all consumer groups"))
<> O.command "info" (O.info (GroupCommandInfo <$> groupIdParser) (O.progDesc "topic info"))
<> O.command "show" (O.info (GroupCommandShow <$> groupIdParser) (O.progDesc "Show topic info"))
)

handleGroupCommand :: Options -> GroupCommand -> IO ()
handleGroupCommand opts GroupCommandList = handleGroupList opts
handleGroupCommand opts (GroupCommandInfo n) = handleGroupInfo opts n
handleGroupCommand opts (GroupCommandShow n) = handleGroupShow opts n

handleGroupList :: Options -> IO ()
handleGroupList Options{..} = do
@@ -255,8 +269,8 @@ handleGroupList Options{..} = do
stats = (\s -> ($ s) <$> lenses) <$> (V.toList groups)
putStrLn $ simpleShowTable (map (, 30, Table.left) titles) stats

handleGroupInfo :: Options -> Text -> IO ()
handleGroupInfo Options{..} name = do
handleGroupShow :: Options -> Text -> IO ()
handleGroupShow Options{..} name = do
let req = K.DescribeGroupsRequestV0 (K.KaArray $ Just $ V.singleton name)
correlationId <- getCorrelationId
resp <- KA.withSendAndRecv host port (KA.describeGroups correlationId req)
@@ -268,11 +282,9 @@ handleGroupInfo Options{..} name = do
let emit idt s = putStrLn $ replicate idt ' ' <> s
members = K.unNonNullKaArray $ group.members
emit 0 . formatWith [yellow] $ "\9678 " <> Text.unpack group.groupId
-- Use show here because all the following can be an empty text, which will
-- be printed as `""` by show
emit 2 $ "GroupState: " <> show group.groupState
emit 2 $ "ProtocolType: " <> show group.protocolType
emit 2 $ "ProtocolData: " <> show group.protocolData
emit 2 $ "GroupState: " <> Text.unpack group.groupState
emit 2 $ "ProtocolType: " <> Text.unpack group.protocolType
emit 2 $ "ProtocolData: " <> Text.unpack group.protocolData
if V.null members
then emit 2 $ "Members: None"
else do
@@ -318,17 +330,19 @@ data ProduceData = ProduceInteractive | ProduceData Text
deriving (Show, Eq)

data ProduceCommandOpts = ProduceCommandOpts
{ topic :: Text
, partition :: Maybe Int32
, timeoutMs :: Int32
, produceData :: ProduceData
{ topic :: Text
, partition :: Maybe Int32
, timeoutMs :: Int32
, keySeparator :: BS.ByteString
, produceData :: ProduceData
} deriving (Show, Eq)

produceCommandParser :: Parser ProduceCommandOpts
produceCommandParser = ProduceCommandOpts
<$> strOption (long "topic" <> metavar "Text" <> help "Topic name")
<$> strArgument (metavar "TopicName" <> help "Topic name")
<*> O.optional (option auto (long "partition" <> short 'p' <> metavar "Int32" <> help "Partition index"))
<*> option auto (long "timeout" <> metavar "Int32" <> value 5000 <> help "Timeout in milliseconds")
<*> O.option O.str (O.long "separator" <> short 's' <> O.metavar "String" <> O.showDefault <> O.value "@" <> O.help "Separator of key. e.g. key1@value")
<*> ( ProduceData <$> strOption (long "data" <> short 'd' <> metavar "Text" <> help "Data")
<|> flag' ProduceInteractive (long "interactive" <> short 'i' <> help "Interactive mode")
)
@@ -347,30 +361,40 @@ handleProduceCommand Options{..} cmdopts = do
pure p
case cmdopts.produceData of
ProduceData d -> flip finally (hs_delete_producer producer) $ do
doProduce producer cmdopts.topic (fromMaybe (-1) cmdopts.partition) d
hs_producer_flush producer
let (k, v) = splitKeyValue cmdopts.keySeparator (encodeUtf8 d)
doProduce producer cmdopts.topic (fromMaybe (-1) cmdopts.partition) k v
ProduceInteractive -> flip finally (hs_delete_producer producer) $ do
putStrLn $ "Type message value and hit enter "
<> "to produce message. Use Ctrl-c to exit."
HL.runInputT HL.defaultSettings
(loopReadLine producer cmdopts.topic (fromMaybe (-1) cmdopts.partition))
where
doProduce p topic partition payload = do
splitKeyValue sep input =
let items = splitOn sep input
in case items of
[payload] -> (BS.empty, payload)
[key,payload] -> (key, payload)
_ -> errorWithoutStackTrace $ "invalid input: " <> show input <> " with separator " <> show sep
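-- Illustration (not part of the diff): with the default "@" separator,
--   splitKeyValue "@" "value-only" == ("", "value-only")   -- value only, empty key
--   splitKeyValue "@" "key1@value" == ("key1", "value")    -- key and value
-- while an input containing more than one separator is rejected with an error.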

doProduce p topic partition key payload = do
(errmsg, ret) <-
HsForeign.withByteString (encodeUtf8 topic) $ \topic' topic_size ->
HsForeign.withByteString (encodeUtf8 payload) $ \payload'' payload_size ->
unsafeWithStdString $
hs_producer_produce p topic' topic_size partition
payload'' payload_size
HsForeign.withByteString payload $ \payload'' payload_size ->
HsForeign.withByteString key $ \key' key_size ->
unsafeWithStdString $
hs_producer_produce p topic' topic_size partition
payload'' payload_size key' key_size
when (ret /= 0) $ errorWithoutStackTrace $
"Produce failed: " <> (Text.unpack $ decodeUtf8 errmsg)
hs_producer_flush p

loopReadLine p topic partition = do
minput <- HL.getInputLine "> "
case minput of
Nothing -> return ()
Just payload -> do
liftIO $ doProduce p topic partition (Text.pack payload)
let (k, v) = splitKeyValue cmdopts.keySeparator (encodeUtf8 . Text.pack $ payload)
liftIO $ doProduce p topic partition k v
loopReadLine p topic partition

data HsProducer
@@ -385,6 +409,7 @@ foreign import ccall interruptible "hs_producer_produce"
-> Ptr Word8 -> Int -- Topic
-> Int32 -- Partition
-> Ptr Word8 -> Int -- Payload
-> Ptr Word8 -> Int -- Key
-> (Ptr HsForeign.StdString) -- errmsg
-> IO Int

@@ -412,7 +437,7 @@ data ConsumeCommandOpts = ConsumeCommandOpts

consumeCommandParser :: Parser ConsumeCommandOpts
consumeCommandParser = ConsumeCommandOpts
<$> strOption (long "group-id" <> metavar "Text" <> help "Group id")
<$> strOption (long "group-id" <> short 'g' <> metavar "Text" <> value Text.empty <> help "Group id")
<*> some (strOption (long "topic" <> short 't' <> metavar "Text" <> help "Topic name"))
<*> optional ( flag' OffsetResetEarliest (long "earliest" <> help "Reset offset to earliest")
<|> flag' OffsetResetLatest (long "latest" <> help "Reset offset to latest, default")
@@ -427,14 +452,13 @@ handleConsumeCommand Options{..} cmdopts = do
let topics = filter (not . Text.null) cmdopts.topics
when (null topics) $
errorWithoutStackTrace "Topic name is required"
when (Text.null cmdopts.groupId) $
errorWithoutStackTrace "Group id is required"
groupId <- if Text.null cmdopts.groupId then newRandomText 10 else return cmdopts.groupId
-- First check topic exists because we do not support auto topic creation yet.
-- Or you will get a coredump.
forM_ topics $ describeTopic host port

let brokers = encodeUtf8 $ Text.pack (host <> ":" <> show port)
groupIdBs = encodeUtf8 cmdopts.groupId :: ByteString
groupIdBs = encodeUtf8 groupId :: ByteString
offsetResetBs = case cmdopts.offsetReset of
Nothing -> "latest"
Just OffsetResetEarliest -> "earliest"
@@ -485,16 +509,16 @@ foreign import ccall interruptible "hs_consumer_consume"

creatableTopicParserV0 :: Parser K.CreatableTopicV0
creatableTopicParserV0 = K.CreatableTopicV0
<$> option str (O.long "name" <> metavar "Text")
<*> option auto (O.long "num-partitions" <> metavar "Int32")
<*> option auto (O.long "replication-factor" <> metavar "Int16")
<$> strArgument (metavar "TopicName")
<*> option auto (O.long "num-partitions" <> O.short 'p' <> O.value 1 <> O.showDefault <> metavar "Int32")
<*> option auto (O.long "replication-factor" <> O.short 'r' <> O.value 1 <> O.showDefault <> metavar "Int16")
<*> pure (K.KaArray $ Just V.empty)
<*> pure (K.KaArray $ Just V.empty)

createTopicsRequestParserV0 :: Parser K.CreateTopicsRequestV0
createTopicsRequestParserV0 = K.CreateTopicsRequestV0
<$> (K.KaArray . Just . V.fromList <$> some creatableTopicParserV0)
<*> pure 5000
<$> (K.KaArray . Just . V.singleton <$> creatableTopicParserV0)
<*> option auto (O.long "timeout" <> O.short 't' <> O.value 5000 <> O.showDefault <> metavar "Int32")

-------------------------------------------------------------------------------

21 changes: 10 additions & 11 deletions hstream-kafka/cbits/hs_kafka_client.cpp
@@ -21,10 +21,10 @@ class HsDeliveryReportCb : public RdKafka::DeliveryReportCb {
/* If message.err() is non-zero the message delivery failed permanently
* for the message. */
if (message.err())
std::cerr << "% Message delivery failed: " << message.errstr()
std::cerr << "Message delivery failed: " << message.errstr()
<< std::endl;
else
std::cerr << "% Message delivered to topic " << message.topic_name()
std::cerr << "Message delivered to topic " << message.topic_name()
<< " [" << message.partition() << "] at offset "
<< message.offset() << std::endl;
}
@@ -178,7 +178,7 @@ void msg_consume(RdKafka::Message* message, void* opaque) {
case RdKafka::ERR__PARTITION_EOF:
/* Last message */
if (consumer_exit_eof && ++consumer_eof_cnt == consumer_partition_cnt) {
std::cerr << "%% EOF reached for all " << consumer_partition_cnt
std::cerr << "EOF reached for all " << consumer_partition_cnt
<< " partition(s)" << std::endl;
run = 0;
}
@@ -251,25 +251,24 @@ void hs_delete_producer(HsProducer* p) { delete p; }

HsInt hs_producer_produce(HsProducer* p, const char* topic_, HsInt topic_size_,
int32_t partition_, const char* payload_,
HsInt payload_size_, std::string* errstr) {
if (!payload_) {
p->producer->poll(0);
return 0;
}
HsInt payload_size_, const char* key_,
HsInt key_size_, std::string* errstr) {

std::string topic(topic_, topic_size_);
std::string payload(payload_, payload_size_);
auto partition = partition_ < 0 ? RdKafka::Topic::PARTITION_UA : partition_;
auto key = key_size_ == 0 ? NULL : const_cast<char*>(key_);
auto value = payload_size_ == 0 ? NULL : const_cast<char*>(payload_);
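  // librdkafka treats a NULL pointer with zero size as an absent field, so an
  // empty key or payload is produced as a null key/value rather than an empty one.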

retry:
RdKafka::ErrorCode err =
p->producer->produce(topic, partition,
/* Copy payload */
RdKafka::Producer::RK_MSG_COPY,
/* Value */
const_cast<char*>(payload_), payload_size_,
value, payload_size_,
/* Key */
NULL, 0,
key, key_size_,
/* Timestamp (defaults to current time) */
0,
/* Message headers, if any */
@@ -309,7 +308,7 @@ void hs_producer_flush(HsProducer* p) {
p->producer->flush(10 * 1000 /* wait for max 10 seconds */);

if (p->producer->outq_len() > 0) {
std::cerr << "% " << p->producer->outq_len()
std::cerr << p->producer->outq_len()
<< " message(s) were not delivered" << std::endl;
}
}
