feat(consensus): fallback json rpc syncing for consensus #3211

Merged · 3 commits · Nov 13, 2024
Changes from 2 commits
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

92 changes: 45 additions & 47 deletions core/node/consensus/src/en.rs
@@ -21,9 +21,9 @@ use crate::{
storage::{self, ConnectionPool},
};

/// If less than TEMPORARY_FETCHER_THRESHOLD certificates are missing,
/// the temporary fetcher will stop fetching blocks.
pub(crate) const TEMPORARY_FETCHER_THRESHOLD: u64 = 10;
/// Whenever more than FALLBACK_FETCHER_THRESHOLD certificates are missing,
/// the fallback fetcher is active.
pub(crate) const FALLBACK_FETCHER_THRESHOLD: u64 = 10;

/// External node.
pub(super) struct EN {
@@ -115,11 +115,9 @@ impl EN {
let store = store.clone();
async {
let store = store;
self.temporary_block_fetcher(ctx, &store).await?;
tracing::info!(
"temporary block fetcher finished, switching to p2p fetching only"
);
Ok(())
self.fallback_block_fetcher(ctx, &store)
.await
.wrap("fallback_block_fetcher()")
}
});

@@ -191,7 +189,7 @@ impl EN {
.new_payload_queue(ctx, actions, self.sync_state.clone())
.await
.wrap("new_fetcher_cursor()")?;
self.fetch_blocks(ctx, &mut payload_queue, None).await
self.fetch_blocks(ctx, &mut payload_queue).await
})
.await;
match res {
Expand Down Expand Up @@ -362,9 +360,13 @@ impl EN {
}

/// Fetches (with retries) the given block from the main node.
async fn fetch_block(&self, ctx: &ctx::Ctx, n: L2BlockNumber) -> ctx::Result<FetchedBlock> {
async fn fetch_block(
&self,
ctx: &ctx::Ctx,
n: validator::BlockNumber,
) -> ctx::Result<FetchedBlock> {
const RETRY_INTERVAL: time::Duration = time::Duration::seconds(5);

let n = L2BlockNumber(n.0.try_into().context("overflow")?);
loop {
match ctx.wait(self.client.sync_l2_block(n, true)).await? {
Ok(Some(block)) => return Ok(block.try_into()?),
@@ -376,76 +378,72 @@
}
}

/// Fetches blocks from the main node directly, until the certificates
/// are backfilled. This allows for smooth transition from json RPC to p2p block syncing.
pub(crate) async fn temporary_block_fetcher(
/// Fetches blocks from the main node directly whenever the EN is lagging behind too much.
pub(crate) async fn fallback_block_fetcher(
&self,
ctx: &ctx::Ctx,
store: &Store,
) -> ctx::Result<()> {
const MAX_CONCURRENT_REQUESTS: usize = 30;
scope::run!(ctx, |ctx, s| async {
let (send, mut recv) = ctx::channel::bounded(MAX_CONCURRENT_REQUESTS);
s.spawn(async {
let Some(mut next) = store.next_block(ctx).await? else {
return Ok(());
};
while store.persisted().borrow().next().0 + TEMPORARY_FETCHER_THRESHOLD < next.0 {
let n = L2BlockNumber(next.0.try_into().context("overflow")?);
self.sync_state.wait_for_main_node_block(ctx, n).await?;
send.send(ctx, s.spawn(self.fetch_block(ctx, n))).await?;
// TODO: metrics.
s.spawn::<()>(async {
let send = send;
let is_lagging =
|main| main >= store.persisted().borrow().next() + FALLBACK_FETCHER_THRESHOLD;
let mut next = store.next_block(ctx).await.wrap("next_block()")?;
loop {
// Wait until p2p syncing is lagging.
self.sync_state
.wait_for_main_node_block(ctx, is_lagging)
.await?;
// Determine the next block to fetch and wait for it to be available.
next = next.max(store.next_block(ctx).await.wrap("next_block()")?);
self.sync_state
.wait_for_main_node_block(ctx, |main| main >= next)
.await?;
// Fetch the block asynchronously.
send.send(ctx, s.spawn(self.fetch_block(ctx, next))).await?;
next = next.next();
}
drop(send);
Ok(())
});
while let Ok(block) = recv.recv_or_disconnected(ctx).await? {
loop {
let block = recv.recv(ctx).await?;
store
.queue_next_fetched_block(ctx, block.join(ctx).await?)
.await
.wrap("queue_next_fetched_block()")?;
}
Ok(())
})
.await
}

/// Fetches blocks from the main node in range `[cursor.next()..end)`.
/// Fetches blocks starting with `queue.next()`.
async fn fetch_blocks(
&self,
ctx: &ctx::Ctx,
queue: &mut storage::PayloadQueue,
end: Option<validator::BlockNumber>,
) -> ctx::Result<()> {
const MAX_CONCURRENT_REQUESTS: usize = 30;
let first = queue.next();
let mut next = first;
let mut next = queue.next();
scope::run!(ctx, |ctx, s| async {
let (send, mut recv) = ctx::channel::bounded(MAX_CONCURRENT_REQUESTS);
s.spawn(async {
s.spawn::<()>(async {
let send = send;
while end.map_or(true, |end| next < end) {
let n = L2BlockNumber(next.0.try_into().context("overflow")?);
self.sync_state.wait_for_main_node_block(ctx, n).await?;
send.send(ctx, s.spawn(self.fetch_block(ctx, n))).await?;
loop {
self.sync_state
.wait_for_main_node_block(ctx, |main| main >= next)
.await?;
send.send(ctx, s.spawn(self.fetch_block(ctx, next))).await?;
next = next.next();
}
Ok(())
});
while end.map_or(true, |end| queue.next() < end) {
loop {
let block = recv.recv(ctx).await?.join(ctx).await?;
queue.send(block).await.context("queue.send()")?;
}
Ok(())
})
.await?;
// If fetched anything, wait for the last block to be stored persistently.
if first < queue.next() {
self.pool
.wait_for_payload(ctx, queue.next().prev().unwrap())
.await
.wrap("wait_for_payload()")?;
}
Ok(())
.await
}
}
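
To summarize the new `fallback_block_fetcher` above: it stays out of the way while p2p syncing keeps up, and only falls back to JSON-RPC fetching while the main node is more than `FALLBACK_FETCHER_THRESHOLD` blocks ahead of what the store has persisted, keeping at most `MAX_CONCURRENT_REQUESTS` fetches in flight and queueing results in block order. The sketch below is a simplified, self-contained analogue using plain tokio rather than the `zksync_concurrency` ctx/scope primitives from the diff; `fetch_block`, `main_node_head`, and `store_next` are illustrative stand-ins, not the real types.

```rust
use tokio::sync::mpsc;

const FALLBACK_FETCHER_THRESHOLD: u64 = 10;
const MAX_CONCURRENT_REQUESTS: usize = 30;

#[derive(Debug)]
struct FetchedBlock(u64);

// Stand-in for `EN::fetch_block`: fetch block `n` from the main node over JSON-RPC.
async fn fetch_block(n: u64) -> FetchedBlock {
    FetchedBlock(n)
}

// Fetch over JSON-RPC only while p2p syncing lags the main node by more than the
// threshold, with at most MAX_CONCURRENT_REQUESTS requests in flight, queued in order.
async fn fallback_fetcher(main_node_head: u64, store_next: u64) {
    let (send, mut recv) = mpsc::channel(MAX_CONCURRENT_REQUESTS);

    let producer = tokio::spawn(async move {
        let mut next = store_next;
        // The lagging check: main node head is at least THRESHOLD ahead of the store.
        while main_node_head >= next + FALLBACK_FETCHER_THRESHOLD {
            // Spawn the fetch; the bounded channel provides backpressure.
            let handle = tokio::spawn(fetch_block(next));
            if send.send(handle).await.is_err() {
                break; // consumer gone
            }
            next += 1;
        }
        // Dropping `send` closes the channel and ends the consumer loop below.
    });

    // Consume results strictly in block order, like `queue_next_fetched_block` in the PR.
    while let Some(handle) = recv.recv().await {
        let block = handle.await.expect("fetch task panicked");
        println!("queueing fetched block {}", block.0);
    }
    producer.await.unwrap();
}

#[tokio::main]
async fn main() {
    // Main node 25 blocks ahead of an empty store => blocks 0..=15 get fetched.
    fallback_fetcher(25, 0).await;
}
```

The bounded channel is what enforces the concurrency cap: the producer blocks on `send` once `MAX_CONCURRENT_REQUESTS` fetches are outstanding, while the consumer drains completed fetches strictly in order. The real implementation additionally re-reads `store.next_block()` and re-checks the lagging predicate on every iteration.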
8 changes: 3 additions & 5 deletions core/node/consensus/src/storage/store.rs
@@ -114,14 +114,12 @@ impl Store {
}

/// Number of the next block to queue.
pub(crate) async fn next_block(
&self,
ctx: &ctx::Ctx,
) -> ctx::OrCanceled<Option<validator::BlockNumber>> {
pub(crate) async fn next_block(&self, ctx: &ctx::Ctx) -> ctx::Result<validator::BlockNumber> {
Ok(sync::lock(ctx, &self.block_payloads)
.await?
.as_ref()
.map(|p| p.next()))
.context("payload_queue not set")?
.next())
}

/// Queues the next block.
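
The `next_block` change above replaces an `Option` return with an error carrying context when the payload queue is not configured, so callers can simply `?` instead of branching on `None`. A minimal sketch of that pattern, assuming `anyhow` for the error context (the real code returns `ctx::Result` and takes a `ctx::Ctx`; `Store` and `PayloadQueue` here are simplified stand-ins):

```rust
use anyhow::Context as _;

#[derive(Debug, Clone, Copy)]
struct BlockNumber(u64);

struct PayloadQueue {
    next: BlockNumber,
}

struct Store {
    block_payloads: Option<PayloadQueue>,
}

impl Store {
    // Old shape: callers had to branch on `None` when the queue was not configured.
    fn next_block_opt(&self) -> Option<BlockNumber> {
        self.block_payloads.as_ref().map(|p| p.next)
    }

    // New shape: a missing payload queue becomes an error with context, so callers
    // can just use `?` (matching the `context("payload_queue not set")?` in the diff).
    fn next_block(&self) -> anyhow::Result<BlockNumber> {
        Ok(self
            .block_payloads
            .as_ref()
            .context("payload_queue not set")?
            .next)
    }
}

fn main() -> anyhow::Result<()> {
    let store = Store {
        block_payloads: Some(PayloadQueue { next: BlockNumber(42) }),
    };
    assert!(store.next_block_opt().is_some());
    println!("next block: {:?}", store.next_block()?);
    Ok(())
}
```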
39 changes: 1 addition & 38 deletions core/node/consensus/src/testonly.rs
@@ -45,10 +45,7 @@ use zksync_types::{
};
use zksync_web3_decl::client::{Client, DynClient, L2};

use crate::{
en,
storage::{ConnectionPool, Store},
};
use crate::{en, storage::ConnectionPool};

/// Fake StateKeeper for tests.
#[derive(Debug)]
@@ -413,40 +410,6 @@ impl StateKeeper {
.await
}

pub async fn run_temporary_fetcher(
self,
ctx: &ctx::Ctx,
client: Box<DynClient<L2>>,
) -> ctx::Result<()> {
scope::run!(ctx, |ctx, s| async {
let payload_queue = self
.pool
.connection(ctx)
.await
.wrap("connection()")?
.new_payload_queue(ctx, self.actions_sender, self.sync_state.clone())
.await
.wrap("new_payload_queue()")?;
let (store, runner) = Store::new(
ctx,
self.pool.clone(),
Some(payload_queue),
Some(client.clone()),
)
.await
.wrap("Store::new()")?;
s.spawn_bg(async { Ok(runner.run(ctx).await?) });
en::EN {
pool: self.pool.clone(),
client,
sync_state: self.sync_state.clone(),
}
.temporary_block_fetcher(ctx, &store)
.await
})
.await
}

/// Runs consensus node for the external node.
pub async fn run_consensus(
self,
65 changes: 4 additions & 61 deletions core/node/consensus/src/tests/mod.rs
@@ -16,7 +16,7 @@ use zksync_types::ProtocolVersionId;
use zksync_web3_decl::namespaces::EnNamespaceClient as _;

use crate::{
en::TEMPORARY_FETCHER_THRESHOLD,
en::FALLBACK_FETCHER_THRESHOLD,
mn::run_main_node,
storage::{ConnectionPool, Store},
testonly,
@@ -665,7 +665,7 @@ async fn test_p2p_fetcher_backfill_certs(from_snapshot: bool, version: ProtocolV
// Test temporary fetcher fetching blocks if a lot of certs are missing.
#[test_casing(4, Product((FROM_SNAPSHOT,VERSIONS)))]
#[tokio::test]
async fn test_temporary_fetcher(from_snapshot: bool, version: ProtocolVersionId) {
async fn test_fallback_fetcher(from_snapshot: bool, version: ProtocolVersionId) {
zksync_concurrency::testonly::abort_on_panic();
let ctx = &ctx::test_root(&ctx::AffineClock::new(10.));
let rng = &mut ctx.rng();
@@ -705,7 +705,7 @@ async fn test_temporary_fetcher(from_snapshot: bool, version: ProtocolVersionId)
s.spawn_bg(runner.run(ctx));
s.spawn_bg(node.run_fetcher(ctx, client.clone()));
validator
.push_random_blocks(rng, account, TEMPORARY_FETCHER_THRESHOLD as usize + 1)
.push_random_blocks(rng, account, FALLBACK_FETCHER_THRESHOLD as usize + 1)
.await;
node_pool
.wait_for_payload(ctx, validator.last_block())
@@ -715,58 +715,7 @@ async fn test_temporary_fetcher(from_snapshot: bool, version: ProtocolVersionId)
.await
.unwrap();

tracing::info!(
"Run p2p fetcher. Blocks should be fetched by the temporary fetcher anyway."
);
scope::run!(ctx, |ctx, s| async {
let (node, runner) = testonly::StateKeeper::new(ctx, node_pool.clone()).await?;
s.spawn_bg(runner.run(ctx));
s.spawn_bg(node.run_consensus(ctx, client.clone(), node_cfg.clone()));
validator.push_random_blocks(rng, account, 5).await;
node_pool
.wait_for_payload(ctx, validator.last_block())
.await?;
Ok(())
})
.await
.unwrap();
Ok(())
})
.await
.unwrap();
}

// Test that temporary fetcher terminates once enough blocks have certs.
#[test_casing(4, Product((FROM_SNAPSHOT,VERSIONS)))]
#[tokio::test]
async fn test_temporary_fetcher_termination(from_snapshot: bool, version: ProtocolVersionId) {
zksync_concurrency::testonly::abort_on_panic();
let ctx = &ctx::test_root(&ctx::AffineClock::new(10.));
let rng = &mut ctx.rng();
let setup = Setup::new(rng, 1);
let validator_cfg = testonly::new_configs(rng, &setup, 0)[0].clone();
let node_cfg = validator_cfg.new_fullnode(rng);
let account = &mut Account::random();

scope::run!(ctx, |ctx, s| async {
tracing::info!("Spawn validator.");
let validator_pool = ConnectionPool::test(from_snapshot, version).await;
let (mut validator, runner) =
testonly::StateKeeper::new(ctx, validator_pool.clone()).await?;
s.spawn_bg(runner.run(ctx));
s.spawn_bg(run_main_node(
ctx,
validator_cfg.config.clone(),
validator_cfg.secrets.clone(),
validator_pool.clone(),
));
// API server needs at least 1 L1 batch to start.
validator.seal_batch().await;
let client = validator.connect(ctx).await?;

let node_pool = ConnectionPool::test(from_snapshot, version).await;

// Run the EN so the consensus is initialized on EN and wait for it to sync.
tracing::info!("Run p2p fetcher. Blocks should be fetched by the fallback fetcher anyway.");
scope::run!(ctx, |ctx, s| async {
let (node, runner) = testonly::StateKeeper::new(ctx, node_pool.clone()).await?;
s.spawn_bg(runner.run(ctx));
@@ -779,12 +728,6 @@ async fn test_temporary_fetcher_termination(from_snapshot: bool, version: Protoc
})
.await
.unwrap();

// Run the temporary fetcher. It should terminate immediately, since EN is synced.
let (node, runner) = testonly::StateKeeper::new(ctx, node_pool.clone()).await?;
s.spawn_bg(runner.run(ctx));
node.run_temporary_fetcher(ctx, client).await?;

Ok(())
})
.await
1 change: 1 addition & 0 deletions core/node/node_sync/Cargo.toml
@@ -24,6 +24,7 @@ zksync_health_check.workspace = true
zksync_utils.workspace = true
zksync_eth_client.workspace = true
zksync_concurrency.workspace = true
zksync_consensus_roles.workspace = true
vise.workspace = true
zksync_vm_executor.workspace = true

21 changes: 12 additions & 9 deletions core/node/node_sync/src/sync_state.rs
@@ -4,6 +4,7 @@ use async_trait::async_trait;
use serde::Serialize;
use tokio::sync::watch;
use zksync_concurrency::{ctx, sync};
use zksync_consensus_roles::validator;
use zksync_dal::{ConnectionPool, Core, CoreDal};
use zksync_health_check::{CheckHealth, Health, HealthStatus};
use zksync_shared_metrics::EN_METRICS;
@@ -50,18 +51,20 @@ impl SyncState {
.unwrap();
}

/// Waits until the main node block is greater or equal to the given block number.
/// Returns the current main node block number.
pub async fn wait_for_main_node_block(
&self,
ctx: &ctx::Ctx,
want: L2BlockNumber,
) -> ctx::OrCanceled<()> {
sync::wait_for(
ctx,
&mut self.0.subscribe(),
|inner| matches!(inner.main_node_block, Some(got) if got >= want),
)
.await?;
Ok(())
pred: impl Fn(validator::BlockNumber) -> bool,
) -> ctx::OrCanceled<validator::BlockNumber> {
sync::wait_for_some(ctx, &mut self.0.subscribe(), |inner| {
inner
.main_node_block
.map(|n| validator::BlockNumber(n.0.into()))
.filter(|n| pred(*n))
})
.await
}

pub fn set_main_node_block(&self, block: L2BlockNumber) {
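
The `SyncState` change generalizes `wait_for_main_node_block` from "wait until the main node reaches block `want`" to "wait until an arbitrary predicate on the main node block holds", which is what lets the fallback fetcher express both the lagging check and the per-block wait. A rough sketch of the same shape over a plain tokio `watch` channel (illustrative names and types, not the real `SyncState` internals or `sync::wait_for_some`):

```rust
use tokio::sync::watch;

#[derive(Clone, Copy, Debug, PartialEq, PartialOrd)]
struct BlockNumber(u64);

// Wait until the latest main-node block satisfies `pred`, then return it.
async fn wait_for_main_node_block(
    rx: &mut watch::Receiver<Option<BlockNumber>>,
    pred: impl Fn(BlockNumber) -> bool,
) -> BlockNumber {
    loop {
        // Check the current value, marking it seen; otherwise wait for the next update.
        if let Some(n) = *rx.borrow_and_update() {
            if pred(n) {
                return n;
            }
        }
        rx.changed().await.expect("sender dropped");
    }
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = watch::channel(None);
    // Simulate the main node reporting a new head block.
    tx.send(Some(BlockNumber(12))).unwrap();

    // Old API shape: wait for a specific block (`|got| got >= want`).
    let got = wait_for_main_node_block(&mut rx, |n| n >= BlockNumber(10)).await;

    // New API shape: any predicate, e.g. the fallback fetcher's lagging check
    // `|main| main >= persisted_next + FALLBACK_FETCHER_THRESHOLD`.
    let persisted_next = 1u64;
    let lagging_at = wait_for_main_node_block(&mut rx, |n| n.0 >= persisted_next + 10).await;

    println!("reached {got:?}; lagging check satisfied at {lagging_at:?}");
}
```

Returning the matched block number rather than `()` mirrors the new signature's `ctx::OrCanceled<validator::BlockNumber>` return value.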