Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(en): periodically fetch bridge addresses #2949

Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ prover/data/keys/setup_*

# Zk Toolbox
chains/era/configs/*
chains/gateway/*
configs/*
era-observability/
core/tests/ts-integration/deployments-zk
Expand Down
8 changes: 8 additions & 0 deletions core/bin/external_node/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,8 @@ pub(crate) struct OptionalENConfig {
/// Gateway RPC URL, needed for operating during migration.
#[allow(dead_code)]
pub gateway_url: Option<SensitiveUrl>,
/// Interval for bridge addresses refreshing in seconds.
bridge_addresses_refresh_interval_sec: Option<NonZeroU64>,
}

impl OptionalENConfig {
Expand Down Expand Up @@ -675,6 +677,7 @@ impl OptionalENConfig {
api_namespaces,
contracts_diamond_proxy_addr: None,
gateway_url: enconfig.gateway_url.clone(),
bridge_addresses_refresh_interval_sec: enconfig.bridge_addresses_refresh_interval_sec,
})
}

Expand Down Expand Up @@ -901,6 +904,11 @@ impl OptionalENConfig {
Duration::from_secs(self.pruning_data_retention_sec)
}

pub fn bridge_addresses_refresh_interval(&self) -> Option<Duration> {
self.bridge_addresses_refresh_interval_sec
.map(|n| Duration::from_secs(n.get()))
}

#[cfg(test)]
fn mock() -> Self {
// Set all values to their defaults
Expand Down
4 changes: 4 additions & 0 deletions core/bin/external_node/src/node_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,10 @@ impl ExternalNodeBuilder {
response_body_size_limit: Some(self.config.optional.max_response_body_size()),
with_extended_tracing: self.config.optional.extended_rpc_tracing,
pruning_info_refresh_interval: Some(pruning_info_refresh_interval),
bridge_addresses_refresh_interval: self
.config
.optional
.bridge_addresses_refresh_interval(),
polling_interval: Some(self.config.optional.polling_interval()),
websocket_requests_per_minute_limit: None, // To be set by WS server layer method if required.
replication_lag_limit: None, // TODO: Support replication lag limit
Expand Down
3 changes: 2 additions & 1 deletion core/lib/config/src/configs/en_config.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::num::NonZeroUsize;
use std::num::{NonZeroU64, NonZeroUsize};

use serde::Deserialize;
use zksync_basic_types::{
Expand All @@ -19,4 +19,5 @@ pub struct ENConfig {
pub main_node_rate_limit_rps: Option<NonZeroUsize>,

pub gateway_url: Option<SensitiveUrl>,
pub bridge_addresses_refresh_interval_sec: Option<NonZeroU64>,
}
perekopskiy marked this conversation as resolved.
Show resolved Hide resolved
1 change: 1 addition & 0 deletions core/lib/config/src/testonly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -933,6 +933,7 @@ impl Distribution<configs::en_config::ENConfig> for EncodeDist {
main_node_rate_limit_rps: self.sample_opt(|| rng.gen()),
gateway_url: self
.sample_opt(|| format!("localhost:{}", rng.gen::<u16>()).parse().unwrap()),
bridge_addresses_refresh_interval_sec: self.sample_opt(|| rng.gen()),
}
}
}
Expand Down
11 changes: 10 additions & 1 deletion core/lib/protobuf_config/src/en.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use std::{num::NonZeroUsize, str::FromStr};
use std::{
num::{NonZeroU64, NonZeroUsize},
str::FromStr,
};

use anyhow::Context;
use zksync_basic_types::{url::SensitiveUrl, L1ChainId, L2ChainId};
Expand Down Expand Up @@ -36,6 +39,9 @@ impl ProtoRepr for proto::ExternalNode {
.as_ref()
.map(|a| a.parse().context("gateway_url"))
.transpose()?,
bridge_addresses_refresh_interval_sec: self
.bridge_addresses_refresh_interval_sec
.and_then(NonZeroU64::new),
})
}

Expand All @@ -55,6 +61,9 @@ impl ProtoRepr for proto::ExternalNode {
.gateway_url
.as_ref()
.map(|a| a.expose_str().to_string()),
bridge_addresses_refresh_interval_sec: this
.bridge_addresses_refresh_interval_sec
.map(|a| a.get()),
}
}
}
1 change: 1 addition & 0 deletions core/lib/protobuf_config/src/proto/config/en.proto
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@ message ExternalNode {
optional uint64 main_node_rate_limit_rps = 6; // optional
optional config.genesis.L1BatchCommitDataGeneratorMode l1_batch_commit_data_generator_mode = 7; // optional, default to rollup
optional string gateway_url = 8; // optional
optional uint64 bridge_addresses_refresh_interval_sec = 9; // optional
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ impl ZksNamespaceServer for ZksNamespace {
}

async fn get_bridge_contracts(&self) -> RpcResult<BridgeAddresses> {
Ok(self.get_bridge_contracts_impl())
Ok(self.get_bridge_contracts_impl().await)
}

async fn l1_chain_id(&self) -> RpcResult<U64> {
Expand Down
80 changes: 38 additions & 42 deletions core/node/api_server/src/web3/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ use self::{
use crate::{
execution_sandbox::{BlockStartInfo, VmConcurrencyBarrier},
tx_sender::TxSender,
web3::state::BridgeAddressesHandle,
};

pub mod backend_jsonrpsee;
Expand Down Expand Up @@ -143,7 +144,6 @@ struct OptionalApiParams {
#[derive(Debug)]
pub struct ApiServer {
pool: ConnectionPool<Core>,
updaters_pool: ConnectionPool<Core>,
health_updater: Arc<HealthUpdater>,
config: InternalApiConfig,
transport: ApiTransport,
Expand All @@ -153,18 +153,21 @@ pub struct ApiServer {
namespaces: Vec<Namespace>,
method_tracer: Arc<MethodTracer>,
optional: OptionalApiParams,
bridge_addresses_handle: BridgeAddressesHandle,
sealed_l2_block_handle: SealedL2BlockNumber,
}

#[derive(Debug)]
pub struct ApiBuilder {
pool: ConnectionPool<Core>,
updaters_pool: ConnectionPool<Core>,
config: InternalApiConfig,
polling_interval: Duration,
pruning_info_refresh_interval: Duration,
// Mandatory params that must be set using builder methods.
transport: Option<ApiTransport>,
tx_sender: Option<TxSender>,
bridge_addresses_handle: Option<BridgeAddressesHandle>,
sealed_l2_block_handle: Option<SealedL2BlockNumber>,
// Optional params that may or may not be set using builder methods. We treat `namespaces`
// specially because we want to output a warning if they are not set.
namespaces: Option<Vec<Namespace>>,
Expand All @@ -178,13 +181,14 @@ impl ApiBuilder {

pub fn jsonrpsee_backend(config: InternalApiConfig, pool: ConnectionPool<Core>) -> Self {
Self {
updaters_pool: pool.clone(),
pool,
config,
polling_interval: Self::DEFAULT_POLLING_INTERVAL,
pruning_info_refresh_interval: Self::DEFAULT_PRUNING_INFO_REFRESH_INTERVAL,
transport: None,
tx_sender: None,
bridge_addresses_handle: None,
sealed_l2_block_handle: None,
namespaces: None,
method_tracer: Arc::new(MethodTracer::default()),
optional: OptionalApiParams::default(),
Expand All @@ -201,15 +205,6 @@ impl ApiBuilder {
self
}

/// Configures a dedicated DB pool to be used for updating different information,
/// such as last mined block number or account nonces. This pool is used to execute
/// in a background task. If not called, the main pool will be used. If the API server is under high load,
/// it may make sense to supply a single-connection pool to reduce pool contention with the API methods.
pub fn with_updaters_pool(mut self, pool: ConnectionPool<Core>) -> Self {
self.updaters_pool = pool;
self
}

pub fn with_tx_sender(mut self, tx_sender: TxSender) -> Self {
self.tx_sender = Some(tx_sender);
self
Expand Down Expand Up @@ -285,6 +280,22 @@ impl ApiBuilder {
self
}

pub fn with_sealed_l2_block_handle(
mut self,
sealed_l2_block_handle: SealedL2BlockNumber,
) -> Self {
self.sealed_l2_block_handle = Some(sealed_l2_block_handle);
self
}

pub fn with_bridge_addresses_handle(
mut self,
bridge_addresses_handle: BridgeAddressesHandle,
) -> Self {
self.bridge_addresses_handle = Some(bridge_addresses_handle);
self
}

// Intended for tests only.
#[doc(hidden)]
fn with_pub_sub_events(mut self, sender: mpsc::UnboundedSender<PubSubEvent>) -> Self {
Expand Down Expand Up @@ -312,7 +323,6 @@ impl ApiBuilder {
Ok(ApiServer {
pool: self.pool,
health_updater: Arc::new(health_updater),
updaters_pool: self.updaters_pool,
config: self.config,
transport,
tx_sender: self.tx_sender.context("Transaction sender not set")?,
Expand All @@ -326,6 +336,12 @@ impl ApiBuilder {
}),
method_tracer: self.method_tracer,
optional: self.optional,
sealed_l2_block_handle: self
.sealed_l2_block_handle
.context("Sealed l2 block handle not set")?,
bridge_addresses_handle: self
.bridge_addresses_handle
.context("Bridge addresses handle not set")?,
})
}
}
Expand All @@ -335,11 +351,8 @@ impl ApiServer {
self.health_updater.subscribe()
}

async fn build_rpc_state(
self,
last_sealed_l2_block: SealedL2BlockNumber,
) -> anyhow::Result<RpcState> {
let mut storage = self.updaters_pool.connection_tagged("api").await?;
async fn build_rpc_state(self) -> anyhow::Result<RpcState> {
let mut storage = self.pool.connection_tagged("api").await?;
let start_info =
BlockStartInfo::new(&mut storage, self.pruning_info_refresh_interval).await?;
drop(storage);
Expand All @@ -363,19 +376,19 @@ impl ApiServer {
api_config: self.config,
start_info,
mempool_cache: self.optional.mempool_cache,
last_sealed_l2_block,
last_sealed_l2_block: self.sealed_l2_block_handle,
bridge_addresses_handle: self.bridge_addresses_handle,
tree_api: self.optional.tree_api,
})
}

async fn build_rpc_module(
self,
pub_sub: Option<EthSubscribe>,
last_sealed_l2_block: SealedL2BlockNumber,
) -> anyhow::Result<RpcModule<()>> {
let namespaces = self.namespaces.clone();
let zksync_network_id = self.config.l2_chain_id;
let rpc_state = self.build_rpc_state(last_sealed_l2_block).await?;
let rpc_state = self.build_rpc_state().await?;

// Collect all the methods into a single RPC module.
let mut rpc = RpcModule::new(());
Expand Down Expand Up @@ -473,21 +486,9 @@ impl ApiServer {
self,
stop_receiver: watch::Receiver<bool>,
) -> anyhow::Result<ApiServerHandles> {
// Chosen to be significantly smaller than the interval between L2 blocks, but larger than
// the latency of getting the latest sealed L2 block number from Postgres. If the API server
// processes enough requests, information about the latest sealed L2 block will be updated
// by reporting block difference metrics, so the actual update lag would be much smaller than this value.
const SEALED_L2_BLOCK_UPDATE_INTERVAL: Duration = Duration::from_millis(25);

let transport = self.transport;
let mut tasks = vec![];

let (last_sealed_l2_block, sealed_l2_block_update_task) = SealedL2BlockNumber::new(
self.updaters_pool.clone(),
SEALED_L2_BLOCK_UPDATE_INTERVAL,
stop_receiver.clone(),
);

let mut tasks = vec![tokio::spawn(sealed_l2_block_update_task)];
let pub_sub = if matches!(transport, ApiTransport::WebSocket(_))
&& self.namespaces.contains(&Namespace::Pubsub)
{
Expand All @@ -510,12 +511,8 @@ impl ApiServer {
// framework it'll no longer be needed.
let health_check = self.health_updater.subscribe();
let (local_addr_sender, local_addr) = oneshot::channel();
let server_task = tokio::spawn(self.run_jsonrpsee_server(
stop_receiver,
pub_sub,
last_sealed_l2_block,
local_addr_sender,
));
let server_task =
tokio::spawn(self.run_jsonrpsee_server(stop_receiver, pub_sub, local_addr_sender));

tasks.push(server_task);
Ok(ApiServerHandles {
Expand Down Expand Up @@ -584,7 +581,6 @@ impl ApiServer {
self,
mut stop_receiver: watch::Receiver<bool>,
pub_sub: Option<EthSubscribe>,
last_sealed_l2_block: SealedL2BlockNumber,
local_addr_sender: oneshot::Sender<SocketAddr>,
) -> anyhow::Result<()> {
let transport = self.transport;
Expand Down Expand Up @@ -640,7 +636,7 @@ impl ApiServer {
tracing::info!("Enabled extended call tracing for {transport_str} API server; this might negatively affect performance");
}

let rpc = self.build_rpc_module(pub_sub, last_sealed_l2_block).await?;
let rpc = self.build_rpc_module(pub_sub).await?;
let registered_method_names = Arc::new(rpc.method_names().collect::<HashSet<_>>());
tracing::debug!(
"Built RPC module for {transport_str} server with {} methods: {registered_method_names:?}",
Expand Down
4 changes: 2 additions & 2 deletions core/node/api_server/src/web3/namespaces/zks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,8 @@ impl ZksNamespace {
self.state.api_config.l2_testnet_paymaster_addr
}

pub fn get_bridge_contracts_impl(&self) -> BridgeAddresses {
self.state.api_config.bridge_addresses.clone()
pub async fn get_bridge_contracts_impl(&self) -> BridgeAddresses {
self.state.bridge_addresses_handle.read().await
}

pub fn l1_chain_id_impl(&self) -> U64 {
Expand Down
Loading
Loading