From afc4f447e0e3e674759c36f7270c2b80c45f07e6 Mon Sep 17 00:00:00 2001
From: YangKian <45479280+YangKian@users.noreply.github.com>
Date: Thu, 9 Nov 2023 15:54:38 +0800
Subject: [PATCH] kafka-cli: polish kafka cli (#1672)

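Polish the kafka command line tool:

* topic: rename "info" to "describe", take the topic name as a positional
  argument, and give "create" defaults for --num-partitions and
  --replication-factor plus a configurable --timeout; "create" now creates a
  single topic and prints the result as a table.
* group: rename "info" to "show" and take the group id as a positional
  argument.
* produce: support message keys via a configurable --separator (default "@",
  e.g. key1@value) and pass the key through the C++ producer binding.
* consume: make --group-id optional; a random group id is generated when it
  is omitted.
* Move the splitOn helper from HStream.Client.Internal to
  HStream.Utils.Common so it can be shared.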
---
 common/hstream/HStream/Utils/Common.hs    |  10 ++
 hstream-kafka/HStream/Kafka/Client/Cli.hs | 136 +++++++++++++---------
 hstream-kafka/cbits/hs_kafka_client.cpp   |  21 ++--
 hstream/src/HStream/Client/Internal.hs    |   9 +-
 4 files changed, 101 insertions(+), 75 deletions(-)
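
A quick sketch of how the new --separator flag is interpreted, traced from the
splitOn / splitKeyValue definitions below (ByteString literals assume
OverloadedStrings); these are hand-derived equations, not output from the
shipped binary:

    -- splitOn breaks on every occurrence of the delimiter:
    splitOn "@" "key1@value"  ==  ["key1", "value"]
    splitOn "@" "value-only"  ==  ["value-only"]
    splitOn "@" "a@b@c"       ==  ["a", "b", "c"]

    -- splitKeyValue (in Cli.hs) then maps the pieces to (key, value):
    --   one piece   -> ("", payload)   -- record without a key
    --   two pieces  -> (key, payload)
    --   otherwise   -> rejected with errorWithoutStackTrace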

diff --git a/common/hstream/HStream/Utils/Common.hs b/common/hstream/HStream/Utils/Common.hs
index b5fb581db..0fb54c872 100644
--- a/common/hstream/HStream/Utils/Common.hs
+++ b/common/hstream/HStream/Utils/Common.hs
@@ -2,11 +2,13 @@ module HStream.Utils.Common
   ( maybeToEither
   , newRandomText
   , limitedMapConcurrently
+  , splitOn
   ) where
 
 import           Control.Concurrent.Async (mapConcurrently)
 import           Control.Concurrent.QSem  (QSem, newQSem, signalQSem, waitQSem)
 import           Control.Exception        (bracket_)
+import qualified Data.ByteString          as BS
 import           Data.Text                (Text)
 import qualified Data.Text                as Text
 import           System.Random
@@ -24,3 +26,11 @@ limitedMapConcurrently maxConcurrency f inputs = do
  where
    limited :: QSem -> IO c -> IO c
    limited sem = bracket_ (waitQSem sem) (signalQSem sem)
+
+-- | Break a ByteString into pieces separated by the first ByteString argument, consuming the delimiter.
+splitOn :: BS.ByteString -> BS.ByteString -> [BS.ByteString]
+splitOn ""        = error "delimiter shouldn't be empty."
+splitOn delimiter = go
+  where
+    go s = let (pre, post) = BS.breakSubstring delimiter s
+            in pre : if BS.null post then [] else go (BS.drop (BS.length delimiter) post)
diff --git a/hstream-kafka/HStream/Kafka/Client/Cli.hs b/hstream-kafka/HStream/Kafka/Client/Cli.hs
index 26b8801b4..92db0c102 100644
--- a/hstream-kafka/HStream/Kafka/Client/Cli.hs
+++ b/hstream-kafka/HStream/Kafka/Client/Cli.hs
@@ -1,7 +1,5 @@
 {-# LANGUAGE DuplicateRecordFields #-}
 {-# LANGUAGE InterruptibleFFI      #-}
-{-# LANGUAGE LambdaCase            #-}
-{-# LANGUAGE MagicHash             #-}
 {-# LANGUAGE OverloadedRecordDot   #-}
 {-# LANGUAGE UnliftedFFITypes      #-}
 
@@ -20,6 +18,7 @@ import           Control.Exception        (finally)
 import           Control.Monad
 import           Control.Monad.IO.Class   (liftIO)
 import           Data.ByteString          (ByteString)
+import qualified Data.ByteString          as BS
 import           Data.Char                (toUpper)
 import           Data.Int
 import           Data.IORef
@@ -40,6 +39,7 @@ import           System.IO.Unsafe         (unsafePerformIO)
 
 import           HStream.Base.Table       as Table
 import qualified HStream.Kafka.Client.Api as KA
+import           HStream.Utils            (newRandomText, splitOn)
 import qualified Kafka.Protocol.Encoding  as K
 import qualified Kafka.Protocol.Error     as K
 import qualified Kafka.Protocol.Message   as K
@@ -127,18 +127,18 @@ data TopicCommandDeleteOpts = TopicCommandDeleteOpts
 topicCommandParser :: Parser TopicCommand
 topicCommandParser = hsubparser
   ( O.command "list" (O.info (pure TopicCommandList) (O.progDesc "Get all topics"))
- <> O.command "info" (O.info (TopicCommandInfo <$> topicNameParser) (O.progDesc "topic info"))
+ <> O.command "describe" (O.info (TopicCommandInfo <$> topicNameParser) (O.progDesc "List details of given topic"))
  <> O.command "create" (O.info (TopicCommandCreate <$> createTopicsRequestParserV0) (O.progDesc "Create a topic"))
- <> O.command "delete" (O.info (TopicCommandDelete <$> topicDeleteOptsParser) (O.progDesc "delete a topic"))
+ <> O.command "delete" (O.info (TopicCommandDelete <$> topicDeleteOptsParser) (O.progDesc "Delete a topic"))
   )
 
 topicNameParser :: Parser Text
 topicNameParser =
-  O.strOption (O.long "name" <> O.metavar "Text" <> O.help "Topic name")
+  O.strArgument (O.metavar "TopicName" <> O.help "Topic name")
 
 topicNameParser' :: Parser (Either () Text)
 topicNameParser' =
-      Right <$> O.strOption (O.long "name" <> O.metavar "Text" <> O.help "Topic name")
+      Right <$> O.strArgument (O.metavar "TopicName" <> O.help "Topic name")
   <|> flag' (Left ()) (O.long "all" <> O.help "All topics")
 
 topicDeleteOptsParser :: Parser TopicCommandDeleteOpts
@@ -148,7 +148,7 @@ topicDeleteOptsParser = TopicCommandDeleteOpts
 
 handleTopicCommand :: Options -> TopicCommand -> IO ()
 handleTopicCommand opts TopicCommandList       = handleTopicList opts
-handleTopicCommand opts (TopicCommandInfo n)   = handleTopicInfo opts n
+handleTopicCommand opts (TopicCommandInfo n)   = handleTopicDescribe opts n
 handleTopicCommand opts (TopicCommandCreate n) = handleTopicCreate opts n
 handleTopicCommand opts (TopicCommandDelete o) = handleTopicDelete opts o
 
@@ -157,17 +157,21 @@ handleTopicList Options{..} = do
   let req = K.MetadataRequestV0 (K.KaArray Nothing)
   correlationId <- getCorrelationId
   resp <- KA.withSendAndRecv host port (KA.metadata correlationId req)
-  let titles = ["Name", "ErrorCode", "IsInternal"]
-      K.NonNullKaArray topics = resp.topics
-      lenses = [ Text.unpack . (.name)
-               , show . (.errorCode)
-               , show . (.isInternal)
-               ]
-      stats = (\s -> ($ s) <$> lenses) <$> (V.toList topics)
-  putStrLn $ simpleShowTable (map (, 30, Table.left) titles) stats
-
-handleTopicInfo :: Options -> Text -> IO ()
-handleTopicInfo Options{..} name = do
+  let K.NonNullKaArray topics = resp.topics
+      failed = V.filter (\t -> t.errorCode /= K.NONE) topics
+  if V.null failed
+    then do
+      let titles = ["Name", "IsInternal"]
+          lenses = [ Text.unpack . (.name)
+                   , show . (.isInternal)
+                   ]
+          stats = (\s -> ($ s) <$> lenses) <$> (V.toList topics)
+      putStrLn $ simpleShowTable (map (, 30, Table.left) titles) stats
+    else do
+      putStrLn $ "List topic error: " <> show ((.errorCode) . V.head $ failed)
+
+handleTopicDescribe :: Options -> Text -> IO ()
+handleTopicDescribe Options{..} name = do
   tp <- describeTopic host port name
   let titles = ["Name", "IsInternal", "Partition", "LeaderId"]
       lenses = [ const (Text.unpack tp.name)
@@ -179,14 +183,24 @@ handleTopicInfo Options{..} name = do
   putStrLn $ simpleShowTable (map (, 30, Table.left) titles) stats
 
 handleTopicCreate :: Options -> K.CreateTopicsRequestV0 -> IO ()
-handleTopicCreate Options{..} req = do
+handleTopicCreate Options{..} req@K.CreateTopicsRequestV0{..} = do
   correlationId <- getCorrelationId
   K.CreateTopicsResponseV0 (K.KaArray (Just rets)) <-
     KA.withSendAndRecv host port (KA.createTopics correlationId req)
-  V.forM_ rets $ \ret -> do
-    when (ret.errorCode /= K.NONE) $
-      putStrLn $ "Create topic " <> show ret.name <> " failed: " <> show ret.errorCode
-  putStrLn "DONE"
+  case V.toList rets of
+    [K.CreatableTopicResultV0{..}]
+      | errorCode /= K.NONE -> putStrLn $ "Create topic " <> show name <> " failed: " <> show errorCode
+      | otherwise -> showTopic topics
+    _ -> putStrLn $ "UnexpectedError: create topic " <> " receive " <> show rets
+  where
+    showTopic topic = do
+      let titles = ["Name", "Partitions", "Replication-factor"]
+          lenses = [ Text.unpack . (.name)
+                   , show . (.numPartitions)
+                   , show . (.replicationFactor)
+                   ]
+          stats = (\s -> ($ s) <$> lenses) <$> V.toList (K.unNonNullKaArray topic)
+      putStrLn $ simpleShowTable (map (, 30, Table.left) titles) stats
 
 handleTopicDelete :: Options -> TopicCommandDeleteOpts -> IO ()
 handleTopicDelete Options{..} cmdopts = do
@@ -223,22 +237,22 @@ handleTopicDelete Options{..} cmdopts = do
 
 data GroupCommand
   = GroupCommandList
-  | GroupCommandInfo Text
+  | GroupCommandShow Text
   deriving (Show)
 
 groupIdParser :: Parser Text
 groupIdParser =
-  O.strOption (O.long "id" <> O.metavar "Text" <> O.help "Group id")
+  O.strArgument (O.metavar "GroupId" <> O.help "Group id")
 
 groupCommandParser :: Parser GroupCommand
 groupCommandParser = hsubparser
   ( O.command "list" (O.info (pure GroupCommandList) (O.progDesc "Get all consumer groups"))
- <> O.command "info" (O.info (GroupCommandInfo <$> groupIdParser) (O.progDesc "topic info"))
+ <> O.command "show" (O.info (GroupCommandShow <$> groupIdParser) (O.progDesc "Show topic info"))
   )
 
 handleGroupCommand :: Options -> GroupCommand -> IO ()
 handleGroupCommand opts GroupCommandList     = handleGroupList opts
-handleGroupCommand opts (GroupCommandInfo n) = handleGroupInfo opts n
+handleGroupCommand opts (GroupCommandShow n) = handleGroupShow opts n
 
 handleGroupList :: Options -> IO ()
 handleGroupList Options{..} = do
@@ -255,8 +269,8 @@ handleGroupList Options{..} = do
       stats = (\s -> ($ s) <$> lenses) <$> (V.toList groups)
   putStrLn $ simpleShowTable (map (, 30, Table.left) titles) stats
 
-handleGroupInfo :: Options -> Text -> IO ()
-handleGroupInfo Options{..} name = do
+handleGroupShow :: Options -> Text -> IO ()
+handleGroupShow Options{..} name = do
   let req = K.DescribeGroupsRequestV0 (K.KaArray $ Just $ V.singleton name)
   correlationId <- getCorrelationId
   resp <- KA.withSendAndRecv host port (KA.describeGroups correlationId req)
@@ -268,11 +282,9 @@ handleGroupInfo Options{..} name = do
   let emit idt s = putStrLn $ replicate idt ' ' <> s
       members = K.unNonNullKaArray $ group.members
   emit 0 . formatWith [yellow] $ "\9678 " <> Text.unpack group.groupId
-  -- Use show here because all the following can be an empty text, which will
-  -- be printed as `""` by show
-  emit 2 $ "GroupState: " <> show group.groupState
-  emit 2 $ "ProtocolType: " <> show group.protocolType
-  emit 2 $ "ProtocolData: " <> show group.protocolData
+  emit 2 $ "GroupState: " <> Text.unpack group.groupState
+  emit 2 $ "ProtocolType: " <> Text.unpack group.protocolType
+  emit 2 $ "ProtocolData: " <> Text.unpack group.protocolData
   if V.null members
      then emit 2 $ "Members: None"
      else do
@@ -318,17 +330,19 @@ data ProduceData = ProduceInteractive | ProduceData Text
   deriving (Show, Eq)
 
 data ProduceCommandOpts = ProduceCommandOpts
-  { topic       :: Text
-  , partition   :: Maybe Int32
-  , timeoutMs   :: Int32
-  , produceData :: ProduceData
+  { topic        :: Text
+  , partition    :: Maybe Int32
+  , timeoutMs    :: Int32
+  , keySeparator :: BS.ByteString
+  , produceData  :: ProduceData
   } deriving (Show, Eq)
 
 produceCommandParser :: Parser ProduceCommandOpts
 produceCommandParser = ProduceCommandOpts
-  <$> strOption (long "topic" <> metavar "Text" <> help "Topic name")
+  <$> strArgument (metavar "TopicName" <> help "Topic name")
   <*> O.optional (option auto (long "partition" <> short 'p' <> metavar "Int32" <> help "Partition index"))
   <*> option auto (long "timeout" <> metavar "Int32" <> value 5000 <> help "Timeout in milliseconds")
+  <*> O.option O.str (O.long "separator" <> short 's' <> O.metavar "String" <> O.showDefault <> O.value "@" <> O.help "Separator of key. e.g. key1@value")
   <*> ( ProduceData <$> strOption (long "data" <> short 'd' <> metavar "Text" <> help "Data")
     <|> flag' ProduceInteractive (long "interactive" <> short 'i' <> help "Interactive mode")
       )
@@ -347,30 +361,40 @@ handleProduceCommand Options{..} cmdopts = do
     pure p
   case cmdopts.produceData of
     ProduceData d -> flip finally (hs_delete_producer producer) $ do
-      doProduce producer cmdopts.topic (fromMaybe (-1) cmdopts.partition) d
-      hs_producer_flush producer
+      let (k, v) = splitKeyValue cmdopts.keySeparator (encodeUtf8 d)
+      doProduce producer cmdopts.topic (fromMaybe (-1) cmdopts.partition) k v
     ProduceInteractive -> flip finally (hs_delete_producer producer) $ do
       putStrLn $ "Type message value and hit enter "
               <> "to produce message. Use Ctrl-c to exit."
       HL.runInputT HL.defaultSettings
         (loopReadLine producer cmdopts.topic (fromMaybe (-1) cmdopts.partition))
   where
-    doProduce p topic partition payload = do
+    splitKeyValue sep input =
+      let items = splitOn sep input
+       in case items of
+            [payload] -> (BS.empty, payload)
+            [key, payload] -> (key, payload)
+            _ -> errorWithoutStackTrace $ "Invalid input: " <> show input <> " with separator " <> show sep
+
+    doProduce p topic partition key payload = do
       (errmsg, ret) <-
         HsForeign.withByteString (encodeUtf8 topic) $ \topic' topic_size ->
-          HsForeign.withByteString (encodeUtf8 payload) $ \payload'' payload_size ->
-            unsafeWithStdString $
-              hs_producer_produce p topic' topic_size partition
-                                  payload'' payload_size
+          HsForeign.withByteString payload $ \payload'' payload_size ->
+            HsForeign.withByteString key $ \key' key_size ->
+              unsafeWithStdString $
+                hs_producer_produce p topic' topic_size partition
+                                    payload'' payload_size key' key_size
       when (ret /= 0) $ errorWithoutStackTrace $
         "Produce failed: " <> (Text.unpack $ decodeUtf8 errmsg)
+      hs_producer_flush p
 
     loopReadLine p topic partition = do
       minput <- HL.getInputLine "> "
       case minput of
         Nothing -> return ()
         Just payload -> do
-          liftIO $ doProduce p topic partition (Text.pack payload)
+          let (k, v) = splitKeyValue cmdopts.keySeparator (encodeUtf8 . Text.pack $ payload)
+          liftIO $ doProduce p topic partition k v
           loopReadLine p topic partition
 
 data HsProducer
@@ -385,6 +409,7 @@ foreign import ccall interruptible "hs_producer_produce"
     -> Ptr Word8 -> Int             -- Topic
     -> Int32                        -- Partition
     -> Ptr Word8 -> Int             -- Payload
+    -> Ptr Word8 -> Int             -- Key
     -> (Ptr HsForeign.StdString)    -- errmsg
     -> IO Int
 
@@ -412,7 +437,7 @@ data ConsumeCommandOpts = ConsumeCommandOpts
 
 consumeCommandParser :: Parser ConsumeCommandOpts
 consumeCommandParser = ConsumeCommandOpts
-  <$> strOption (long "group-id" <> metavar "Text" <> help "Group id")
+  <$> strOption (long "group-id" <> short 'g' <> metavar "Text" <> value Text.empty <> help "Group id")
   <*> some (strOption (long "topic" <> short 't' <> metavar "Text" <> help "Topic name"))
   <*> optional ( flag' OffsetResetEarliest (long "earliest" <> help "Reset offset to earliest")
              <|> flag' OffsetResetLatest (long "latest" <> help "Reset offset to latest, default")
@@ -427,14 +452,13 @@ handleConsumeCommand Options{..} cmdopts = do
   let topics = filter (not . Text.null) cmdopts.topics
   when (null topics) $
     errorWithoutStackTrace "Topic name is required"
-  when (Text.null cmdopts.groupId) $
-    errorWithoutStackTrace "Group id is required"
+  groupId <- if Text.null cmdopts.groupId then newRandomText 10 else return cmdopts.groupId
   -- First check topic exists because we do not support auto topic creation yet.
   -- Or you will get a coredump.
   forM_ topics $ describeTopic host port
 
   let brokers = encodeUtf8 $ Text.pack (host <> ":" <> show port)
-      groupIdBs = encodeUtf8 cmdopts.groupId :: ByteString
+      groupIdBs = encodeUtf8 groupId :: ByteString
       offsetResetBs = case cmdopts.offsetReset of
                         Nothing                  -> "latest"
                         Just OffsetResetEarliest -> "earliest"
@@ -485,16 +509,16 @@ foreign import ccall interruptible "hs_consumer_consume"
 
 creatableTopicParserV0 :: Parser K.CreatableTopicV0
 creatableTopicParserV0 = K.CreatableTopicV0
-  <$> option str (O.long "name" <> metavar "Text")
-  <*> option auto (O.long "num-partitions" <> metavar "Int32")
-  <*> option auto (O.long "replication-factor" <> metavar "Int16")
+  <$> strArgument (metavar "TopicName")
+  <*> option auto (O.long "num-partitions" <> O.short 'p' <> O.value 1 <> O.showDefault <> metavar "Int32")
+  <*> option auto (O.long "replication-factor" <> O.short 'r' <> O.value 1 <> O.showDefault <> metavar "Int16")
   <*> pure (K.KaArray $ Just V.empty)
   <*> pure (K.KaArray $ Just V.empty)
 
 createTopicsRequestParserV0 :: Parser K.CreateTopicsRequestV0
 createTopicsRequestParserV0 = K.CreateTopicsRequestV0
-  <$> (K.KaArray . Just . V.fromList <$> some creatableTopicParserV0)
-  <*> pure 5000
+  <$> (K.KaArray . Just . V.singleton <$> creatableTopicParserV0)
+  <*> option auto (O.long "timeout" <> O.short 't' <> O.value 5000 <> O.showDefault <> metavar "Int32")
 
 -------------------------------------------------------------------------------
 
diff --git a/hstream-kafka/cbits/hs_kafka_client.cpp b/hstream-kafka/cbits/hs_kafka_client.cpp
index c7ca609f2..4a18eb5ab 100644
--- a/hstream-kafka/cbits/hs_kafka_client.cpp
+++ b/hstream-kafka/cbits/hs_kafka_client.cpp
@@ -21,10 +21,10 @@ class HsDeliveryReportCb : public RdKafka::DeliveryReportCb {
     /* If message.err() is non-zero the message delivery failed permanently
      * for the message. */
     if (message.err())
-      std::cerr << "% Message delivery failed: " << message.errstr()
+      std::cerr << "Message delivery failed: " << message.errstr()
                 << std::endl;
     else
-      std::cerr << "% Message delivered to topic " << message.topic_name()
+      std::cerr << "Message delivered to topic " << message.topic_name()
                 << " [" << message.partition() << "] at offset "
                 << message.offset() << std::endl;
   }
@@ -178,7 +178,7 @@ void msg_consume(RdKafka::Message* message, void* opaque) {
     case RdKafka::ERR__PARTITION_EOF:
       /* Last message */
       if (consumer_exit_eof && ++consumer_eof_cnt == consumer_partition_cnt) {
-        std::cerr << "%% EOF reached for all " << consumer_partition_cnt
+        std::cerr << "EOF reached for all " << consumer_partition_cnt
                   << " partition(s)" << std::endl;
         run = 0;
       }
@@ -251,15 +251,14 @@ void hs_delete_producer(HsProducer* p) { delete p; }
 
 HsInt hs_producer_produce(HsProducer* p, const char* topic_, HsInt topic_size_,
                           int32_t partition_, const char* payload_,
-                          HsInt payload_size_, std::string* errstr) {
-  if (!payload_) {
-    p->producer->poll(0);
-    return 0;
-  }
+                          HsInt payload_size_, const char* key_, 
+                          HsInt key_size_, std::string* errstr) {
 
   std::string topic(topic_, topic_size_);
   std::string payload(payload_, payload_size_);
   auto partition = partition_ < 0 ? RdKafka::Topic::PARTITION_UA : partition_;
+  auto key = key_size_ == 0 ? NULL : const_cast<char*>(key_);
+  auto value = payload_size_ == 0 ? NULL : const_cast<char*>(payload_);
 
 retry:
   RdKafka::ErrorCode err =
@@ -267,9 +266,9 @@ HsInt hs_producer_produce(HsProducer* p, const char* topic_, HsInt topic_size_,
                            /* Copy payload */
                            RdKafka::Producer::RK_MSG_COPY,
                            /* Value */
-                           const_cast<char*>(payload_), payload_size_,
+                           value, payload_size_,
                            /* Key */
-                           NULL, 0,
+                           key, key_size_,
                            /* Timestamp (defaults to current time) */
                            0,
                            /* Message headers, if any */
@@ -309,7 +308,7 @@ void hs_producer_flush(HsProducer* p) {
   p->producer->flush(10 * 1000 /* wait for max 10 seconds */);
 
   if (p->producer->outq_len() > 0) {
-    std::cerr << "% " << p->producer->outq_len()
+    std::cerr << p->producer->outq_len()
               << " message(s) were not delivered" << std::endl;
   }
 }
diff --git a/hstream/src/HStream/Client/Internal.hs b/hstream/src/HStream/Client/Internal.hs
index cd40d2ba4..0b2efc761 100644
--- a/hstream/src/HStream/Client/Internal.hs
+++ b/hstream/src/HStream/Client/Internal.hs
@@ -43,7 +43,7 @@ import           HStream.Utils                    (ResourceType (..),
                                                    decompressBatchedRecord,
                                                    formatResult, getServerResp,
                                                    jsonObjectToStruct,
-                                                   newRandomText)
+                                                   newRandomText, splitOn)
 
 streamingFetch :: HStreamCliContext -> T.Text -> API.HStreamApi ClientRequest response -> IO ()
 streamingFetch = streamingFetch' (putStr . formatResult @PB.Struct) False
@@ -145,10 +145,3 @@ interactiveAppend AppendContext{..} = do
     Left _  -> (False, payload)
     Right p -> (True, BSL.toStrict . PB.toLazyByteString . jsonObjectToStruct $ p)
 
-  -- Break a ByteString into pieces separated by the first ByteString argument, consuming the delimiter
-  splitOn :: BS.ByteString -> BS.ByteString -> [BS.ByteString]
-  splitOn ""        = error "delimiter shouldn't be empty."
-  splitOn delimiter = go
-    where
-      go s = let (pre, post) = BS.breakSubstring delimiter s
-              in pre : if BS.null post then [] else go (BS.drop (BS.length delimiter) post)