Skip to content

Commit

Permalink
feat: refactor hstream io store
Browse files Browse the repository at this point in the history
  • Loading branch information
s12f authored and s12f committed Sep 28, 2022
1 parent ffe8c6a commit cf9366f
Show file tree
Hide file tree
Showing 6 changed files with 200 additions and 17 deletions.
60 changes: 60 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -317,3 +317,63 @@ jobs:
name: distributed-tests-reports
path: ~/data/reports.tar
retention-days: 7

# Integration-test job for hstream-io connectors.
# Waits for `build-and-test`, loads the saved docker image from the
# `image-testing` artifact, builds the connector images from the
# hstreamdb/hstream-connectors repository and runs its integration tests.
hstream-io-tests:
  needs: build-and-test
  runs-on: ubuntu-latest
  name: hstream-io-tests
  strategy:
    fail-fast: false

  steps:
    - name: retrieve saved docker image
      uses: actions/download-artifact@v3
      with:
        name: image-testing
        path: ~/data

    - name: docker load
      run: docker load -i ~/data/new_hstream_image.tar

    - name: fetch hstream io tests source code
      uses: actions/checkout@v2
      with:
        repository: "hstreamdb/hstream-connectors"
        submodules: "recursive"
        path: hstream-connectors

    - uses: actions/setup-java@v2
      with:
        distribution: "adopt"
        java-version: 11
        cache: "gradle"

    - uses: gradle/wrapper-validation-action@v1

    - name: publish local java-toolkit
      working-directory: ./hstream-connectors/java-toolkit
      run: ./gradlew publishToMavenLocal -PdisableSigning --info

    - name: build images
      run: |
        cd hstream-connectors/source-debezium
        ./gradlew buildImages
        cd ../sink-jdbc
        ./gradlew buildImages
        cd ../sink-mongodb
        ./gradlew buildImages

    - name: run hstream io tests
      # NOTE(review): NEW_HSTREAM_IMAGE is not set in this job; presumably it is
      # a workflow-level env variable -- confirm.
      run: |
        cd hstream-connectors/integration_tests
        export HSTREAM_IMAGE_NAME=$NEW_HSTREAM_IMAGE
        ./gradlew test --info --fail-fast

    # Upload logs and reports whether the tests passed or failed.
    # The condition must be ONE expression: writing
    # `${{ success() }} || ${{ failure() }}` yields a non-empty string, which is
    # always truthy, so the step would also run on cancelled workflows.
    - uses: actions/upload-artifact@v2
      if: ${{ success() || failure() }}
      with:
        name: hstream-io-logs
        path: |
          hstream-connectors/integration_tests/.logs
          /tmp/io/tasks
          hstream-connectors/integration_tests/app/build/reports
71 changes: 62 additions & 9 deletions hstream-io/HStream/IO/IOTask.hs
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,18 @@ module HStream.IO.IOTask where
import qualified Control.Concurrent as C
import Control.Exception (throwIO)
import qualified Control.Exception as E
import Control.Monad (msum, unless, void, when)
import Control.Monad (forever, msum, unless, void, when)
import qualified Data.Aeson as J
import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as BSL
import qualified Data.ByteString.Lazy.Char8 as BSLC
import Data.IORef (newIORef, readIORef, writeIORef)
import qualified Data.Text as T
import GHC.IO.Handle (hFlush)
import RIO.Directory (createDirectoryIfMissing)
import qualified System.Process.Typed as TP

import qualified Control.Concurrent.STM as STM
import qualified GHC.IO.Handle as IO
import qualified HStream.IO.Messages as MSG
import qualified HStream.IO.Meta as M
import HStream.IO.Types
Expand All @@ -42,10 +44,14 @@ getDockerName :: T.Text -> T.Text
getDockerName = ("IOTASK_" <>)

runIOTask :: IOTask -> IO ()
runIOTask IOTask{..} = do
runIOTask ioTask@IOTask{..} = do
Log.info $ "taskCmd: " <> Log.buildString taskCmd
tp <- TP.startProcess taskProcessConfig
writeIORef process' (Just tp)
_ <- C.forkIO $
E.catch
(handleStdout ioTask (TP.getStdout tp) (TP.getStdin tp))
(\(e :: E.SomeException) -> Log.info $ "handleStdout exited:" <> Log.buildString (show e))
return ()
where
TaskInfo {..} = taskInfo
Expand All @@ -57,14 +63,49 @@ runIOTask IOTask{..} = do
" -v " , taskPath, ":/data",
" " , T.unpack tcImage,
" run",
" --config /data/config.json",
" >> ", taskPath, "/log", " 2>&1"
" --config /data/config.json"
]
taskProcessConfig = TP.setStdin TP.createPipe
. TP.setStdout TP.closed
. TP.setStderr TP.closed
. TP.setStdout TP.createPipe
. TP.setStderr TP.inherit
$ TP.shell taskCmd

-- | Pump the connector process's stdout, one line at a time, forever.
--
-- Every line read from @hStdout@ is appended (with a trailing newline) to
-- @\<taskPath\>/stdout.log@. Lines that decode as a 'MSG.ConnectorMessage' are
-- passed to 'handleConnectorMessage'; when that yields a response it is
-- JSON-encoded, written to @hStdin@ and flushed. Lines that do not decode are
-- only logged to the file and otherwise ignored.
--
-- The loop has no normal exit: it ends only when one of the actions throws
-- (for instance when reading from @hStdout@ fails), so callers should run it
-- under an exception handler.
handleStdout :: IOTask -> IO.Handle -> IO.Handle -> IO ()
handleStdout ioTask@IOTask{..} hStdout hStdin = forever pumpLine
  where
    -- Loop-invariant location of the raw stdout transcript.
    logPath = taskPath ++ "/stdout.log"

    pumpLine = do
      rawLine <- BS.hGetLine hStdout
      BS.appendFile logPath (rawLine <> "\n")
      -- Non-JSON output (plain connector logs) is deliberately skipped here.
      either (const $ pure ()) dispatch (J.eitherDecode (BSL.fromStrict rawLine))

    dispatch request = do
      Log.info $ "connectorMsg:" <> Log.buildString (show request)
      mReply <- handleConnectorMessage ioTask request
      Log.info $ "ConnectorRes:" <> Log.buildString (show mReply)
      mapM_ sendReply mReply

    -- Write one response line to the connector and flush it immediately.
    sendReply reply = do
      BSLC.hPutStrLn hStdin (J.encode reply)
      IO.hFlush hStdin

-- | Dispatch one decoded connector message according to its 'cmType'.
--
-- * 'MSG.CONNECTOR_CALL': the payload is handed to 'handleKvMessage' and the
--   result is wrapped in a 'MSG.ConnectorCallResponse' carrying the same id.
-- * 'MSG.CONNECTOR_SEND': not supported; a warning is logged and no response
--   is produced.
handleConnectorMessage :: IOTask -> MSG.ConnectorMessage -> IO (Maybe MSG.ConnectorCallResponse)
handleConnectorMessage ioTask MSG.ConnectorMessage{..} =
  case cmType of
    MSG.CONNECTOR_CALL ->
      Just . MSG.ConnectorCallResponse cmId <$> handleKvMessage ioTask cmMessage
    MSG.CONNECTOR_SEND ->
      Log.warning "unsupported connector send" >> pure Nothing

-- | Execute a key/value request issued by a connector against the task's
-- metadata handle.
--
-- * action @"get"@ (any value): returns the stored value for 'kmKey',
--   JSON-encoded.
-- * action @"set"@ with a value present: stores the value and returns
--   'J.Null'.
-- * anything else (unknown action, or @"set"@ without a value): logs a
--   warning and returns 'J.Null'.
handleKvMessage :: IOTask -> MSG.KvMessage -> IO J.Value
handleKvMessage IOTask{..} kvMsg@MSG.KvMessage {..}
  | kmAction == "get" =
      J.toJSON <$> M.getTaskKv taskHandle taskId kmKey
  | kmAction == "set", Just newVal <- kmValue =
      M.setTaskKv taskHandle taskId kmKey newVal >> pure J.Null
  | otherwise = do
      Log.warning $ "invalid kv msg:" <> Log.buildString (show kvMsg)
      pure J.Null

-- runIOTaskWithRetry :: IOTask -> IO ()
-- runIOTaskWithRetry task@IOTask{..} =
-- runWithRetry (1 :: Int)
Expand Down Expand Up @@ -100,8 +141,13 @@ stopIOTask task@IOTask{..} ifIsRunning force = do
Just tp <- readIORef process'
let stdin = TP.getStdin tp
BSLC.hPutStrLn stdin (J.encode MSG.InputCommandStop <> "\n")
hFlush stdin
_ <- TP.waitExitCode tp
IO.hFlush stdin
void $ TP.waitExitCode tp
-- FIXME: timeout
-- timeout <- delayTMVar
-- STM.atomically $ STM.orElse (void $ TP.waitExitCodeSTM tp) (STM.takeTMVar timeout)
-- kill task process
-- void $ TP.runProcess killProcessConfig
TP.stopProcess tp
writeIORef process' Nothing
return STOPPED
Expand All @@ -111,6 +157,13 @@ stopIOTask task@IOTask{..} ifIsRunning force = do
where
killDockerCmd = "docker kill " ++ T.unpack (getDockerName taskId)
killProcessConfig = TP.shell killDockerCmd
delayTMVar = do
tmvar <- STM.newEmptyTMVarIO
_ <- C.forkIO $ do
C.threadDelay 15000000
Log.info "exit process timeout, force to stop it"
STM.atomically $ STM.putTMVar tmvar ()
return tmvar

updateStatus :: IOTask -> (IOTaskStatus -> IO IOTaskStatus) -> IO ()
updateStatus IOTask{..} action = do
Expand Down
46 changes: 46 additions & 0 deletions hstream-io/HStream/IO/Messages.hs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ module HStream.IO.Messages where

import qualified Data.Aeson as J
import qualified Data.Aeson.TH as JT
import Data.Char (toLower)
import qualified Data.Text as T


Expand All @@ -21,3 +22,48 @@ data CheckResult
}

$(JT.deriveJSON JT.defaultOptions ''CheckResult)

-- | Kind of message exchanged with a connector process.
-- The JSON instances use the constructor name converted to lower case as the
-- tag.
data ConnectorMessageType
  = CONNECTOR_CALL
  | CONNECTOR_SEND
  deriving Show

$(JT.deriveJSON
    JT.defaultOptions { JT.constructorTagModifier = \tag -> map toLower tag }
    ''ConnectorMessageType)

-- | Key/value request carried inside a 'ConnectorMessage'.
-- JSON field names are the record labels with the two-character @km@ prefix
-- dropped and the remainder lower-cased (@action@, @key@, @value@).
data KvMessage = KvMessage
  { kmAction :: T.Text
  , kmKey    :: T.Text
  , kmValue  :: Maybe T.Text
  } deriving Show

$(JT.deriveJSON
    JT.defaultOptions
      { JT.fieldLabelModifier = \label -> map toLower (drop 2 label) }
    ''KvMessage)

-- | Envelope of a message received from a connector process.
-- JSON field names are the record labels with the two-character @cm@ prefix
-- dropped and the remainder lower-cased (@type@, @id@, @message@).
data ConnectorMessage = ConnectorMessage
  { cmType    :: ConnectorMessageType
  , cmId      :: T.Text
  , cmMessage :: KvMessage
  } deriving Show

$(JT.deriveJSON
    JT.defaultOptions
      { JT.fieldLabelModifier = \label -> map toLower (drop 2 label) }
    ''ConnectorMessage)

-- | Response written back to a connector for a call-type message.
data ConnectorCallResponse = ConnectorCallResponse
  { ccrId      :: T.Text   -- ^ id placed in the @id@ field of the response
  , ccrMessage :: J.Value  -- ^ payload placed in the @message@ field
  } deriving Show

-- | Encodes as an object with the keys @type@ (always 'CONNECTOR_CALL'),
-- @id@ and @message@.
instance J.ToJSON ConnectorCallResponse where
  toJSON (ConnectorCallResponse replyId payload) =
    J.object
      [ ("type"    :: T.Text) J..= CONNECTOR_CALL
      , ("id"      :: T.Text) J..= replyId
      , ("message" :: T.Text) J..= payload
      ]
18 changes: 17 additions & 1 deletion hstream-io/HStream/IO/Meta.hs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import qualified Data.Text as T
import GHC.Stack (HasCallStack)

import HStream.IO.Types
import qualified HStream.IO.Types as Types
import HStream.MetaStore.Types (MetaHandle, MetaStore (..))
import qualified HStream.Server.HStreamApi as API

Expand All @@ -19,7 +20,8 @@ createIOTaskMeta h taskName taskId taskInfo = do
insertMeta taskName (TaskIdMeta taskId) h

listIOTaskMeta :: MetaHandle -> IO [API.Connector]
listIOTaskMeta h = map convertTaskMeta <$> listMeta @TaskMeta h
listIOTaskMeta h = do
map convertTaskMeta . filter (\TaskMeta{..} -> taskStateMeta /= DELETED) <$> listMeta @TaskMeta h

getIOTaskMeta :: MetaHandle -> T.Text -> IO (Maybe TaskMeta)
getIOTaskMeta h tid = getMeta tid h
Expand All @@ -36,3 +38,17 @@ deleteIOTaskMeta h name = do
-- FIXME: use multiops
deleteMeta @TaskIdMeta name Nothing h
updateStatusInMeta h tid DELETED

-- | Build the metadata key for a task's key/value entry: the task id and the
-- connector-supplied key joined by a single underscore.
mapKvKey :: T.Text -> T.Text -> T.Text
mapKvKey taskId key = T.concat [taskId, T.singleton '_', key]

-- | Look up the value stored for @key@ of task @taskId@.
-- Returns 'Nothing' when 'getMeta' finds no entry under the mapped key.
getTaskKv :: MetaHandle -> T.Text -> T.Text -> IO (Maybe T.Text)
getTaskKv h taskId key = do
  stored <- getMeta (mapKvKey taskId key) h
  pure (Types.value <$> stored)

-- | Store @val@ for @key@ of task @taskId@.
-- The entry is inserted when no value exists yet and updated otherwise; the
-- existence check and the write are two separate metadata operations.
setTaskKv :: MetaHandle -> T.Text -> T.Text -> T.Text -> IO ()
setTaskKv h taskId key val = do
  existing <- getTaskKv h taskId key
  case existing of
    Nothing -> insertMeta mappedKey entry h
    Just _  -> updateMeta mappedKey entry Nothing h
  where
    mappedKey = mapKvKey taskId key
    entry     = TaskKvMeta val
16 changes: 14 additions & 2 deletions hstream-io/HStream/IO/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,14 @@ data IOOptions = IOOptions
, optSinkImages :: HM.HashMap T.Text T.Text
} deriving (Show, Eq)

-- | Handle to a running task process whose stdin and stdout are exposed as
-- 'IO.Handle's; the stderr slot carries no value.
type TaskProcess = TP.Process IO.Handle IO.Handle ()

data IOTask = IOTask
{ taskId :: T.Text
, taskInfo :: TaskInfo
, taskPath :: FilePath
, taskHandle :: MetaHandle
, process' :: IORef (Maybe (TP.Process IO.Handle () ()))
, process' :: IORef (Maybe TaskProcess)
, statusM :: C.MVar IOTaskStatus
}

Expand Down Expand Up @@ -101,6 +103,11 @@ newtype TaskIdMeta = TaskIdMeta {taskIdMeta :: T.Text}
deriving (Show)
$(JT.deriveJSON JT.defaultOptions ''TaskIdMeta)

-- | Metadata record holding the value of one task key/value entry.
-- The derived JSON instances use the field name @value@ as the object key.
newtype TaskKvMeta = TaskKvMeta { value :: T.Text }
  deriving Show
$(JT.deriveJSON JT.defaultOptions ''TaskKvMeta)

ioRootPath :: T.Text
ioRootPath = "/hstream/io"

Expand All @@ -110,13 +117,18 @@ instance HasPath TaskMeta ZHandle where
instance HasPath TaskIdMeta ZHandle where
myRootPath = ioRootPath <> "/taskNames"

-- | Root path for 'TaskKvMeta' entries when the store handle is a 'ZHandle':
-- the @taskKvs@ child of 'ioRootPath'.
instance HasPath TaskKvMeta ZHandle where
  myRootPath = ioRootPath <> "/taskKvs"

instance HasPath TaskMeta RHandle where
myRootPath = "ioTasks"

instance HasPath TaskIdMeta RHandle where
myRootPath = "ioTaskNames"

-- TODO: spec the exceptions
-- | Root path for 'TaskKvMeta' entries when the store handle is an 'RHandle'.
instance HasPath TaskKvMeta RHandle where
  myRootPath = "ioTaskKvs"

class TaskJson cm where
toTaskJson :: cm -> T.Text -> J.Value

Expand Down
6 changes: 1 addition & 5 deletions hstream/src/HStream/Server/MetaData/Value.hs
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,12 @@ import HStream.Server.MetaData.Types
import HStream.Server.Types (SubscriptionWrap (..))
import HStream.Utils (textToCBytes)

ioKvPath :: CBytes
ioKvPath = textToCBytes ioRootPath <> "/kv"

paths :: [CBytes]
paths = [ textToCBytes rootPath
, textToCBytes ioRootPath
, textToCBytes ioRootPath
, ioKvPath
, textToCBytes $ myRootPath @TaskIdMeta @ZHandle
, textToCBytes $ myRootPath @TaskMeta @ZHandle
, textToCBytes $ myRootPath @TaskKvMeta @ZHandle
, textToCBytes $ myRootPath @ShardReader @ZHandle
, textToCBytes $ myRootPath @PersistentQuery @ZHandle
, textToCBytes $ myRootPath @SubscriptionWrap @ZHandle
Expand Down

0 comments on commit cf9366f

Please sign in to comment.