Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add admin cli command to support list and clean connector related meta data #1827

Merged
merged 3 commits into from
Jun 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 16 additions & 4 deletions hstream-admin/server/HStream/Admin/Server/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ data MetaCommand
= MetaCmdList Text
| MetaCmdGet Text Text
| MetaCmdTask MetaTaskCommand
| MetaCmdClean MetaCleanCommand
| MetaCmdInfo
deriving (Show)

Expand All @@ -283,20 +284,22 @@ metaCmdParser = O.hsubparser
<> O.short 'r'
<> O.metavar "RESOURCE_CATEGORY"
<> O.help ("The category of the resource, currently support: "
<> "[subscription|query-info|view-info|qv-relation]")))
<> "[subscription|query-info|view-info|qv-relation|connectors|connector-infos]")))
(O.progDesc "List all metadata of specific resource"))
<> O.command "get" (O.info (MetaCmdGet <$> O.strOption ( O.long "resource"
<> O.short 'r'
<> O.metavar "RESOURCE_CATEGORY"
<> O.help ("The category of the resource, currently support: "
<> "[subscription|query-info|query-status|view-info|qv-relation]"))
<> "[subscription|query-info|query-status|view-info|qv-relation|connector|connector-info]"))
<*> O.strOption ( O.long "id"
<> O.short 'i'
<> O.metavar "RESOURCE_ID"
<> O.help "The Id of the resource"))
(O.progDesc "Get metadata of specific resource"))
<> O.command "info" (O.info (pure MetaCmdInfo) (O.progDesc "Get meta info"))
) O.<|> MetaCmdTask <$> metaTaskCmdParser
)
O.<|> MetaCmdTask <$> metaTaskCmdParser
O.<|> MetaCmdClean <$> metaCleanCmdParser

data MetaTaskCommand
= MetaTaskGet Text Text
Expand All @@ -307,14 +310,23 @@ metaTaskCmdParser = O.hsubparser
( O.command "get-task" (O.info (MetaTaskGet <$> O.strOption ( O.long "resource"
<> O.short 'r'
<> O.metavar "RESOURCE_CATEGORY"
<> O.help "The category of the resource")
<> O.help ("The category of the resource, currently support: "
<> "[stream|subscription|query|view|connector|shard|shard-reader]"))
<*> O.strOption ( O.long "id"
<> O.short 'i'
<> O.metavar "RESOURCE_ID"
<> O.help "The Id of the resource"))
(O.progDesc "Get task allocation metadata of specific resource"))
)

data MetaCleanCommand = CleanConnectors
deriving (Show)

metaCleanCmdParser :: O.Parser MetaCleanCommand
metaCleanCmdParser = O.hsubparser
( O.command "clean-connectors" (O.info (pure CleanConnectors) (O.progDesc "Clean up the taskMeta of connectors in deleted state."))
)

-------------------------------------------------------------------------------

-- TODO: auto generate from commom/stats/include/*.inc
Expand Down
49 changes: 36 additions & 13 deletions hstream/src/HStream/Server/Handler/Admin.hs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,16 @@ module HStream.Server.Handler.Admin

import Control.Concurrent (readMVar, tryReadMVar)
import Control.Concurrent.STM.TVar (readTVarIO)
import Control.Monad (forM, void)
import Control.Exception (catch, throw)
import Control.Monad (forM, void, when)
import Data.Aeson ((.=))
import qualified Data.Aeson as Aeson
import qualified Data.Aeson.Text as Aeson
import qualified Data.HashMap.Strict as HM
import qualified Data.List as L
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as Map
import qualified Data.Set as ST
import Data.Text (Text)
import qualified Data.Text as Text
import qualified Data.Text.Lazy as TL
Expand All @@ -36,7 +38,6 @@ import Proto3.Suite (Enumerated (Enumerated),
import qualified Z.Data.CBytes as CB
import Z.Data.CBytes (CBytes)

import Control.Exception (throw)
import qualified HStream.Admin.Server.Types as AT
import HStream.Base (rmTrailingZeros)
import qualified HStream.Exception as HE
Expand All @@ -61,6 +62,7 @@ import qualified HStream.Server.Core.View as HC
#endif
import HStream.Common.Server.MetaData (TaskAllocation,
renderTaskAllocationsToTable)
import HStream.IO.Types (TaskIdMeta (..), TaskMeta)
import HStream.Server.Exception (catchDefaultEx,
defaultExceptionHandle)
import qualified HStream.Server.HStreamApi as API
Expand All @@ -69,6 +71,8 @@ import HStream.Server.MetaData (QVRelation, QueryInfo,
renderQVRelationToTable,
renderQueryInfosToTable,
renderQueryStatusToTable,
renderTaskIdMetaMapToTable,
renderTaskMetaMapToTable,
renderViewInfosToTable)
import HStream.Server.Types
import qualified HStream.Stats as Stats
Expand All @@ -78,6 +82,7 @@ import HStream.Utils (Interval (..), cBytesToText,
returnResp, showNodeStatus,
structToJsonObject,
timestampToMsTimestamp)
import ZooKeeper.Exception (ZNONODE)

-------------------------------------------------------------------------------
-- All command line data types are defined in 'HStream.Admin.Types'
Expand Down Expand Up @@ -244,19 +249,25 @@ getResType resType =
runMeta :: ServerContext -> AT.MetaCommand -> IO Text
runMeta ServerContext{..} (AT.MetaCmdList resType) = do
case resType of
"subscription" -> pure <$> AT.tableResponse . renderSubscriptionWrapToTable =<< M.listMeta @SubscriptionWrap metaHandle
"query-info" -> pure <$> AT.plainResponse . renderQueryInfosToTable =<< M.listMeta @QueryInfo metaHandle
"view-info" -> pure <$> AT.plainResponse . renderViewInfosToTable =<< M.listMeta @ViewInfo metaHandle
"qv-relation" -> pure <$> AT.tableResponse . renderQVRelationToTable =<< M.listMeta @QVRelation metaHandle
_ -> return $ AT.errorResponse "invalid resource type, try [subscription|query-info|view-info|qv-relateion]"
"subscription" -> pure <$> AT.tableResponse . renderSubscriptionWrapToTable =<< M.listMeta @SubscriptionWrap metaHandle
"query-info" -> pure <$> AT.plainResponse . renderQueryInfosToTable =<< M.listMeta @QueryInfo metaHandle
"view-info" -> pure <$> AT.plainResponse . renderViewInfosToTable =<< M.listMeta @ViewInfo metaHandle
"qv-relation" -> pure <$> AT.tableResponse . renderQVRelationToTable =<< M.listMeta @QVRelation metaHandle
"connectors" -> pure <$> AT.tableResponse . renderTaskIdMetaMapToTable =<< M.getAllMeta @TaskIdMeta metaHandle
"connector-infos" -> pure <$> AT.tableResponse . renderTaskMetaMapToTable =<< M.getAllMeta @TaskMeta metaHandle
_ -> return $ AT.errorResponse "invalid resource type, try "
<> "[subscription|query-info|view-info|qv-relateion|connectors|connector-infos]"
runMeta ServerContext{..} (AT.MetaCmdGet resType rId) = do
case resType of
"subscription" -> pure <$> maybe (AT.plainResponse "Not Found") (AT.tableResponse . renderSubscriptionWrapToTable .L.singleton) =<< M.getMeta @SubscriptionWrap rId metaHandle
"query-info" -> pure <$> maybe (AT.plainResponse "Not Found") (AT.plainResponse . renderQueryInfosToTable . L.singleton) =<< M.getMeta @QueryInfo rId metaHandle
"query-status" -> pure <$> maybe (AT.plainResponse "Not Found") (AT.tableResponse . renderQueryStatusToTable . L.singleton) =<< M.getMeta @QueryStatus rId metaHandle
"view-info" -> pure <$> maybe (AT.plainResponse "Not Found") (AT.plainResponse . renderViewInfosToTable . L.singleton) =<< M.getMeta @ViewInfo rId metaHandle
"qv-relation" -> pure <$> maybe (AT.plainResponse "Not Found") (AT.tableResponse . renderQVRelationToTable . L.singleton) =<< M.getMeta @QVRelation rId metaHandle
_ -> return $ AT.errorResponse "invalid resource type, try [subscription|query-info|query-status|view-info|qv-relateion]"
"subscription" -> pure <$> maybe (AT.plainResponse "Not Found") (AT.tableResponse . renderSubscriptionWrapToTable .L.singleton) =<< M.getMeta @SubscriptionWrap rId metaHandle
"query-info" -> pure <$> maybe (AT.plainResponse "Not Found") (AT.plainResponse . renderQueryInfosToTable . L.singleton) =<< M.getMeta @QueryInfo rId metaHandle
"query-status" -> pure <$> maybe (AT.plainResponse "Not Found") (AT.tableResponse . renderQueryStatusToTable . L.singleton) =<< M.getMeta @QueryStatus rId metaHandle
"view-info" -> pure <$> maybe (AT.plainResponse "Not Found") (AT.plainResponse . renderViewInfosToTable . L.singleton) =<< M.getMeta @ViewInfo rId metaHandle
"qv-relation" -> pure <$> maybe (AT.plainResponse "Not Found") (AT.tableResponse . renderQVRelationToTable . L.singleton) =<< M.getMeta @QVRelation rId metaHandle
"connector" -> pure <$> maybe (AT.plainResponse "Not Found") (AT.tableResponse . renderTaskIdMetaMapToTable . Map.singleton rId) =<< M.getMeta @TaskIdMeta rId metaHandle
"connector-info" -> pure <$> maybe (AT.plainResponse "Not Found") (AT.tableResponse . renderTaskMetaMapToTable . Map.singleton rId) =<< M.getMeta @TaskMeta rId metaHandle
_ -> return $ AT.errorResponse "invalid resource type, try "
<> "[subscription|query-info|query-status|view-info|qv-relateion|connector|connector-info]"
runMeta ServerContext{serverOpts=ServerOpts{..}} AT.MetaCmdInfo = do
let headers = ["Meta Type" :: Text, "Connection Info"]
rows = case _metaStore of
Expand All @@ -266,12 +277,24 @@ runMeta ServerContext{serverOpts=ServerOpts{..}} AT.MetaCmdInfo = do
content = Aeson.object ["headers" .= headers, "rows" .= rows]
return $ AT.tableResponse content
runMeta sc (AT.MetaCmdTask taskCmd) = runMetaTask sc taskCmd
runMeta sc (AT.MetaCmdClean cmd) = runMetaCleanTask sc cmd

runMetaTask :: ServerContext -> AT.MetaTaskCommand -> IO Text
runMetaTask ServerContext{..} (AT.MetaTaskGet resType rId) = do
let metaId = mkAllocationKey (getResType resType) rId
pure <$> maybe (AT.plainResponse "Not Found") (AT.tableResponse . renderTaskAllocationsToTable . L.singleton) =<< M.getMeta @TaskAllocation metaId metaHandle

runMetaCleanTask :: ServerContext -> AT.MetaCleanCommand -> IO Text
runMetaCleanTask ServerContext{..} AT.CleanConnectors = do
activIds <- ST.fromList . map taskIdMeta <$> M.listMeta @TaskIdMeta metaHandle
allTaskMetas <- M.getAllMeta @TaskMeta metaHandle
void $ Map.traverseWithKey (\k _ -> removeMeta k activIds) allTaskMetas
return $ AT.plainResponse "OK"
where
removeMeta key ids = do
when (ST.notMember key ids) $
catch (M.deleteMeta @TaskMeta key Nothing metaHandle) $ \(_ :: ZNONODE) -> return ()

-------------------------------------------------------------------------------
-- Admin Stream Command

Expand Down
30 changes: 30 additions & 0 deletions hstream/src/HStream/Server/MetaData/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ module HStream.Server.MetaData.Types
, renderQueryStatusToTable
, renderViewInfosToTable
, renderQVRelationToTable
, renderTaskMetaMapToTable
, renderTaskIdMetaMapToTable

#ifdef HStreamEnableSchema
, hstreamColumnCatalogToColumnCatalog
Expand All @@ -55,8 +57,11 @@ import Data.Int (Int64)
import qualified Data.IntMap as IntMap
import Data.IORef
import qualified Data.List as L
import Data.Map (Map)
import qualified Data.Map.Strict as M
import Data.Text (Text)
import qualified Data.Text as T
import qualified Data.Text.Lazy as TL
import Data.Time.Clock.System (SystemTime (MkSystemTime),
getSystemTime)
import qualified Data.Vector as V
Expand All @@ -67,6 +72,9 @@ import GHC.Stack

import HStream.Common.Server.MetaData (rootPath)
import HStream.Common.ZookeeperClient (ZookeeperClient)
import HStream.IO.Types (TaskIdMeta (..),
TaskInfo (..),
TaskMeta (..))
import qualified HStream.Logger as Log
import HStream.MetaStore.Types (FHandle, HasPath (..),
MetaHandle,
Expand All @@ -79,6 +87,7 @@ import qualified HStream.Server.HStreamApi as API
import HStream.Server.MetaData.Exception
import HStream.Server.Types (SubscriptionWrap (..))
import qualified HStream.Store as S
import HStream.ThirdParty.Protobuf (toRFC3339)
import HStream.Utils
#ifdef HStreamUseV2Engine
import DiffFlow.Types
Expand Down Expand Up @@ -157,6 +166,27 @@ renderQVRelationToTable relations =
rows = map (\QVRelation{..} -> [qvRelationQueryName, qvRelationViewName]) relations
in Aeson.object ["headers" Aeson..= headers, "rows" Aeson..= rows]

renderTaskMetaMapToTable :: Map Text TaskMeta -> Aeson.Value
renderTaskMetaMapToTable mp =
let headers = [ "Connector Name" :: Text
, "Task Id" :: Text
, "Task Type" :: Text
, "Created Time" :: Text
, "State" :: Text
]
rows = map getMetaInfo $ M.toList mp
in Aeson.object ["headers" Aeson..= headers, "rows" Aeson..= rows]
where
getMetaInfo (taskId, TaskMeta{taskInfoMeta=TaskInfo{..}, ..}) =
let createTime = TL.toStrict . toRFC3339 $ taskCreatedTime
in [taskName, taskId, T.pack . show $ taskType, createTime, T.pack . show $ taskStateMeta]

renderTaskIdMetaMapToTable :: Map Text TaskIdMeta -> Aeson.Value
renderTaskIdMetaMapToTable mp =
let headers = ["Connector Name" :: Text , "Task Id"]
rows = map (\(name, TaskIdMeta{..}) -> [name, taskIdMeta]) $ M.toList mp
in Aeson.object ["headers" Aeson..= headers, "rows" Aeson..= rows]

type SourceStreams = [Text]
type SinkStream = Text
type RelatedStreams = (SourceStreams, SinkStream)
Expand Down
Loading