Skip to content

Commit

Permalink
Limit the rate at which GDD is evaluated
Browse files Browse the repository at this point in the history
  • Loading branch information
facundominguez authored and amesgen committed Aug 7, 2024
1 parent 028883a commit b7fa122
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ module Ouroboros.Consensus.Node.Genesis (
, mkGenesisConfig
-- * NodeKernel helpers
, GenesisNodeKernelArgs (..)
, LoEAndGDDNodeKernelArgs (..)
, mkGenesisNodeKernelArgs
, setGetLoEFragment
) where
Expand Down Expand Up @@ -54,7 +55,7 @@ data GenesisConfig = GenesisConfig
{ gcBlockFetchConfig :: !GenesisBlockFetchConfiguration
, gcChainSyncLoPBucketConfig :: !ChainSyncLoPBucketConfig
, gcCSJConfig :: !CSJConfig
, gcLoEAndGDDConfig :: !(LoEAndGDDConfig ())
, gcLoEAndGDDConfig :: !(LoEAndGDDConfig LoEAndGDDParams)
} deriving stock (Eq, Generic, Show)

-- | Genesis configuration flags and low-level args, as parsed from config file or CLI
Expand All @@ -66,6 +67,7 @@ data GenesisConfigFlags = GenesisConfigFlags
, gcfBucketCapacity :: Maybe Integer
, gcfBucketRate :: Maybe Integer
, gcfCSJJumpSize :: Maybe Integer
, gcfGDDRateLimit :: Maybe DiffTime
} deriving stock (Eq, Generic, Show)

defaultGenesisConfigFlags :: GenesisConfigFlags
Expand All @@ -77,6 +79,7 @@ defaultGenesisConfigFlags = GenesisConfigFlags
, gcfBucketCapacity = Nothing
, gcfBucketRate = Nothing
, gcfCSJJumpSize = Nothing
, gcfGDDRateLimit = Nothing
}

enableGenesisConfigDefault :: GenesisConfig
Expand Down Expand Up @@ -113,7 +116,7 @@ mkGenesisConfig (Just GenesisConfigFlags{..}) =
}
else CSJDisabled
, gcLoEAndGDDConfig = if gcfEnableLoEAndGDD
then LoEAndGDDEnabled ()
then LoEAndGDDEnabled LoEAndGDDParams{lgpGDDRateLimit}
else LoEAndGDDDisabled
}
where
Expand All @@ -124,21 +127,34 @@ mkGenesisConfig (Just GenesisConfigFlags{..}) =
-- 3 * 2160 * 20 works in more recent ranges of slots, but causes syncing to
-- block in byron.
defaultCSJJumpSize = 2 * 2160
defaultGDDRateLimit = 1.0 -- seconds

gbfcBulkSyncGracePeriod = fromInteger $ fromMaybe defaultBulkSyncGracePeriod gcfBulkSyncGracePeriod
csbcCapacity = fromInteger $ fromMaybe defaultCapacity gcfBucketCapacity
csbcRate = fromInteger $ fromMaybe defaultRate gcfBucketRate
csjcJumpSize = fromInteger $ fromMaybe defaultCSJJumpSize gcfCSJJumpSize
lgpGDDRateLimit = fromMaybe defaultGDDRateLimit gcfGDDRateLimit

newtype LoEAndGDDParams = LoEAndGDDParams
{ -- | How often to evaluate GDD. 0 means as soon as possible.
-- Otherwise, no faster than once every T seconds, where T is the
-- value of the field.
lgpGDDRateLimit :: DiffTime
} deriving stock (Eq, Generic, Show)

-- | Genesis-related arguments needed by the NodeKernel initialization logic.
data GenesisNodeKernelArgs m blk = GenesisNodeKernelArgs {
gnkaLoEAndGDDArgs :: !(LoEAndGDDConfig (LoEAndGDDNodeKernelArgs m blk))
}

data LoEAndGDDNodeKernelArgs m blk = LoEAndGDDNodeKernelArgs {
-- | A TVar containing an action that returns the 'ChainDB.GetLoEFragment'
-- action. We use this extra indirection to update this action after we
-- opened the ChainDB (which happens before we initialize the NodeKernel).
-- After that, this TVar will not be modified again.
gnkaGetLoEFragment :: !(LoEAndGDDConfig (StrictTVar m (ChainDB.GetLoEFragment m blk)))
lgnkaLoEFragmentTVar :: !(StrictTVar m (ChainDB.GetLoEFragment m blk))
, lgnkaGDDRateLimit :: DiffTime
}

-- | Create the initial 'GenesisNodeKernelArgs" (with a temporary
-- 'ChainDB.GetLoEFragment' that will be replaced via 'setGetLoEFragment') and a
-- function to update the 'ChainDbArgs' accordingly.
Expand All @@ -149,20 +165,24 @@ mkGenesisNodeKernelArgs ::
, Complete ChainDbArgs m blk -> Complete ChainDbArgs m blk
)
mkGenesisNodeKernelArgs gcfg = do
gnkaGetLoEFragment <- for (gcLoEAndGDDConfig gcfg) $ \() ->
newTVarIO $ pure $
gnkaLoEAndGDDArgs <- for (gcLoEAndGDDConfig gcfg) $ \p -> do
loeFragmentTVar <- newTVarIO $ pure $
-- Use the most conservative LoE fragment until 'setGetLoEFragment'
-- is called.
ChainDB.LoEEnabled $ AF.Empty AF.AnchorGenesis
let updateChainDbArgs = case gnkaGetLoEFragment of
pure LoEAndGDDNodeKernelArgs
{ lgnkaLoEFragmentTVar = loeFragmentTVar
, lgnkaGDDRateLimit = lgpGDDRateLimit p
}
let updateChainDbArgs = case gnkaLoEAndGDDArgs of
LoEAndGDDDisabled -> id
LoEAndGDDEnabled varGetLoEFragment -> \cfg ->
LoEAndGDDEnabled lgnkArgs -> \cfg ->
cfg { ChainDB.cdbsArgs =
(ChainDB.cdbsArgs cfg) { ChainDB.cdbsLoE = getLoEFragment }
}
where
getLoEFragment = join $ readTVarIO varGetLoEFragment
pure (GenesisNodeKernelArgs {gnkaGetLoEFragment}, updateChainDbArgs)
getLoEFragment = join $ readTVarIO $ lgnkaLoEFragmentTVar lgnkArgs
pure (GenesisNodeKernelArgs{gnkaLoEAndGDDArgs}, updateChainDbArgs)

-- | Set 'gnkaGetLoEFragment' to the actual logic for determining the current
-- LoE fragment.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ import Ouroboros.Consensus.MiniProtocol.ChainSync.Client
import Ouroboros.Consensus.MiniProtocol.ChainSync.Client.InFutureCheck
(SomeHeaderInFutureCheck)
import Ouroboros.Consensus.Node.Genesis (GenesisNodeKernelArgs (..),
LoEAndGDDConfig (..), setGetLoEFragment)
LoEAndGDDConfig (..), LoEAndGDDNodeKernelArgs (..), setGetLoEFragment)
import Ouroboros.Consensus.Node.GSM (GsmNodeKernelArgs (..))
import qualified Ouroboros.Consensus.Node.GSM as GSM
import Ouroboros.Consensus.Node.Run
Expand Down Expand Up @@ -273,20 +273,21 @@ initNodeKernel args@NodeKernelArgs { registry, cfg, tracers
ps_POLICY_PEER_SHARE_STICKY_TIME
ps_POLICY_PEER_SHARE_MAX_PEERS

case gnkaGetLoEFragment genesisArgs of
LoEAndGDDDisabled -> pure ()
LoEAndGDDEnabled varGetLoEFragment -> do
case gnkaLoEAndGDDArgs genesisArgs of
LoEAndGDDDisabled -> pure ()
LoEAndGDDEnabled lgArgs -> do
varLoEFragment <- newTVarIO $ AF.Empty AF.AnchorGenesis
setGetLoEFragment
(readTVar varGsmState)
(readTVar varLoEFragment)
varGetLoEFragment
(lgnkaLoEFragmentTVar lgArgs)

void $ forkLinkedWatcher registry "NodeKernel.GDD" $
gddWatcher
cfg
(gddTracer tracers)
chainDB
(lgnkaGDDRateLimit lgArgs)
(readTVar varGsmState)
-- TODO GDD should only consider (big) ledger peers
(cschcMap varChainSyncHandles)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1035,7 +1035,7 @@ runThreadNetwork systemTime ThreadNetworkArgs
, getUseBootstrapPeers = pure DontUseBootstrapPeers
, publicPeerSelectionStateVar
, genesisArgs = GenesisNodeKernelArgs {
gnkaGetLoEFragment = LoEAndGDDDisabled
gnkaLoEAndGDDArgs = LoEAndGDDDisabled
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,7 @@ startNode schedulerConfig genesisTest interval = do
lrConfig
(mkGDDTracerTestBlock lrTracer)
lnChainDb
1.0 -- Default config value in NodeKernel.hs at the time or writing
(pure GSM.Syncing) -- TODO actually run GSM
(cschcMap handles)
var
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ gddWatcher ::
=> TopLevelConfig blk
-> Tracer m (TraceGDDEvent peer blk)
-> ChainDB m blk
-> DiffTime -- ^ How often to evaluate GDD. 0 means as soon as possible.
-- Otherwise, no faster than once every T seconds, where T is
-- the provided value.
-> STM m GsmState
-> STM m (Map peer (ChainSyncClientHandle m blk))
-- ^ The ChainSync handles. We trigger the GDD whenever our 'GsmState'
Expand All @@ -95,7 +98,7 @@ gddWatcher ::
-> Watcher m
(GsmState, GDDStateView m blk peer)
(Map peer (StrictMaybe (WithOrigin SlotNo), Bool))
gddWatcher cfg tracer chainDb getGsmState getHandles varLoEFrag =
gddWatcher cfg tracer chainDb rateLimit getGsmState getHandles varLoEFrag =
Watcher {
wInitial = Nothing
, wReader = (,) <$> getGsmState <*> getGDDStateView
Expand Down Expand Up @@ -137,12 +140,17 @@ gddWatcher cfg tracer chainDb getGsmState getHandles varLoEFrag =

wNotify :: (GsmState, GDDStateView m blk peer) -> m ()
wNotify (_gsmState, stateView) = do
t0 <- getMonotonicTime
loeFrag <- evaluateGDD cfg tracer stateView
oldLoEFrag <- atomically $ swapTVar varLoEFrag loeFrag
-- The chain selection only depends on the LoE tip, so there
-- is no point in retriggering it if the LoE tip hasn't changed.
when (AF.headHash oldLoEFrag /= AF.headHash loeFrag) $
ChainDB.triggerChainSelectionAsync chainDb
tf <- getMonotonicTime
-- We limit the rate at which GDD is evaluated, otherwise it would
-- be called every time a new header is validated.
threadDelay $ rateLimit - diffTime tf t0

-- | Pure snapshot of the dynamic data the GDD operates on.
data GDDStateView m blk peer = GDDStateView {
Expand Down

0 comments on commit b7fa122

Please sign in to comment.