Skip to content

Commit

Permalink
fix server ready
Browse files Browse the repository at this point in the history
  • Loading branch information
Time Hoo committed Aug 23, 2022
1 parent a9c1024 commit 7f2405b
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 9 deletions.
7 changes: 4 additions & 3 deletions hstream-gossip/src/HStream/Gossip/Core.hs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
module HStream.Gossip.Core where

import Control.Concurrent (killThread, newEmptyMVar,
putMVar, takeMVar,
putMVar, readMVar, takeMVar,
tryPutMVar)
import Control.Concurrent.STM (atomically, check, dupTChan,
flushTQueue, modifyTVar,
Expand Down Expand Up @@ -214,7 +214,8 @@ handleEventMessage gc@GossipContext{..} msg@(EventMessage eName lpTime bs) = do
then do
Log.info . Log.buildString $ "[Server Node" <> show (I.serverNodeId serverSelf)
<> "] Handling Internal Event" <> show eName <> " with lamport " <> show lpInt
handleINITEvent gc bs
(isSeed, _, _) <- readMVar seedsInfo
when isSeed $ handleINITEvent gc bs
else Log.info $ "Action dealing with event " <> Log.buildString' eName <> " not found"
Just action -> do
action bs
Expand All @@ -228,7 +229,7 @@ handleINITEvent gc@GossipContext{..} payload = do
void $ tryPutMVar clusterInited Gossip
atomically $ do
mWorkers <- readTVar workers
check $ Map.size mWorkers == length seeds
check $ (Map.size mWorkers + 1) == length seeds
broadCastUserEvent gc eventNameINITED (BL.toStrict $ PT.toLazyByteString serverSelf)

broadCastUserEvent :: GossipContext -> EventName -> EventPayload -> IO ()
Expand Down
12 changes: 6 additions & 6 deletions hstream-gossip/src/HStream/Gossip/Start.hs
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,14 @@ initGossipContext gossipOpts _eventHandlers serverSelf seeds = do
clusterInited <- newEmptyMVar
clusterReady <- newEmptyMVar
numInited <- newEmptyMVar
seedsInfo <- newEmptyMVar
let eventHandlers = Map.insert eventNameINITED (handleINITEDEvent numInited (length seeds) clusterReady) _eventHandlers
return GossipContext {..}

startGossip :: GossipContext -> IO ()
startGossip gc@GossipContext {..} = do
seedInfo <- newEmptyMVar
void $ startListeners (I.serverNodeHost serverSelf) seedInfo gc
(isSeed, seeds', wasIDead) <- readMVar seedInfo
void $ startListeners (I.serverNodeHost serverSelf) gc
(isSeed, seeds', wasIDead) <- readMVar seedsInfo
if isSeed && not wasIDead
then newTVarIO 0 >>= putMVar numInited . Just
>> threadDelay 1000000
Expand Down Expand Up @@ -137,13 +137,13 @@ handleINITEDEvent initedM l ready payload = readMVar initedM >>= \case
if x == l then putMVar ready () >> Log.info "All servers have been initialized"
else when (x > l) $ Log.warning "More seeds has been initiated, something went wrong"

startListeners :: ByteString -> MVar (Bool, [(ByteString, Int)], Bool) -> GossipContext -> IO (Async ())
startListeners grpcHost seedInfo gc@GossipContext {..} = do
startListeners :: ByteString -> GossipContext -> IO (Async ())
startListeners grpcHost gc@GossipContext {..} = do
let grpcOpts = GRPC.defaultServiceOptions {
GRPC.serverHost = GRPC.Host grpcHost
, GRPC.serverPort = GRPC.Port $ fromIntegral $ I.serverNodeGossipPort serverSelf
, GRPC.serverOnStarted = Just $ do
void . forkIO $ amIASeed serverSelf seeds >>= putMVar seedInfo
void . forkIO $ amIASeed serverSelf seeds >>= putMVar seedsInfo
Log.debug . Log.buildString $ "Server node " <> show serverSelf <> " started"
}
let api = handlers gc
Expand Down
1 change: 1 addition & 0 deletions hstream-gossip/src/HStream/Gossip/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ data GossipContext = GossipContext
{ serverSelf :: I.ServerNode
, eventHandlers :: EventHandlers
, seeds :: [(ByteString, Int)]
, seedsInfo :: MVar (Bool, [(ByteString, Int)], Bool)
, serverList :: TVar ServerList
, actionChan :: TChan RequestAction
, statePool :: TQueue G.StateMessage
Expand Down

0 comments on commit 7f2405b

Please sign in to comment.