server: some attempts on cluster consistency (about resource allocation). Oh God... #1824

Merged: 3 commits, May 28, 2024
84 changes: 50 additions & 34 deletions common/server/HStream/Common/Server/Lookup.hs
@@ -10,6 +10,7 @@ module HStream.Common.Server.Lookup
, kafkaResourceMetaId
) where

import Control.Concurrent (threadDelay)
import Control.Concurrent.STM
import Control.Exception (SomeException (..), throwIO,
try)
@@ -41,47 +42,62 @@ lookupNodePersist
-> Text
-> Maybe Text
-> IO A.ServerNode
lookupNodePersist metaHandle gossipContext loadBalanceHashRing
key metaId advertisedListenersKey = do
-- FIXME: it will insert the results of lookup no matter the resource exists
-- or not
M.getMetaWithVer @TaskAllocation metaId metaHandle >>= \case
Nothing -> do
(epoch, hashRing) <- readTVarIO loadBalanceHashRing
theNode <- getResNode hashRing key advertisedListenersKey
try (M.insertMeta @TaskAllocation
metaId
(TaskAllocation epoch (A.serverNodeId theNode))
metaHandle) >>= \case
Left (e :: SomeException) -> do
-- TODO: add a retry limit here
Log.warning $ "lookupNodePersist exception: " <> Log.buildString' e
<> ", retry..."
lookupNodePersist metaHandle gossipContext loadBalanceHashRing
key metaId advertisedListenersKey
Right () -> return theNode
Just (TaskAllocation epoch nodeId, version) -> do
serverList <- getMemberList gossipContext >>=
fmap V.concat . mapM (fromInternalServerNodeWithKey advertisedListenersKey)
case find ((nodeId == ) . A.serverNodeId) serverList of
Just theNode -> return theNode
lookupNodePersist metaHandle_ gossipContext_ loadBalanceHashRing_
key_ metaId_ advertisedListenersKey_ =
-- FIXME: This is only a mitigation for the case where the node does not
-- yet know the full cluster info. Reinvestigate it!!!
-- And as you can see, a hard-coded constant...
go metaHandle_ gossipContext_ loadBalanceHashRing_ key_ metaId_ advertisedListenersKey_ 5
where
-- TODO: Currently, 'leftRetries' only works before a re-allocation. It could
-- also be used in other cases, such as encountering an exception.
go metaHandle gossipContext loadBalanceHashRing
key metaId advertisedListenersKey leftRetries = do
-- FIXME: it will insert the results of lookup no matter the resource exists
-- or not
M.getMetaWithVer @TaskAllocation metaId metaHandle >>= \case
Nothing -> do
(epoch', hashRing) <- atomically $ do
(epoch', hashRing) <- readTVar loadBalanceHashRing
if epoch' > epoch
then pure (epoch', hashRing)
else retry
theNode' <- getResNode hashRing key advertisedListenersKey
try (M.updateMeta @TaskAllocation metaId
(TaskAllocation epoch' (A.serverNodeId theNode'))
(Just version) metaHandle) >>= \case
(epoch, hashRing) <- readTVarIO loadBalanceHashRing
theNode <- getResNode hashRing key advertisedListenersKey
try (M.insertMeta @TaskAllocation
metaId
(TaskAllocation epoch (A.serverNodeId theNode))
metaHandle) >>= \case
Left (e :: SomeException) -> do
-- TODO: add a retry limit here
Log.warning $ "lookupNodePersist exception: " <> Log.buildString' e
<> ", retry..."
lookupNodePersist metaHandle gossipContext loadBalanceHashRing
key metaId advertisedListenersKey
Right () -> return theNode'
Right () -> return theNode
Just (TaskAllocation epoch nodeId, version) -> do
serverList <- getMemberList gossipContext >>=
fmap V.concat . mapM (fromInternalServerNodeWithKey advertisedListenersKey)
case find ((nodeId == ) . A.serverNodeId) serverList of
Just theNode -> return theNode
Nothing -> do
if leftRetries > 0
then do
Log.info $ "<lookupNodePersist> on <key=" <> Log.buildString' key <> ", metaId=" <>
Log.buildString' metaId <> ">: found on Node=" <> Log.buildString' nodeId <>
", but not sure if it's really dead. Left " <> Log.buildString' leftRetries <>
" retries before re-allocating it..."
threadDelay (1 * 1000 * 1000)
go metaHandle gossipContext loadBalanceHashRing
key metaId advertisedListenersKey (leftRetries - 1)
else do
(epoch', hashRing) <- readTVarIO loadBalanceHashRing
theNode' <- getResNode hashRing key advertisedListenersKey
try (M.updateMeta @TaskAllocation metaId
(TaskAllocation epoch' (A.serverNodeId theNode'))
(Just version) metaHandle) >>= \case
Left (e :: SomeException) -> do
-- TODO: add a retry limit here
Log.warning $ "lookupNodePersist exception: " <> Log.buildString' e
<> ", retry..."
lookupNodePersist metaHandle gossipContext loadBalanceHashRing
key metaId advertisedListenersKey
Right () -> return theNode'

data KafkaResource
= KafkaResTopic Text
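A minimal, self-contained Haskell sketch of the retry-before-reallocate pattern that the new `go` loop above implements may help when skimming the diff; `findAllocatedNode` and `reallocateNode` are hypothetical placeholders, not HStream functions.

import Control.Concurrent (threadDelay)

-- Sketch only: keep trusting the recorded allocation for a bounded number of
-- retries, sleeping between attempts, and only re-allocate once the retries
-- are exhausted.
retryBeforeReallocate
  :: Int            -- ^ retries left before giving up on the recorded node
  -> IO (Maybe a)   -- ^ look up the node the task is currently allocated to
  -> IO a           -- ^ fall back: re-allocate the task to a fresh node
  -> IO a
retryBeforeReallocate leftRetries findAllocatedNode reallocateNode = do
  found <- findAllocatedNode
  case found of
    Just node -> pure node
    Nothing
      | leftRetries > 0 -> do
          -- The node may only look dead because our membership view is
          -- stale; wait a second and check again.
          threadDelay (1 * 1000 * 1000)
          retryBeforeReallocate (leftRetries - 1) findAllocatedNode reallocateNode
      | otherwise -> reallocateNode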
16 changes: 14 additions & 2 deletions hstream-io/HStream/IO/Worker.hs
@@ -91,7 +91,7 @@ createIOTaskFromTaskInfo
=> Worker -> T.Text -> TaskInfo -> IOOptions -> Bool -> Bool -> Bool -> IO ()
createIOTaskFromTaskInfo worker@Worker{..} taskId taskInfo@TaskInfo {..}
ioOptions cleanIfExists createMetaData enableCheck = do
getIOTask worker taskName >>= \case
M.getIOTaskFromName workerHandle taskName >>= \case
Nothing -> pure ()
Just _ -> do
if cleanIfExists
@@ -106,7 +106,7 @@ createIOTaskFromTaskInfo worker@Worker{..} taskId taskInfo@TaskInfo {..}

when createMetaData $ M.createIOTaskMeta workerHandle taskName taskId taskInfo
C.modifyMVar_ ioTasksM $ \ioTasks -> do
-- FIXME: the ioTask's existence is already checked in the `getIOTask worker` step, no need to check it again
-- FIXME: the ioTask's existence is already checked in the `getIOTaskFromName` step, no need to check it again
case HM.lookup taskName ioTasks of
Just _ -> throwIO $ HE.ConnectorExists taskName
Nothing -> do
@@ -220,9 +220,21 @@ updateConnectorConfig worker name config = do
<> ", new config:" <> Log.buildString' newConnCfg
return True

-- WARNING: This function uses only the in-memory cache, which can be
-- outdated, especially under complex cluster circumstances.
-- Please be very careful when using this function, e.g. when checking
-- whether a task already exists before creating it.
-- And remember there are <name -> task id -> task meta> mappings
-- in the meta store, and the latter is never cleaned up!!!
getIOTask :: Worker -> T.Text -> IO (Maybe IOTask)
getIOTask Worker{..} name = HM.lookup name <$> C.readMVar ioTasksM

-- WARNING: This function uses only the in-memory cache, which can be
-- outdated, especially under complex cluster circumstances.
-- Please be very careful when using this function, e.g. when checking
-- whether a task already exists before creating it.
-- And remember there are <name -> task id -> task meta> mappings
-- in the meta store, and the latter is never cleaned up!!!
getIOTask_ :: Worker -> T.Text -> IO IOTask
getIOTask_ Worker{..} name = do
ioTasks <- C.readMVar ioTasksM
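As a hedged illustration of the warning above, the sketch below contrasts the in-memory map with a meta-store lookup when checking whether a task exists; `MetaHandle`, `TaskMeta` and `getTaskMetaByName` are placeholders, not the actual HStream.MetaStore API.

import           Control.Concurrent.MVar (MVar, readMVar)
import qualified Data.HashMap.Strict     as HM
import           Data.Maybe              (isJust)
import           Data.Text               (Text)

-- Placeholders standing in for the persisted task metadata and the handle
-- to the external meta store.
data TaskMeta   = TaskMeta
type MetaHandle = ()

-- Pretend meta-store lookup; the real one would query the external store.
getTaskMetaByName :: MetaHandle -> Text -> IO (Maybe TaskMeta)
getTaskMetaByName _ _ = pure Nothing

-- The in-memory map can miss tasks created by other nodes or before a
-- restart, so the meta store is consulted as the source of truth.
taskExists :: MetaHandle -> MVar (HM.HashMap Text task) -> Text -> IO Bool
taskExists handle ioTasksM name = do
  inMemory <- HM.member name <$> readMVar ioTasksM
  if inMemory
    then pure True
    else isJust <$> getTaskMetaByName handle name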
9 changes: 8 additions & 1 deletion hstream/app/lib/KafkaServer.hs
@@ -176,7 +176,14 @@ serve sc@ServerContext{..} netOpts usingCppServer usingSparseOffset = do
-- FIXME: Why do we need to call deleteAll here?
-- Also, in CI, getRqResult (common/hstream/HStream/MetaStore/RqliteUtils.hs) may throw a RQLiteUnspecifiedErr
-- because more than one row is affected. Why is that invalid?
deleteAllMeta @M.TaskAllocation metaHandle `catches` exceptionHandlers
-- FIXME: The following line is very delicate and can cause weird problems.
-- It was intended to re-allocate tasks after a server restart. However,
-- this should be done BEFORE any node serves any client request or
-- internal task, and the current `serverOnStarted` is not guaranteed
-- to run before the node starts serving outside requests.
-- TODO: I am not 100% confident this is correct, so it should be
-- carefully investigated and tested.
-- deleteAllMeta @M.TaskAllocation metaHandle `catches` exceptionHandlers

Log.info "starting task detector"
TM.runTaskDetector $ TM.TaskDetector {
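To make the ordering concern in the comment above concrete, here is a small, purely illustrative sketch (not HStream code) where an asynchronously invoked startup hook races with request handling, and a barrier forces the cleanup to finish first.

import Control.Concurrent      (forkIO, threadDelay)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, readMVar)

main :: IO ()
main = do
  started <- newEmptyMVar
  -- The cluster layer may invoke the "on started" hook at some later point...
  _ <- forkIO $ do
    threadDelay 500000   -- simulate gossip settling
    putStrLn "onStarted: clear stale TaskAllocation metadata (placeholder)"
    putMVar started ()
  -- ...while the server is already accepting requests. Gating each handler
  -- on the barrier guarantees the cleanup happens first.
  handleRequest started "lookup stream-1"

-- A handler that refuses to run until the startup work has finished.
handleRequest :: MVar () -> String -> IO ()
handleRequest started req = do
  readMVar started
  putStrLn ("handling: " ++ req)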
9 changes: 8 additions & 1 deletion hstream/app/server.hs
@@ -205,7 +205,14 @@ serve sc@ServerContext{..} rpcOpts enableStreamV2 = do
Gossip -> return ()
_ -> do
getProtoTimestamp >>= \x -> upsertMeta @Proto.Timestamp clusterStartTimeId x metaHandle
handle (\(_ :: RQLiteRowNotFound) -> return ()) $ deleteAllMeta @TaskAllocation metaHandle
-- FIXME: The following line is very delicate and can cause weird problems.
-- It was intended to re-allocate tasks after a server restart. However,
-- this should be done BEFORE any node serves any client request or
-- internal task, and the current `serverOnStarted` is not guaranteed
-- to run before the node starts serving outside requests.
-- TODO: I am not 100% confident this is correct, so it should be
-- carefully investigated and tested.
-- handle (\(_ :: RQLiteRowNotFound) -> return ()) $ deleteAllMeta @TaskAllocation metaHandle
-- recover tasks
Log.info "recovering local io tasks"
Cluster.recoverLocalTasks sc scIOWorker