From bab4a6acd2eff9710ff4cd2e95fcee6fcc6d5d9a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicolas=20=E2=80=9CNiols=E2=80=9D=20Jeannerod?= Date: Thu, 27 Jun 2024 09:18:20 +0000 Subject: [PATCH] Change how last starvation is recorded Previously, we only registered the time at which starvation started. This is in fact not enough: if after the grace period the peer is still not making us unstarved, we won't detect it. Instead, we record the last starvation as either ongoing or we record its end time. --- .../Test/Consensus/PeerSimulator/Trace.hs | 4 +++ .../BlockFetch/ClientInterface.hs | 9 ++--- .../Consensus/Storage/ChainDB/API.hs | 8 +++-- .../Consensus/Storage/ChainDB/Impl.hs | 8 +++-- .../Storage/ChainDB/Impl/Background.hs | 6 +++- .../Consensus/Storage/ChainDB/Impl/Query.hs | 11 +++++-- .../Consensus/Storage/ChainDB/Impl/Types.hs | 33 ++++++++++++------- 7 files changed, 54 insertions(+), 25 deletions(-) diff --git a/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/PeerSimulator/Trace.hs b/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/PeerSimulator/Trace.hs index f003dfe447..f9797d2ebc 100644 --- a/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/PeerSimulator/Trace.hs +++ b/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/PeerSimulator/Trace.hs @@ -48,6 +48,8 @@ import Ouroboros.Network.AnchoredFragment (AnchoredFragment, headPoint) import qualified Ouroboros.Network.AnchoredFragment as AF import Ouroboros.Network.Block (SlotNo (SlotNo), Tip, castPoint) +import Ouroboros.Network.BlockFetch.ConsensusInterface + (ChainSelStarvation (..)) import Test.Consensus.PointSchedule.NodeState (NodeState) import Test.Consensus.PointSchedule.Peers (Peer (Peer), PeerId) import Test.Util.TersePrinting (terseAnchor, terseBlock, @@ -369,6 +371,8 @@ traceChainDBEventTestBlockWith tracer = \case AddedReprocessLoEBlocksToQueue -> trace $ "Requested ChainSel run" _ -> pure () + ChainDB.TraceChainSelStarvation ChainSelStarvationOngoing -> trace "ChainSel starved" + ChainDB.TraceChainSelStarvation (ChainSelStarvationEndedAt time) -> trace $ "ChainSel starvation ended at " ++ prettyTime time _ -> pure () where trace = traceUnitWith tracer "ChainDB" diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/BlockFetch/ClientInterface.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/BlockFetch/ClientInterface.hs index d91487a43c..93750475a9 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/BlockFetch/ClientInterface.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/BlockFetch/ClientInterface.hs @@ -42,7 +42,8 @@ import Ouroboros.Network.AnchoredFragment (AnchoredFragment) import qualified Ouroboros.Network.AnchoredFragment as AF import Ouroboros.Network.Block (MaxSlotNo) import Ouroboros.Network.BlockFetch.ConsensusInterface - (BlockFetchConsensusInterface (..), FetchMode (..), + (BlockFetchConsensusInterface (..), + ChainSelStarvation (..), FetchMode (..), FromConsensus (..), WhetherReceivingTentativeBlocks (..)) import Ouroboros.Network.PeerSelection.Bootstrap (UseBootstrapPeers, requiresBootstrapPeers) @@ -56,7 +57,7 @@ data ChainDbView m blk = ChainDbView { , getIsFetched :: STM m (Point blk -> Bool) , getMaxSlotNo :: STM m MaxSlotNo , addBlockWaitWrittenToDisk :: InvalidBlockPunishment m -> blk -> m Bool - , getLastTimeStarved :: STM m Time + , getChainSelStarvation :: STM m ChainSelStarvation } defaultChainDbView :: IOLike m => ChainDB m blk -> ChainDbView m blk @@ -65,7 +66,7 @@ defaultChainDbView chainDB = ChainDbView { , getIsFetched = ChainDB.getIsFetched chainDB , getMaxSlotNo = ChainDB.getMaxSlotNo chainDB , addBlockWaitWrittenToDisk = ChainDB.addBlockWaitWrittenToDisk chainDB - , getLastTimeStarved = ChainDB.getLastTimeStarved chainDB + , getChainSelStarvation = ChainDB.getChainSelStarvation chainDB } -- | How to get the wall-clock time of a slot. Note that this is a very @@ -351,7 +352,7 @@ mkBlockFetchConsensusInterface headerForgeUTCTime = slotForgeTime . headerRealPoint . unFromConsensus blockForgeUTCTime = slotForgeTime . blockRealPoint . unFromConsensus - lastChainSelStarvation = getLastTimeStarved chainDB + readChainSelStarvation = getChainSelStarvation chainDB demoteCSJDynamo :: peer -> m () demoteCSJDynamo = void . atomically . Jumping.rotateDynamo csHandlesCol diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/API.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/API.hs index 43eb45418e..1042680591 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/API.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/API.hs @@ -91,6 +91,8 @@ import qualified Ouroboros.Network.AnchoredFragment as AF import Ouroboros.Network.Block (ChainUpdate, MaxSlotNo, Serialised (..)) import qualified Ouroboros.Network.Block as Network +import Ouroboros.Network.BlockFetch.ConsensusInterface + (ChainSelStarvation (..)) import Ouroboros.Network.Mock.Chain (Chain (..)) import qualified Ouroboros.Network.Mock.Chain as Chain import System.FS.API.Types (FsError) @@ -334,9 +336,9 @@ data ChainDB m blk = ChainDB { -- invalid block is detected. These blocks are likely to be valid. , getIsInvalidBlock :: STM m (WithFingerprint (HeaderHash blk -> Maybe (InvalidBlockReason blk))) - -- | The last time we starved the chainsel thread. this is used by the - -- blockfetch decision logic to demote peers. - , getLastTimeStarved :: STM m Time + -- | Whether ChainSel is currently starved, or when was last time it + -- stopped being starved. + , getChainSelStarvation :: STM m ChainSelStarvation , closeDB :: m () diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl.hs index d9c391924c..ec5cbe320c 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl.hs @@ -68,6 +68,8 @@ import Ouroboros.Consensus.Util.ResourceRegistry (WithTempRegistry, import Ouroboros.Consensus.Util.STM (Fingerprint (..), WithFingerprint (..)) import qualified Ouroboros.Network.AnchoredFragment as AF +import Ouroboros.Network.BlockFetch.ConsensusInterface + (ChainSelStarvation (..)) {------------------------------------------------------------------------------- Initialization @@ -176,7 +178,7 @@ openDBInternal args launchBgTasks = runWithTempRegistry $ do copyFuse <- newFuse "copy to immutable db" chainSelFuse <- newFuse "chain selection" chainSelQueue <- newChainSelQueue (Args.cdbsBlocksToAddSize cdbSpecificArgs) - varLastTimeStarved <- newTVarIO =<< getMonotonicTime + varChainSelStarvation <- newTVarIO ChainSelStarvationOngoing let env = CDB { cdbImmutableDB = immutableDB , cdbVolatileDB = volatileDB @@ -201,7 +203,7 @@ openDBInternal args launchBgTasks = runWithTempRegistry $ do , cdbChainSelQueue = chainSelQueue , cdbFutureBlocks = varFutureBlocks , cdbLoE = Args.cdbsLoE cdbSpecificArgs - , cdbLastTimeStarved = varLastTimeStarved + , cdbChainSelStarvation = varChainSelStarvation } h <- fmap CDBHandle $ newTVarIO $ ChainDbOpen env let chainDB = API.ChainDB @@ -219,7 +221,7 @@ openDBInternal args launchBgTasks = runWithTempRegistry $ do , stream = Iterator.stream h , newFollower = Follower.newFollower h , getIsInvalidBlock = getEnvSTM h Query.getIsInvalidBlock - , getLastTimeStarved = getEnvSTM h Query.getLastTimeStarved + , getChainSelStarvation = getEnvSTM h Query.getChainSelStarvation , closeDB = closeDB h , isOpen = isOpen h } diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Background.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Background.hs index 2edcb1e826..af6081a3f7 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Background.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Background.hs @@ -522,7 +522,7 @@ addBlockRunner fuse cdb@CDB{..} = forever $ do -- exception (or it errored), notify the blocked thread withFuse fuse $ bracketOnError - (lift $ getChainSelMessage (writeTVar cdbLastTimeStarved) cdbChainSelQueue) + (lift $ getChainSelMessage reportChainSelStarvation cdbChainSelQueue) (\message -> lift $ atomically $ do case message of ChainSelReprocessLoEBlocks -> pure () @@ -541,3 +541,7 @@ addBlockRunner fuse cdb@CDB{..} = forever $ do trace $ PoppedBlockFromQueue $ FallingEdgeWith $ blockRealPoint blockToAdd chainSelSync cdb message) + where + reportChainSelStarvation s = do + traceWith cdbTracer $ TraceChainSelStarvation s + atomically $ writeTVar cdbChainSelStarvation s diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Query.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Query.hs index 324cddfdd0..96bb57eac3 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Query.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Query.hs @@ -21,7 +21,7 @@ module Ouroboros.Consensus.Storage.ChainDB.Impl.Query ( , getAnyBlockComponent , getAnyKnownBlock , getAnyKnownBlockComponent - , getLastTimeStarved + , getChainSelStarvation ) where import qualified Data.Map.Strict as Map @@ -43,6 +43,8 @@ import Ouroboros.Consensus.Util.STM (WithFingerprint (..)) import Ouroboros.Network.AnchoredFragment (AnchoredFragment) import qualified Ouroboros.Network.AnchoredFragment as AF import Ouroboros.Network.Block (MaxSlotNo, maxSlotNoFromWithOrigin) +import Ouroboros.Network.BlockFetch.ConsensusInterface + (ChainSelStarvation (..)) -- | Return the last @k@ headers. -- @@ -149,8 +151,11 @@ getIsInvalidBlock :: getIsInvalidBlock CDB{..} = fmap (fmap (fmap invalidBlockReason) . flip Map.lookup) <$> readTVar cdbInvalid -getLastTimeStarved :: forall m blk. IOLike m => ChainDbEnv m blk -> STM m Time -getLastTimeStarved CDB{..} = readTVar cdbLastTimeStarved +getChainSelStarvation :: + forall m blk. IOLike m + => ChainDbEnv m blk + -> STM m ChainSelStarvation +getChainSelStarvation CDB {..} = readTVar cdbChainSelStarvation getIsValid :: forall m blk. (IOLike m, HasHeader blk) diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Types.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Types.hs index ec9299fa5c..7d326e66fd 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Types.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Types.hs @@ -63,7 +63,7 @@ module Ouroboros.Consensus.Storage.ChainDB.Impl.Types ( , TraceValidationEvent (..) ) where -import Cardano.Prelude (whenM) +import Control.Monad (when) import Control.Tracer import Data.Foldable (traverse_) import Data.Map.Strict (Map) @@ -108,6 +108,8 @@ import Ouroboros.Consensus.Util.ResourceRegistry import Ouroboros.Consensus.Util.STM (WithFingerprint) import Ouroboros.Network.AnchoredFragment (AnchoredFragment) import Ouroboros.Network.Block (MaxSlotNo) +import Ouroboros.Network.BlockFetch.ConsensusInterface + (ChainSelStarvation (..)) -- | All the serialisation related constraints needed by the ChainDB. class ( ImmutableDbSerialiseConstraints blk @@ -276,9 +278,9 @@ data ChainDbEnv m blk = CDB -- switch back to a chain containing it. The fragment is usually anchored at -- a recent immutable tip; if it does not, it will conservatively be treated -- as the empty fragment anchored in the current immutable tip. - , cdbLastTimeStarved :: !(StrictTVar m Time) - -- ^ The last time we starved the ChainSel thread. This is used by the - -- BlockFetch decision logic to demote peers. + , cdbChainSelStarvation :: !(StrictTVar m ChainSelStarvation) + -- ^ Information on the last starvation of ChainSel, whether ongoing or + -- ended recently. } deriving (Generic) -- | We include @blk@ in 'showTypeOf' because it helps resolving type families @@ -513,13 +515,21 @@ addReprocessLoEBlocks tracer (ChainSelQueue queue) = do atomically $ writeTBQueue queue ChainSelReprocessLoEBlocks -- | Get the oldest message from the 'ChainSelQueue' queue. Can block when the --- queue is empty; in that case, reports the current time to the given callback. -getChainSelMessage :: IOLike m => (Time -> STM m ()) -> ChainSelQueue m blk -> m (ChainSelMessage m blk) -getChainSelMessage whenEmpty (ChainSelQueue queue) = do - time <- getMonotonicTime - -- NOTE: The two following lines are in different `atomically` on purpose. - atomically $ whenM (isEmptyTBQueue queue) (whenEmpty time) - atomically $ readTBQueue queue +-- queue is empty; in that case, reports the starvation (and its end) to the +-- callback. +getChainSelMessage + :: IOLike m + => (ChainSelStarvation -> m ()) + -> ChainSelQueue m blk + -> m (ChainSelMessage m blk) +getChainSelMessage report (ChainSelQueue queue) = do + -- NOTE: The test of emptiness and the blocking read are in different STM + -- transactions on purpose. + starved <- atomically $ isEmptyTBQueue queue + when starved $ report ChainSelStarvationOngoing + message <- atomically $ readTBQueue queue + when starved $ report =<< ChainSelStarvationEndedAt <$> getMonotonicTime + return message -- | Flush the 'ChainSelQueue' queue and notify the waiting threads. -- @@ -552,6 +562,7 @@ data TraceEvent blk | TraceLedgerReplayEvent (LgrDB.TraceReplayEvent blk) | TraceImmutableDBEvent (ImmutableDB.TraceEvent blk) | TraceVolatileDBEvent (VolatileDB.TraceEvent blk) + | TraceChainSelStarvation ChainSelStarvation deriving (Generic)