diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ChainSync/Client/Jumping.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ChainSync/Client/Jumping.hs index 6e3283a14e..7af23d8f18 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ChainSync/Client/Jumping.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ChainSync/Client/Jumping.hs @@ -74,6 +74,13 @@ -- when the client should pause, download headers, or ask about agreement with -- a given point (jumping). See the 'Jumping' type for more details. -- +-- Interactions with the BlockFetch logic +-- -------------------------------------- +-- +-- When syncing, the BlockFetch logic will fetch blocks from the dynamo. If the +-- dynamo is responding too slowly, the BlockFetch logic can ask to change the +-- dynamo with a call to 'rotateDynamo'. +-- -- Interactions with the Limit on Patience -- --------------------------------------- -- @@ -100,15 +107,15 @@ -- -- > j ╔════════╗ -- > ╭────────── ║ Dynamo ║ ◀─────────╮ --- > │ ╚════════╝ │f --- > ▼ ▲ │ --- > ┌────────────┐ │ k ┌──────────┐ --- > │ Disengaged │ ◀───────────│────────── │ Objector │ --- > └────────────┘ ╭─────│────────── └──────────┘ --- > │ │ ▲ ▲ │ --- > g│ │e b │ │ │ --- > │ │ ╭─────╯ i│ │c --- > ╭╌╌╌╌╌╌╌▼╌╌╌╌╌╌╌╌╌╌╌╌╌│╌╌╌╌╌╌╌╌╌╌│╌▼╌╌╌╮ +-- > │ ╭──╚════════╝ │f +-- > ▼ │ ▲ │ +-- > ┌────────────┐ │ │ k ┌──────────┐ +-- > │ Disengaged │ ◀─│─────────│────────── │ Objector │ +-- > └────────────┘ │ ╭─────│────────── └──────────┘ +-- > │ │ │ ▲ ▲ │ +-- > l│ g│ │e b │ │ │ +-- > │ │ │ ╭─────╯ i│ │c +-- > ╭╌╌╌▼╌╌╌▼╌╌╌╌╌╌╌╌╌╌╌╌╌│╌╌╌╌╌╌╌╌╌╌│╌▼╌╌╌╮ -- > ┆ ╔═══════╗ a ┌──────┐ d ┌─────┐ | -- > ┆ ║ Happy ║ ───▶ │ LFI* │ ───▶ │ FI* │ | -- > ┆ ╚═══════╝ ◀─╮ └──────┘ └─────┘ | @@ -147,6 +154,10 @@ -- If dynamo or objector claim to have no more headers, they are disengaged -- (j|k). -- +-- The BlockFetch logic can ask to change the dynamo if it is not serving blocks +-- fast enough. If there are other non-disengaged peers the dynamo is demoted to +-- a jumper (l) and a new dynamo is elected. +-- module Ouroboros.Consensus.MiniProtocol.ChainSync.Client.Jumping ( Context , ContextWith (..) @@ -154,19 +165,23 @@ module Ouroboros.Consensus.MiniProtocol.ChainSync.Client.Jumping ( , JumpInstruction (..) , JumpResult (..) , Jumping (..) + , getDynamo , makeContext , mkJumping , noJumping , registerClient + , rotateDynamo , unregisterClient ) where import Cardano.Slotting.Slot (SlotNo (..), WithOrigin (..)) -import Control.Monad (forM, forM_, when) +import Control.Monad (forM, forM_, void, when) import Data.Foldable (toList) import Data.List (sortOn) +import qualified Data.Map as Map import Data.Maybe (catMaybes, fromMaybe) import Data.Maybe.Strict (StrictMaybe (..)) +import Data.Sequence.Strict (StrictSeq) import qualified Data.Sequence.Strict as Seq import GHC.Generics (Generic) import Ouroboros.Consensus.Block (HasHeader (getHeaderFields), Header, @@ -460,7 +475,7 @@ onRollBackward context slot = Dynamo _ lastJumpSlot | slot < lastJumpSlot -> do disengage (handle context) - electNewDynamo (stripContext context) + void $ electNewDynamo (stripContext context) | otherwise -> pure () -- | This function is called when we receive a 'MsgAwaitReply' message. @@ -478,7 +493,7 @@ onAwaitReply context = readTVar (cschJumping (handle context)) >>= \case Dynamo{} -> do disengage (handle context) - electNewDynamo (stripContext context) + void $ electNewDynamo (stripContext context) Objector{} -> do disengage (handle context) electNewObjector (stripContext context) @@ -511,7 +526,7 @@ processJumpResult context jumpResult = updateChainSyncState (handle context) jumpInfo RejectedJump JumpToGoodPoint{} -> do startDisengaging (handle context) - electNewDynamo (stripContext context) + void $ electNewDynamo (stripContext context) -- Not interesting in the dynamo state AcceptedJump JumpTo{} -> pure () @@ -662,10 +677,10 @@ updateJumpInfo context jumpInfo = getDynamo :: (MonadSTM m) => ChainSyncClientHandleCollection peer m blk -> - STM m (Maybe (ChainSyncClientHandle m blk)) + STM m (Maybe (peer, ChainSyncClientHandle m blk)) getDynamo handlesCol = do handles <- cschcSeq handlesCol - fmap snd <$> findM (\(_, handle) -> isDynamo <$> readTVar (cschJumping handle)) handles + findM (\(_, handle) -> isDynamo <$> readTVar (cschJumping handle)) handles where isDynamo Dynamo{} = True isDynamo _ = False @@ -720,7 +735,7 @@ registerClient context peer csState mkHandle = do Nothing -> do fragment <- csCandidate <$> readTVar csState pure $ Dynamo DynamoStarted $ pointSlot $ AF.anchorPoint fragment - Just handle -> do + Just (_, handle) -> do mJustInfo <- readTVar (cschJumpInfo handle) newJumper mJustInfo (Happy FreshJumper Nothing) cschJumping <- newTVar csjState @@ -744,7 +759,52 @@ unregisterClient context = do Disengaged{} -> pure () Jumper{} -> pure () Objector{} -> electNewObjector context' - Dynamo{} -> electNewDynamo context' + Dynamo{} -> void $ electNewDynamo context' + +-- | Elects a new dynamo by demoting the given dynamo to a jumper, moving the +-- peer to the end of the queue of chain sync handles and electing a new dynamo. +-- +-- It does nothing if there is no other engaged peer to elect or if the given +-- peer is not the dynamo. +-- +-- Yields the new dynamo, if there is one. +rotateDynamo :: + ( Ord peer, + LedgerSupportsProtocol blk, + MonadSTM m + ) => + ChainSyncClientHandleCollection peer m blk -> + peer -> + STM m (Maybe (peer, ChainSyncClientHandle m blk)) +rotateDynamo handlesCol peer = do + handles <- cschcMap handlesCol + case handles Map.!? peer of + Nothing -> + -- Do not re-elect a dynamo if the peer has been disconnected. + getDynamo handlesCol + Just oldDynHandle -> + readTVar (cschJumping oldDynHandle) >>= \case + Dynamo{} -> do + cschcRotateHandle handlesCol peer + peerStates <- cschcSeq handlesCol + mEngaged <- findNonDisengaged peerStates + case mEngaged of + Nothing -> + -- There are no engaged peers. This case cannot happen, as the + -- dynamo is always engaged. + error "rotateDynamo: no engaged peer found" + Just (newDynamoId, newDynHandle) + | newDynamoId == peer -> + -- The old dynamo is the only engaged peer left. + pure $ Just (newDynamoId, newDynHandle) + | otherwise -> do + newJumper Nothing (Happy FreshJumper Nothing) + >>= writeTVar (cschJumping oldDynHandle) + promoteToDynamo peerStates newDynamoId newDynHandle + pure $ Just (newDynamoId, newDynHandle) + _ -> + -- Do not re-elect a dynamo if the peer is not the dynamo. + getDynamo handlesCol -- | Choose an unspecified new non-idling dynamo and demote all other peers to -- jumpers. @@ -754,32 +814,53 @@ electNewDynamo :: LedgerSupportsProtocol blk ) => Context m peer blk -> - STM m () + STM m (Maybe (peer, ChainSyncClientHandle m blk)) electNewDynamo context = do peerStates <- cschcSeq (handlesCol context) mDynamo <- findNonDisengaged peerStates case mDynamo of - Nothing -> pure () + Nothing -> pure Nothing Just (dynId, dynamo) -> do - fragment <- csCandidate <$> readTVar (cschState dynamo) - mJumpInfo <- readTVar (cschJumpInfo dynamo) - -- If there is no jump info, the dynamo must be just starting and - -- there is no need to set the intersection of the ChainSync server. - let dynamoInitState = maybe DynamoStarted DynamoStarting mJumpInfo - writeTVar (cschJumping dynamo) $ - Dynamo dynamoInitState $ pointSlot $ AF.headPoint fragment - -- Demote all other peers to jumpers - forM_ peerStates $ \(peer, st) -> - when (peer /= dynId) $ do - jumpingState <- readTVar (cschJumping st) - when (not (isDisengaged jumpingState)) $ - newJumper mJumpInfo (Happy FreshJumper Nothing) - >>= writeTVar (cschJumping st) - where - findNonDisengaged = - findM $ \(_, st) -> not . isDisengaged <$> readTVar (cschJumping st) - isDisengaged Disengaged{} = True - isDisengaged _ = False + promoteToDynamo peerStates dynId dynamo + pure $ Just (dynId, dynamo) + +-- | Promote the given peer to dynamo and demote all other peers to jumpers. +promoteToDynamo :: + ( MonadSTM m, + Eq peer, + LedgerSupportsProtocol blk + ) => + StrictSeq (peer, ChainSyncClientHandle m blk) -> + peer -> + ChainSyncClientHandle m blk -> + STM m () +promoteToDynamo peerStates dynId dynamo = do + fragment <- csCandidate <$> readTVar (cschState dynamo) + mJumpInfo <- readTVar (cschJumpInfo dynamo) + -- If there is no jump info, the dynamo must be just starting and + -- there is no need to set the intersection of the ChainSync server. + let dynamoInitState = maybe DynamoStarted DynamoStarting mJumpInfo + writeTVar (cschJumping dynamo) $ + Dynamo dynamoInitState $ pointSlot $ AF.headPoint fragment + -- Demote all other peers to jumpers + forM_ peerStates $ \(peer, st) -> + when (peer /= dynId) $ do + jumpingState <- readTVar (cschJumping st) + when (not (isDisengaged jumpingState)) $ + newJumper mJumpInfo (Happy FreshJumper Nothing) + >>= writeTVar (cschJumping st) + +-- | Find a non-disengaged peer in the given sequence +findNonDisengaged :: + (MonadSTM m) => + StrictSeq (peer, ChainSyncClientHandle m blk) -> + STM m (Maybe (peer, ChainSyncClientHandle m blk)) +findNonDisengaged = + findM $ \(_, st) -> not . isDisengaged <$> readTVar (cschJumping st) + +isDisengaged :: ChainSyncJumpingState m blk -> Bool +isDisengaged Disengaged{} = True +isDisengaged _ = False findM :: (Foldable f, Monad m) => (a -> m Bool) -> f a -> m (Maybe a) findM p =