Implement a call to rotate dynamos in CSJ
facundominguez authored and amesgen committed Aug 7, 2024
-- when the client should pause, download headers, or ask about agreement with
-- a given point (jumping). See the 'Jumping' type for more details.
-- Interactions with the BlockFetch logic
-- --------------------------------------
-- When syncing, the BlockFetch logic will fetch blocks from the dynamo. If the
-- dynamo is responding too slowly, the BlockFetch logic can ask to change the
-- dynamo with a call to 'rotateDynamo'.
-- Interactions with the Limit on Patience
-- ---------------------------------------
-- > j ╔════════╗
-- > ╭────────── ║ Dynamo ║ ◀─────────╮
-- > │ ╚════════╝ │f
-- > ▼ ▲ │
-- > ┌────────────┐ │ k ┌──────────┐
-- > │ Disengaged │ ◀──────────│────────── │ Objector │
-- > └────────────┘ ╭─────│────────── └──────────┘
-- > │ │ ▲ ▲ │
-- > g│ │e b │ │ │
-- > │ │ ╭─────╯ i│ │c
-- > ╭╌╌╌╌╌╌▼╌╌╌╌╌╌╌╌╌╌╌╌╌│╌╌╌╌╌╌╌╌╌╌│╌▼╌╌╌╮
-- > │ ╭──╚════════╝ │f
-- > ▼ ▲ │
-- > ┌────────────┐ │ k ┌──────────┐
-- > │ Disengaged │ ◀──────────│────────── │ Objector │
-- > └────────────┘ ╭─────│────────── └──────────┘
-- > │ │ ▲ ▲ │
-- > l│ g│ │e b │ │ │
-- > │ │ ╭─────╯ i│ │c
-- > ╭╌╌╌╌╌╌▼╌╌╌╌╌╌╌╌╌╌╌╌╌│╌╌╌╌╌╌╌╌╌╌│╌▼╌╌╌╮
-- > ┆ ╔═══════╗ a ┌──────┐ d ┌─────┐ |
-- > ┆ ║ Happy ║ ───▶ │ LFI* │ ───▶ │ FI* │ |
-- > ┆ ╚═══════╝ ◀─╮ └──────┘ └─────┘ |
-- If dynamo or objector claim to have no more headers, they are disengaged
-- (j|k).
-- The BlockFetch logic can ask to change the dynamo if it is not serving blocks
-- fast enough. If there are other non-disengaged peers the dynamo is demoted to
-- a jumper (l) and a new dynamo is elected.
module Ouroboros.Consensus.MiniProtocol.ChainSync.Client.Jumping (
, ContextWith (..)
, Instruction (..)
, JumpInstruction (..)
, JumpResult (..)
, Jumping (..)
, getDynamo
, makeContext
, mkJumping
, noJumping
, registerClient
, rotateDynamo
, unregisterClient
) where

import Cardano.Slotting.Slot (SlotNo (..), WithOrigin (..))
import Control.Monad (forM, forM_, when)
import Control.Monad (forM, forM_, void, when)
import Data.Foldable (toList)
import Data.List (sortOn)
import qualified Data.Map as Map
import Data.Maybe (catMaybes, fromMaybe)
import Data.Maybe.Strict (StrictMaybe (..))
import Data.Sequence.Strict (StrictSeq)
import qualified Data.Sequence.Strict as Seq
import GHC.Generics (Generic)
import Ouroboros.Consensus.Block (HasHeader (getHeaderFields), Header,
Dynamo _ lastJumpSlot
| slot < lastJumpSlot -> do
disengage (handle context)
electNewDynamo (stripContext context)
void $ electNewDynamo (stripContext context)
| otherwise -> pure ()

-- | This function is called when we receive a 'MsgAwaitReply' message.
Expand All @@ -478,7 +493,7 @@ onAwaitReply context =
readTVar (cschJumping (handle context)) >>= \case
Dynamo{} -> do
disengage (handle context)
electNewDynamo (stripContext context)
void $ electNewDynamo (stripContext context)
Objector{} -> do
disengage (handle context)
electNewObjector (stripContext context)
updateChainSyncState (handle context) jumpInfo
RejectedJump JumpToGoodPoint{} -> do
startDisengaging (handle context)
electNewDynamo (stripContext context)
void $ electNewDynamo (stripContext context)

-- Not interesting in the dynamo state
AcceptedJump JumpTo{} -> pure ()
Expand Down Expand Up @@ -662,10 +677,10 @@ updateJumpInfo context jumpInfo =
getDynamo ::
(MonadSTM m) =>
ChainSyncClientHandleCollection peer m blk ->
STM m (Maybe (ChainSyncClientHandle m blk))
STM m (Maybe (peer, ChainSyncClientHandle m blk))
getDynamo handlesCol = do
handles <- cschcSeq handlesCol
fmap snd <$> findM (\(_, handle) -> isDynamo <$> readTVar (cschJumping handle)) handles
findM (\(_, handle) -> isDynamo <$> readTVar (cschJumping handle)) handles
isDynamo Dynamo{} = True
isDynamo _ = False
Expand Down Expand Up @@ -720,7 +735,7 @@ registerClient context peer csState mkHandle = do
Nothing -> do
fragment <- csCandidate <$> readTVar csState
pure $ Dynamo DynamoStarted $ pointSlot $ AF.anchorPoint fragment
Just handle -> do
Just (_, handle) -> do
mJustInfo <- readTVar (cschJumpInfo handle)
newJumper mJustInfo (Happy FreshJumper Nothing)
Expand All @@ -744,7 +759,52 @@ unregisterClient context = do
Disengaged{} -> pure ()
Jumper{} -> pure ()
Objector{} -> electNewObjector context'
Dynamo{} -> electNewDynamo context'
Dynamo{} -> void $ electNewDynamo context'

-- | Elects a new dynamo by demoting the given dynamo to a jumper, moving the
-- peer to the end of the queue of chain sync handles and electing a new dynamo.
-- It does nothing if there is no other engaged peer to elect or if the given
-- peer is not the dynamo.
-- Yields the new dynamo, if there is one.
rotateDynamo ::
( Ord peer,
LedgerSupportsProtocol blk,
MonadSTM m
) =>
ChainSyncClientHandleCollection peer m blk ->
peer ->
STM m (Maybe (peer, ChainSyncClientHandle m blk))
rotateDynamo handlesCol peer = do
handles <- cschcMap handlesCol
case handles Map.!? peer of
Nothing ->
-- Do not re-elect a dynamo if the peer has been disconnected.
getDynamo handlesCol
Just oldDynHandle ->
readTVar (cschJumping oldDynHandle) >>= \case
Dynamo{} -> do
cschcRotateHandle handlesCol peer
peerStates <- cschcSeq handlesCol
mEngaged <- findNonDisengaged peerStates
case mEngaged of
Nothing ->
-- There are no engaged peers. This case cannot happen, as the
-- dynamo is always engaged.
error "rotateDynamo: no engaged peer found"
Just (newDynamoId, newDynHandle)
| newDynamoId == peer ->
-- The old dynamo is the only engaged peer left.
pure $ Just (newDynamoId, newDynHandle)
| otherwise -> do
newJumper Nothing (Happy FreshJumper Nothing)
>>= writeTVar (cschJumping oldDynHandle)
promoteToDynamo peerStates newDynamoId newDynHandle
pure $ Just (newDynamoId, newDynHandle)
_ ->
-- Do not re-elect a dynamo if the peer is not the dynamo.
getDynamo handlesCol

-- | Choose an unspecified new non-idling dynamo and demote all other peers to
Expand All @@ -754,32 +814,53 @@ electNewDynamo ::
LedgerSupportsProtocol blk
) =>
Context m peer blk ->
STM m ()
STM m (Maybe (peer, ChainSyncClientHandle m blk))
electNewDynamo context = do
peerStates <- cschcSeq (handlesCol context)
mDynamo <- findNonDisengaged peerStates
case mDynamo of
Nothing -> pure ()
Nothing -> pure Nothing
Just (dynId, dynamo) -> do
fragment <- csCandidate <$> readTVar (cschState dynamo)
mJumpInfo <- readTVar (cschJumpInfo dynamo)
-- If there is no jump info, the dynamo must be just starting and
-- there is no need to set the intersection of the ChainSync server.
let dynamoInitState = maybe DynamoStarted DynamoStarting mJumpInfo
writeTVar (cschJumping dynamo) $
Dynamo dynamoInitState $ pointSlot $ AF.headPoint fragment
-- Demote all other peers to jumpers
forM_ peerStates $ \(peer, st) ->
when (peer /= dynId) $ do
jumpingState <- readTVar (cschJumping st)
when (not (isDisengaged jumpingState)) $
newJumper mJumpInfo (Happy FreshJumper Nothing)
>>= writeTVar (cschJumping st)
findNonDisengaged =
findM $ \(_, st) -> not . isDisengaged <$> readTVar (cschJumping st)
isDisengaged Disengaged{} = True
isDisengaged _ = False
promoteToDynamo peerStates dynId dynamo
pure $ Just (dynId, dynamo)

-- | Promote the given peer to dynamo and demote all other peers to jumpers.
promoteToDynamo ::
( MonadSTM m,
Eq peer,
LedgerSupportsProtocol blk
) =>
StrictSeq (peer, ChainSyncClientHandle m blk) ->
peer ->
ChainSyncClientHandle m blk ->
STM m ()
promoteToDynamo peerStates dynId dynamo = do
fragment <- csCandidate <$> readTVar (cschState dynamo)
mJumpInfo <- readTVar (cschJumpInfo dynamo)
-- If there is no jump info, the dynamo must be just starting and
-- there is no need to set the intersection of the ChainSync server.
let dynamoInitState = maybe DynamoStarted DynamoStarting mJumpInfo
writeTVar (cschJumping dynamo) $
Dynamo dynamoInitState $ pointSlot $ AF.headPoint fragment
-- Demote all other peers to jumpers
forM_ peerStates $ \(peer, st) ->
when (peer /= dynId) $ do
jumpingState <- readTVar (cschJumping st)
when (not (isDisengaged jumpingState)) $
newJumper mJumpInfo (Happy FreshJumper Nothing)
>>= writeTVar (cschJumping st)

-- | Find a non-disengaged peer in the given sequence
findNonDisengaged ::
(MonadSTM m) =>
StrictSeq (peer, ChainSyncClientHandle m blk) ->
STM m (Maybe (peer, ChainSyncClientHandle m blk))
findNonDisengaged =
findM $ \(_, st) -> not . isDisengaged <$> readTVar (cschJumping st)

isDisengaged :: ChainSyncJumpingState m blk -> Bool
isDisengaged Disengaged{} = True
isDisengaged _ = False

findM :: (Foldable f, Monad m) => (a -> m Bool) -> f a -> m (Maybe a)
findM p =
