diff --git a/client/src/main.rs b/client/src/main.rs
index 3e4388f7e..11f3a96e3 100644
--- a/client/src/main.rs
+++ b/client/src/main.rs
@@ -2,9 +2,9 @@
 use crate::cli::CliOpts;
 use avail_light_core::{
-    api,
+    api::{self, v2::types::ApiData},
     data::{
-        self, ClientIdKey, Database, IsFinalitySyncedKey, IsSyncedKey, LatestHeaderKey,
+        self, ClientIdKey, Database, IsFinalitySyncedKey, IsSyncedKey, LatestHeaderKey, RpcNodeKey,
         SignerNonceKey, DB,
     },
     light_client::{self, OutputEvent as LcEvent},
@@ -12,7 +12,8 @@ use avail_light_core::{
     network::{
         self,
         p2p::{self, extract_block_num, OutputEvent as P2pEvent, BOOTSTRAP_LIST_EMPTY_MESSAGE},
-        rpc, Network,
+        rpc::{self, OutputEvent as RpcEvent},
+        Network,
     },
     shutdown::Controller,
     sync_client::SyncClient,
@@ -24,11 +25,7 @@ use avail_light_core::{
     },
     utils::{default_subscriber, install_panic_hooks, json_subscriber, spawn_in_span},
 };
-use avail_rust::{
-    avail_core::AppId,
-    kate_recovery::{com::AppData, couscous},
-    sp_core::blake2_128,
-};
+use avail_rust::{avail_core::AppId, kate_recovery::couscous, sp_core::blake2_128};
 use clap::Parser;
 use color_eyre::{
     eyre::{eyre, WrapErr},
@@ -59,6 +56,9 @@ use tikv_jemallocator::Jemalloc;
 #[global_allocator]
 static GLOBAL: Jemalloc = Jemalloc;
 
+mod cli;
+mod config;
+
 /// Light Client for Avail Blockchain
 async fn run(
@@ -132,8 +132,15 @@ async fn run(
     let public_params_len = hex::encode(raw_pp).len();
     trace!("Public params ({public_params_len}): hash: {public_params_hash}");
 
-    let (rpc_client, rpc_events, rpc_subscriptions) =
-        rpc::init(db.clone(), &cfg.genesis_hash, &cfg.rpc, shutdown.clone()).await?;
+    let (rpc_event_sender, rpc_event_receiver) = broadcast::channel(1000);
+    let (rpc_client, rpc_subscriptions) = rpc::init(
+        db.clone(),
+        &cfg.genesis_hash,
+        &cfg.rpc,
+        shutdown.clone(),
+        rpc_event_sender.clone(),
+    )
+    .await?;
 
     let account_id = identity_cfg.avail_key_pair.public_key().to_account_id();
     let client = rpc_client.current_client().await;
     db.put(SignerNonceKey, nonce);
 
     // Subscribing to RPC events before first event is published
-    let publish_rpc_event_receiver = rpc_events.subscribe();
-    let first_header_rpc_event_receiver = rpc_events.subscribe();
-    let client_rpc_event_receiver = rpc_events.subscribe();
+    let publish_rpc_event_receiver = rpc_event_sender.subscribe();
+    let first_header_rpc_event_receiver = rpc_event_sender.subscribe();
+    let client_rpc_event_receiver = rpc_event_sender.subscribe();
 
     // spawn the RPC Network task for Event Loop to run in the background
     // and shut it down, without delays
@@ -199,7 +206,7 @@ async fn run(
     let (block_tx, block_rx) = broadcast::channel::<BlockVerified>(1 << 7);
 
     let data_rx = cfg.app_id.map(AppId).map(|app_id| {
-        let (data_tx, data_rx) = broadcast::channel::<(u32, AppData)>(1 << 7);
+        let (data_tx, data_rx) = broadcast::channel::<ApiData>(1 << 7);
         spawn_in_span(shutdown.with_cancel(avail_light_core::app_client::run(
             (&cfg).into(),
             db.clone(),
@@ -315,25 +322,33 @@ async fn run(
         telemetry::otlp::initialize(cfg.project_name.clone(), &cfg.origin, cfg.otel.clone())
             .wrap_err("Unable to initialize OpenTelemetry service")?;
 
+    let rpc_host = db
+        .get(RpcNodeKey)
+        .map(|node| node.host)
+        .ok_or_else(|| eyre!("No connected host found"))?;
+
     let mut state = ClientState::new(
         metrics,
         cfg.libp2p.kademlia.operation_mode.into(),
+        rpc_host,
         Multiaddr::empty(),
         metric_attributes,
     );
 
     spawn_in_span(shutdown.with_cancel(async move {
         state
-            .handle_events(p2p_event_receiver, maintenance_receiver, lc_receiver)
+            .handle_events(
+                p2p_event_receiver,
+                maintenance_receiver,
+                lc_receiver,
+                rpc_event_receiver,
+            )
             .await;
     }));
 
     Ok(())
 }
 
-mod cli;
-mod config;
-
 pub fn load_runtime_config(opts: &CliOpts) -> Result<RuntimeConfig> {
     let mut cfg = if let Some(config_path) = &opts.config {
         fs::metadata(config_path).map_err(|_| eyre!("Provided config file doesn't exist."))?;
@@ -441,6 +456,7 @@ struct ClientState {
     metrics: Metrics,
     kad_mode: Mode,
     multiaddress: Multiaddr,
+    rpc_host: String,
     metric_attributes: Vec<(String, String)>,
     active_blocks: HashMap<u32, BlockStat>,
 }
@@ -449,6 +465,7 @@ impl ClientState {
     fn new(
         metrics: Metrics,
         kad_mode: Mode,
+        rpc_host: String,
         multiaddress: Multiaddr,
         metric_attributes: Vec<(String, String)>,
     ) -> Self {
@@ -456,6 +473,7 @@ impl ClientState {
             metrics,
             kad_mode,
             multiaddress,
+            rpc_host,
             metric_attributes,
             active_blocks: Default::default(),
         }
     }
@@ -469,10 +487,15 @@ impl ClientState {
         self.kad_mode = value;
     }
 
+    fn update_rpc_host(&mut self, value: String) {
+        self.rpc_host = value;
+    }
+
     fn attributes(&self) -> Vec<(String, String)> {
         let mut attrs = vec![
             ("operating_mode".to_string(), self.kad_mode.to_string()),
             ("multiaddress".to_string(), self.multiaddress.to_string()),
+            ("rpc_host".to_string(), self.rpc_host.to_string()),
         ];
 
         attrs.extend(self.metric_attributes.clone());
         attrs
@@ -559,6 +582,7 @@ impl ClientState {
         mut p2p_receiver: UnboundedReceiver<P2pEvent>,
         mut maintenance_receiver: UnboundedReceiver<MaintenanceEvent>,
         mut lc_receiver: UnboundedReceiver<LcEvent>,
+        mut rpc_receiver: broadcast::Receiver<RpcEvent>,
     ) {
         self.metrics.count(MetricCounter::Starts, self.attributes());
         loop {
@@ -667,10 +691,16 @@ impl ClientState {
                 },
                 LcEvent::RecordRPCFetchDuration(duration) => {
                     self.metrics.record(MetricValue::RPCFetchDuration(duration));
-                }
+                },
                 LcEvent::RecordBlockConfidence(confidence) => {
                     self.metrics.record(MetricValue::BlockConfidence(confidence));
-                }
+                },
+                }
+            }
+
+            Ok(rpc_event) = rpc_receiver.recv() => {
+                if let RpcEvent::ConnectedHost(host) = rpc_event {
+                    self.update_rpc_host(host);
                 }
             }
             // break the loop if all channels are closed
diff --git a/compatibility-tests/src/main.rs b/compatibility-tests/src/main.rs
index 085fc8468..8fcd1ed48 100644
--- a/compatibility-tests/src/main.rs
+++ b/compatibility-tests/src/main.rs
@@ -10,6 +10,7 @@ use avail_rust::kate_recovery::matrix::Position;
 use clap::Parser;
 use color_eyre::Result;
 use std::time::Duration;
+use tokio::sync::broadcast;
 
 #[derive(Parser)]
 struct CommandArgs {
@@ -40,7 +41,8 @@ async fn main() -> Result<()> {
     };
 
     let shutdown = Controller::new();
-    let (rpc_client, _, subscriptions) = rpc::init(db, "DEV", &rpc_cfg, shutdown).await?;
+    let (rpc_sender, _) = broadcast::channel(1000);
+    let (rpc_client, subscriptions) = rpc::init(db, "DEV", &rpc_cfg, shutdown, rpc_sender).await?;
     tokio::spawn(subscriptions.run());
 
     let mut correct: bool = true;
diff --git a/core/src/api/v2/mod.rs b/core/src/api/v2/mod.rs
index 9e2715b05..3a1787175 100644
--- a/core/src/api/v2/mod.rs
+++ b/core/src/api/v2/mod.rs
@@ -170,12 +170,12 @@ fn ws_route(
         .and_then(handlers::ws)
 }
 
-pub async fn publish<T: Clone + TryInto<PublishMessage>>(
+pub async fn publish<T: Clone + TryInto<Option<PublishMessage>>>(
     topic: Topic,
     mut receiver: broadcast::Receiver<T>,
     clients: WsClients,
 ) where
-    <T as TryInto<PublishMessage>>::Error: Display,
+    <T as TryInto<Option<PublishMessage>>>::Error: Display,
 {
     loop {
         let message = match receiver.recv().await {
             Ok(message) => message,
             Err(error) => {
                 error!(?topic, "Cannot receive message: {error}");
                 return;
             },
         };
-
-        let message: PublishMessage = match message.try_into() {
-            Ok(message) => message,
+        let message: Option<PublishMessage> = match message.try_into() {
+            Ok(Some(message)) => Some(message),
+            Ok(None) => continue, // Silently skip
             Err(error) => {
                 error!(?topic, "Cannot create message: {error}");
message: {error}"); continue; }, }; - match clients.publish(&topic, message).await { - Ok(results) => { - let published = results.iter().filter(|&result| result.is_ok()).count(); - let failed = results.iter().filter(|&result| result.is_err()).count(); - info!(?topic, published, failed, "Message published to clients"); - for error in results.into_iter().filter_map(Result::err) { - debug!(?topic, "Cannot publish message to client: {error}") - } - }, - Err(error) => error!(?topic, "Cannot publish message: {error}"), + if let Some(message) = message { + match clients.publish(&topic, message).await { + Ok(results) => { + let published = results.iter().filter(|&result| result.is_ok()).count(); + let failed = results.iter().filter(|&result| result.is_err()).count(); + info!(?topic, published, failed, "Message published to clients"); + for error in results.into_iter().filter_map(Result::err) { + debug!(?topic, "Cannot publish message to client: {error}") + } + }, + Err(error) => error!(?topic, "Cannot publish message: {error}"), + } } } } diff --git a/core/src/api/v2/types.rs b/core/src/api/v2/types.rs index 92299d9b3..a2614c2bf 100644 --- a/core/src/api/v2/types.rs +++ b/core/src/api/v2/types.rs @@ -36,7 +36,7 @@ use crate::{ LatestSyncKey, RpcNodeKey, VerifiedDataKey, VerifiedHeaderKey, VerifiedSyncDataKey, VerifiedSyncHeaderKey, }, - network::rpc::Event as RpcEvent, + network::rpc::OutputEvent as RpcEvent, types::{self, BlockVerified}, utils::{decode_app_data, OptionalExtension}, }; @@ -517,7 +517,7 @@ impl TryFrom for Digest { } } -impl TryFrom for PublishMessage { +impl TryFrom for Option { type Error = Report; fn try_from(value: RpcEvent) -> Result { @@ -525,7 +525,9 @@ impl TryFrom for PublishMessage { RpcEvent::HeaderUpdate { header, .. } => header .try_into() .map(Box::new) - .map(PublishMessage::HeaderVerified), + .map(PublishMessage::HeaderVerified) + .map(Some), + RpcEvent::ConnectedHost(_) => Ok(None), // silently skip ConnectedHost event } } } @@ -537,14 +539,16 @@ pub struct ConfidenceMessage { confidence: Option, } -impl TryFrom for PublishMessage { +impl TryFrom for Option { type Error = Report; fn try_from(value: BlockVerified) -> Result { - Ok(PublishMessage::ConfidenceAchieved(ConfidenceMessage { - block_number: value.block_num, - confidence: value.confidence, - })) + Ok(Some(PublishMessage::ConfidenceAchieved( + ConfidenceMessage { + block_number: value.block_num, + confidence: value.confidence, + }, + ))) } } @@ -620,18 +624,22 @@ pub fn filter_fields(data_transactions: &mut [DataTransaction], fields: &HashSet } } -impl TryFrom<(u32, AppData)> for PublishMessage { +#[derive(Clone)] +pub struct ApiData(pub u32, pub AppData); + +impl TryFrom for Option { type Error = Report; - fn try_from((block_number, app_data): (u32, AppData)) -> Result { + fn try_from(ApiData(block_number, app_data): ApiData) -> Result { let data_transactions = app_data .into_iter() .map(TryFrom::try_from) .collect::>>()?; - Ok(PublishMessage::DataVerified(DataMessage { + + Ok(Some(PublishMessage::DataVerified(DataMessage { block_number, data_transactions, - })) + }))) } } diff --git a/core/src/app_client.rs b/core/src/app_client.rs index 79674076a..de87d9740 100644 --- a/core/src/app_client.rs +++ b/core/src/app_client.rs @@ -45,6 +45,7 @@ use tokio::sync::broadcast; use tracing::{debug, error, info, instrument}; use crate::{ + api::v2::types::ApiData, data::{AppDataKey, Database, IsSyncedKey, RecordKey, VerifiedDataKey, VerifiedSyncDataKey}, network::{p2p::Client as P2pClient, rpc::Client as RpcClient}, 
     proof,
@@ -428,7 +429,7 @@ pub async fn run(
     mut block_receive: broadcast::Receiver<BlockVerified>,
     pp: Arc<PublicParameters>,
     sync_range: Range<u32>,
-    data_verified_sender: broadcast::Sender<(u32, AppData)>,
+    data_verified_sender: broadcast::Sender<ApiData>,
     shutdown: Controller<String>,
 ) {
     info!("Starting for app {app_id}...");
@@ -502,7 +503,7 @@ pub async fn run(
             },
         };
         set_data_verified_state(db.clone(), &sync_range, block_number);
-        if let Err(error) = data_verified_sender.send((block_number, data)) {
+        if let Err(error) = data_verified_sender.send(ApiData(block_number, data)) {
             error!("Cannot send data verified message: {error}");
             let _ = shutdown
                 .trigger_shutdown(format!("Cannot send data verified message: {error:#}"));
diff --git a/core/src/crawl_client.rs b/core/src/crawl_client.rs
index 48211d92c..e2270c8cd 100644
--- a/core/src/crawl_client.rs
+++ b/core/src/crawl_client.rs
@@ -1,8 +1,5 @@
 use crate::{
-    network::{
-        p2p::Client,
-        rpc::{self, Event},
-    },
+    network::{p2p::Client, rpc},
     telemetry::{otlp::Record, MetricName, Value},
     types::{self, BlockVerified, Delay, Origin},
 };
@@ -67,7 +64,7 @@ impl Value for CrawlMetricValue {
     }
 }
 
 pub async fn run(
-    mut message_rx: broadcast::Receiver<Event>,
+    mut message_rx: broadcast::Receiver<rpc::OutputEvent>,
     network_client: Client,
     delay: u64,
     mode: CrawlMode,
@@ -79,7 +76,7 @@ pub async fn run(
 
     let delay = Delay(Some(Duration::from_secs(delay)));
 
-    while let Ok(rpc::Event::HeaderUpdate {
+    while let Ok(rpc::OutputEvent::HeaderUpdate {
         header,
         received_at,
     }) = message_rx.recv().await
diff --git a/core/src/fat_client.rs b/core/src/fat_client.rs
index 977a8cdb2..66e0e2f85 100644
--- a/core/src/fat_client.rs
+++ b/core/src/fat_client.rs
@@ -31,7 +31,7 @@ use crate::{
     data::{BlockHeaderKey, Database},
     network::{
         p2p::Client as P2pClient,
-        rpc::{Client as RpcClient, Event},
+        rpc::{Client as RpcClient, OutputEvent as RpcEvent},
     },
     shutdown::Controller,
     types::{block_matrix_partition_format, BlockVerified, ClientChannels, Delay},
@@ -247,10 +247,12 @@ pub async fn run(
         let event_sender = event_sender.clone();
         let (header, received_at) = match channels.rpc_event_receiver.recv().await {
             Ok(event) => match event {
-                Event::HeaderUpdate {
+                RpcEvent::HeaderUpdate {
                     header,
                     received_at,
                 } => (header, received_at),
+                // skip ConnectedHost event
+                RpcEvent::ConnectedHost(_) => continue,
             },
             Err(error) => {
                 error!("Cannot receive message: {error}");
diff --git a/core/src/light_client.rs b/core/src/light_client.rs
index e09ee7186..e31d29e1d 100644
--- a/core/src/light_client.rs
+++ b/core/src/light_client.rs
@@ -32,7 +32,7 @@ use crate::{
     data::{AchievedConfidenceKey, BlockHeaderKey, Database, VerifiedCellCountKey},
     network::{
         self,
-        rpc::{self, Event},
+        rpc::{self, OutputEvent as RpcEvent},
     },
     shutdown::Controller,
     types::{self, BlockRange, ClientChannels, Delay},
@@ -201,10 +201,12 @@ pub async fn run(
         let event_sender = event_sender.clone();
         let (header, received_at) = match channels.rpc_event_receiver.recv().await {
             Ok(event) => match event {
-                Event::HeaderUpdate {
+                RpcEvent::HeaderUpdate {
                     header,
                     received_at,
                 } => (header, received_at),
+                // skip ConnectedHost event
+                RpcEvent::ConnectedHost(_) => continue,
             },
             Err(error) => {
                 error!("Cannot receive message: {error}");
diff --git a/core/src/network/p2p.rs b/core/src/network/p2p.rs
index f83c1a10a..8af1804b9 100644
--- a/core/src/network/p2p.rs
+++ b/core/src/network/p2p.rs
@@ -72,7 +72,6 @@ pub enum OutputEvent {
     IncomingGetRecord,
     IncomingPutRecord,
     KadModeChange(Mode),
-    Ping(Duration),
     IncomingConnection,
     IncomingConnectionError,
diff --git a/core/src/network/rpc.rs b/core/src/network/rpc.rs
index f624c748e..cdb9c7ff9 100644
--- a/core/src/network/rpc.rs
+++ b/core/src/network/rpc.rs
@@ -8,32 +8,43 @@ use color_eyre::{eyre::eyre, Result};
 use configuration::RPCConfig;
 use rand::{seq::SliceRandom, thread_rng, Rng};
 use serde::{de, Deserialize, Serialize};
-use std::{collections::HashSet, fmt::Display};
+use std::{collections::HashSet, fmt::Display, time::Instant};
 use tokio::{
-    sync::broadcast,
+    sync::broadcast::{Receiver, Sender},
     time::{self, timeout},
 };
 use tracing::{debug, info};
 
-use crate::{data::Database, network::rpc, shutdown::Controller, types::GrandpaJustification};
-
 mod client;
 pub mod configuration;
 mod subscriptions;
 
+use crate::{
+    data::Database, network::rpc::OutputEvent as RpcEvent, shutdown::Controller,
+    types::GrandpaJustification,
+};
+pub use client::Client;
 use subscriptions::SubscriptionLoop;
+
 const CELL_SIZE: usize = 32;
 const PROOF_SIZE: usize = 48;
 pub const CELL_WITH_PROOF_SIZE: usize = CELL_SIZE + PROOF_SIZE;
 
-pub use subscriptions::Event;
-
-pub use client::Client;
 
 pub enum Subscription {
     Header(AvailHeader),
     Justification(GrandpaJustification),
 }
 
+#[derive(Clone, Debug)]
+#[allow(clippy::large_enum_variant)]
+pub enum OutputEvent {
+    ConnectedHost(String),
+    HeaderUpdate {
+        header: AvailHeader,
+        received_at: Instant,
+    },
+}
+
 #[derive(Debug, Deserialize, Clone)]
 pub struct WrappedJustification(pub GrandpaJustification);
@@ -188,20 +199,20 @@ pub async fn init(
     genesis_hash: &str,
     rpc: &RPCConfig,
     shutdown: Controller<String>,
-) -> Result<(Client<T>, broadcast::Sender<Event>, SubscriptionLoop<T>)> {
+    event_sender: Sender<RpcEvent>,
+) -> Result<(Client<T>, SubscriptionLoop<T>)> {
     let rpc_client = Client::new(
         db.clone(),
         Nodes::new(&rpc.full_node_ws),
         genesis_hash,
         rpc.retry.clone(),
         shutdown,
+        event_sender.clone(),
     )
     .await?;
-    // create output channel for RPC Subscription Events
-    let (event_sender, _) = broadcast::channel(1000);
     let subscriptions = SubscriptionLoop::new(db, rpc_client.clone(), event_sender.clone()).await?;
 
-    Ok((rpc_client, event_sender, subscriptions))
+    Ok((rpc_client, subscriptions))
 }
 
 /// Generates random cell positions for sampling
@@ -259,13 +270,24 @@ pub fn cell_count_for_confidence(confidence: f64) -> u32 {
 }
 
 pub async fn wait_for_finalized_header(
-    mut rpc_events_receiver: broadcast::Receiver<Event>,
+    mut rpc_events_receiver: Receiver<RpcEvent>,
     timeout_seconds: u64,
 ) -> Result<AvailHeader> {
     let timeout_seconds = time::Duration::from_secs(timeout_seconds);
-    match timeout(timeout_seconds, rpc_events_receiver.recv()).await {
-        Ok(Ok(rpc::Event::HeaderUpdate { header, .. })) => Ok(header),
-        Ok(Err(error)) => Err(eyre!("Failed to receive finalized header: {error}")),
-        Err(_) => Err(eyre!("Timeout on waiting for first finalized header")),
+
+    let result = timeout(timeout_seconds, async {
+        while let Ok(event) = rpc_events_receiver.recv().await {
+            if let OutputEvent::HeaderUpdate { header, .. } = event {
+                return Ok(header);
+            }
+            // silently skip ConnectedHost event
+        }
+        Err(eyre!("RPC event receiver channel closed"))
+    })
+    .await;
+
+    match result {
+        Ok(header) => header,
+        Err(_) => Err(eyre!("Timeout while waiting for first finalized header")),
     }
 }
diff --git a/core/src/network/rpc/client.rs b/core/src/network/rpc/client.rs
index cd28f51f2..4eb10b48f 100644
--- a/core/src/network/rpc/client.rs
+++ b/core/src/network/rpc/client.rs
@@ -16,7 +16,7 @@ use color_eyre::{
 };
 use futures::{Stream, TryStreamExt};
 use std::{iter::Iterator, pin::Pin, sync::Arc, time::Duration};
-use tokio::sync::RwLock;
+use tokio::sync::{broadcast::Sender, RwLock};
 use tokio_retry::Retry;
 use tokio_stream::{Elapsed, StreamExt};
 use tracing::{error, info, warn};
@@ -25,6 +25,7 @@ use super::{configuration::RetryConfig, Node, Nodes, Subscription, WrappedProof};
 use crate::{
     api::v2::types::Base64,
     data::{Database, RpcNodeKey, SignerNonceKey},
+    network::rpc::OutputEvent as RpcEvent,
     shutdown::Controller,
     types::DEV_FLAG_GENHASH,
 };
@@ -122,6 +123,7 @@ pub struct Client<D: Database> {
     retry_config: RetryConfig,
     expected_genesis_hash: String,
     shutdown: Controller<String>,
+    event_sender: Sender<RpcEvent>,
 }
 
 pub struct SubmitResponse {
@@ -138,12 +140,14 @@ impl<D: Database> Client<D> {
         expected_genesis_hash: &str,
         retry_config: RetryConfig,
         shutdown: Controller<String>,
+        event_sender: Sender<RpcEvent>,
     ) -> Result<Self> {
         let (client, node) = Self::initialize_connection(
             &nodes,
             expected_genesis_hash,
             retry_config.clone(),
             shutdown.clone(),
+            event_sender.clone(),
         )
         .await?;
 
@@ -154,6 +158,7 @@ impl<D: Database> Client<D> {
             retry_config,
             expected_genesis_hash: expected_genesis_hash.to_string(),
             shutdown,
+            event_sender,
         };
 
         client.db.put(RpcNodeKey, node);
@@ -168,12 +173,17 @@ impl<D: Database> Client<D> {
         expected_genesis_hash: &str,
         retry_config: RetryConfig,
         shutdown: Controller<String>,
+        event_sender: Sender<RpcEvent>,
     ) -> Result<(SDK, Node)> {
         let (available_nodes, _) = nodes.shuffle(Default::default());
 
         let connection_result = shutdown
             .with_cancel(Retry::spawn(retry_config, || {
-                Self::attempt_connection(&available_nodes, expected_genesis_hash)
+                Self::attempt_connection(
+                    &available_nodes,
+                    expected_genesis_hash,
+                    event_sender.clone(),
+                )
             }))
             .await;
 
@@ -188,10 +198,16 @@ impl<D: Database> Client<D> {
     async fn attempt_connection(
         nodes: &[Node],
         expected_genesis_hash: &str,
+        event_sender: Sender<RpcEvent>,
     ) -> Result<ConnectionAttempt<()>> {
         // Not passing any RPC function calls since this is a first try of connecting RPC nodes
-        Self::try_connect_and_execute(nodes, expected_genesis_hash, |_| futures::future::ok(()))
-            .await
+        Self::try_connect_and_execute(
+            nodes,
+            expected_genesis_hash,
+            |_| futures::future::ok(()),
+            event_sender,
+        )
+        .await
     }
 
     // Iterates through the RPC nodes, tries to create the first successful connection, verifies the genesis hash,
@@ -200,6 +216,7 @@ impl<D: Database> Client<D> {
         nodes: &[Node],
         expected_genesis_hash: &str,
         f: F,
+        event_sender: Sender<RpcEvent>,
     ) -> Result<ConnectionAttempt<T>>
     where
         F: FnMut(SDK) -> Fut + Copy,
@@ -214,6 +231,9 @@ impl<D: Database> Client<D> {
             match Self::try_node_connection_and_exec(node, expected_genesis_hash, f).await {
                 Ok(attempt) => {
                     info!("Successfully connected to RPC: {}", node.host);
+                    // output event, signaling newly connected RPC host
+                    event_sender.send(RpcEvent::ConnectedHost(node.host.clone()))?;
+
                     return Ok(attempt);
                 },
                 Err(err) => {
@@ -364,8 +384,15 @@ impl<D: Database> Client<D> {
         F: FnMut(SDK) -> Fut + Copy,
         Fut: std::future::Future<Output = Result<T>>,
     {
-        let nodes_fn =
-            || async { Self::try_connect_and_execute(nodes, &self.expected_genesis_hash, f).await };
+        let nodes_fn = move || async move {
+            Self::try_connect_and_execute(
+                nodes,
+                &self.expected_genesis_hash,
+                f,
+                self.event_sender.clone(),
+            )
+            .await
+        };
 
         match self
             .shutdown
diff --git a/core/src/network/rpc/subscriptions.rs b/core/src/network/rpc/subscriptions.rs
index 9df1fc235..de677b3ea 100644
--- a/core/src/network/rpc/subscriptions.rs
+++ b/core/src/network/rpc/subscriptions.rs
@@ -13,7 +13,7 @@ use tokio::sync::broadcast::Sender;
 use tokio_stream::StreamExt;
 use tracing::{debug, info, trace};
 
-use super::{Client, Subscription};
+use super::{Client, OutputEvent, Subscription};
 use crate::{
     data::{
         Database, FinalitySyncCheckpoint, FinalitySyncCheckpointKey, IsFinalitySyncedKey,
@@ -24,14 +24,6 @@ use crate::{
     utils::filter_auth_set_changes,
 };
 
-#[derive(Clone, Debug)]
-pub enum Event {
-    HeaderUpdate {
-        header: AvailHeader,
-        received_at: Instant,
-    },
-}
-
 struct BlockData {
     justifications: Vec<GrandpaJustification>,
     unverified_headers: Vec<(AvailHeader, Instant, ValidatorSet)>,
@@ -42,13 +34,17 @@ struct BlockData {
 
 pub struct SubscriptionLoop<T: Database + Clone> {
     rpc_client: Client<T>,
-    event_sender: Sender<Event>,
+    event_sender: Sender<OutputEvent>,
     db: T,
     block_data: BlockData,
 }
 
 impl<T: Database + Clone> SubscriptionLoop<T> {
-    pub async fn new(db: T, rpc_client: Client<T>, event_sender: Sender<Event>) -> Result<Self> {
+    pub async fn new(
+        db: T,
+        rpc_client: Client<T>,
+        event_sender: Sender<OutputEvent>,
+    ) -> Result<Self> {
         // get the Hash of the Finalized Head [with Retries]
         let last_finalized_block_hash = rpc_client.get_finalized_head_hash().await?;
@@ -217,7 +213,7 @@ impl<T: Database + Clone> SubscriptionLoop<T> {
             };
             // send as output event
             self.event_sender
-                .send(Event::HeaderUpdate {
+                .send(OutputEvent::HeaderUpdate {
                     header,
                     received_at,
                 })
@@ -241,7 +237,7 @@ impl<T: Database + Clone> SubscriptionLoop<T> {
             // finally, send the Verified Block Header
             self.event_sender
-                .send(Event::HeaderUpdate {
+                .send(OutputEvent::HeaderUpdate {
                     header,
                     received_at,
                 })
diff --git a/core/src/types.rs b/core/src/types.rs
index 67e7a9f3f..579cc62a3 100644
--- a/core/src/types.rs
+++ b/core/src/types.rs
@@ -1,5 +1,5 @@
 //! Shared light client structs and enums.
-use crate::network::rpc::Event;
+use crate::network::rpc::OutputEvent;
 use crate::utils::{extract_app_lookup, extract_kate};
 use avail_rust::{
     avail_core::DataLookup,
@@ -61,7 +61,7 @@ pub struct BlockVerified {
 
 pub struct ClientChannels {
     pub block_sender: broadcast::Sender<BlockVerified>,
-    pub rpc_event_receiver: broadcast::Receiver<Event>,
+    pub rpc_event_receiver: broadcast::Receiver<OutputEvent>,
 }
 
 impl TryFrom<(AvailHeader, Option<f64>)> for BlockVerified {
diff --git a/crawler/src/main.rs b/crawler/src/main.rs
index 11b9e18f2..28670d36d 100644
--- a/crawler/src/main.rs
+++ b/crawler/src/main.rs
@@ -1,6 +1,6 @@
 use avail_light_core::{
     crawl_client::{self, CrawlMetricValue, OutputEvent as CrawlerEvent},
-    data::{Database, LatestHeaderKey, DB},
+    data::{Database, LatestHeaderKey, RpcNodeKey, DB},
     network::{
         p2p::{self, OutputEvent as P2pEvent},
         rpc, Network,
     },
@@ -83,9 +83,6 @@ async fn run(config: Config, db: DB, shutdown: Controller<String>) -> Result<()> {
     let partition = config.crawl_block_matrix_partition;
     let partition_size = format!("{}/{}", partition.number, partition.fraction);
 
-    let metrics = otlp::initialize("avail".to_string(), &config.origin, config.otel.clone())
-        .wrap_err("Unable to initialize OpenTelemetry service")?;
-
     let (p2p_client, p2p_event_loop, p2p_event_receiver) = p2p::init(
         config.libp2p.clone(),
         "avail".to_string(),
@@ -116,16 +113,18 @@ async fn run(config: Config, db: DB, shutdown: Controller<String>) -> Result<()> {
         }
     }));
 
-    let (_, rpc_events, rpc_subscriptions) = rpc::init(
+    let (rpc_events_sender, _) = broadcast::channel(1000);
+    let (_, rpc_subscriptions) = rpc::init(
         db.clone(),
         &config.genesis_hash,
         &config.rpc,
         shutdown.clone(),
+        rpc_events_sender.clone(),
     )
     .await?;
 
-    let first_header_rpc_event_receiver = rpc_events.subscribe();
-    let client_rpc_event_receiver = rpc_events.subscribe();
+    let first_header_rpc_event_receiver = rpc_events_sender.subscribe();
+    let client_rpc_event_receiver = rpc_events_sender.subscribe();
 
     let rpc_subscriptions_handle = spawn_in_span(shutdown.with_cancel(shutdown.with_trigger(
         "Subscription loop failure triggered shutdown".to_string(),
@@ -185,7 +184,15 @@ async fn run(config: Config, db: DB, shutdown: Controller<String>) -> Result<()> {
         ("operating_mode".to_string(), "client".to_string()),
     ];
 
-    let mut state = CrawlerState::new(metrics, String::default(), metric_attributes);
+    let metrics = otlp::initialize("avail".to_string(), &config.origin, config.otel.clone())
+        .wrap_err("Unable to initialize OpenTelemetry service")?;
+
+    let rpc_host = db
+        .get(RpcNodeKey)
+        .map(|node| node.host)
+        .ok_or_else(|| eyre!("No connected host found"))?;
+
+    let mut state = CrawlerState::new(metrics, String::default(), rpc_host, metric_attributes);
 
     spawn_in_span(shutdown.with_cancel(async move {
         state
@@ -200,6 +207,7 @@ async fn run(config: Config, db: DB, shutdown: Controller<String>) -> Result<()> {
 struct CrawlerState {
     metrics: Metrics,
     multiaddress: String,
+    rpc_host: String,
     metric_attributes: Vec<(String, String)>,
 }
 
@@ -207,11 +215,13 @@ impl CrawlerState {
     fn new(
         metrics: Metrics,
         multiaddress: String,
+        rpc_host: String,
         metric_attributes: Vec<(String, String)>,
     ) -> Self {
         CrawlerState {
             metrics,
             multiaddress,
+            rpc_host,
             metric_attributes,
         }
     }
@@ -221,7 +231,10 @@ impl CrawlerState {
     }
 
     fn attributes(&self) -> Vec<(String, String)> {
-        let mut attrs = vec![("multiaddress".to_string(), self.multiaddress.clone())];
+        let mut attrs = vec![
+            ("multiaddress".to_string(), self.multiaddress.clone()),
+            ("rpc_host".to_string(), self.rpc_host.to_string()),
+        ];
 
         attrs.extend(self.metric_attributes.clone());
         attrs
diff --git a/fat/src/main.rs b/fat/src/main.rs
index 7587d063b..7d882cbe9 100644
--- a/fat/src/main.rs
+++ b/fat/src/main.rs
@@ -1,10 +1,11 @@
 use avail_light_core::{
     api::{self, configuration::SharedConfig},
-    data::{Database, LatestHeaderKey, DB},
+    data::{Database, LatestHeaderKey, RpcNodeKey, DB},
     fat_client::{self, OutputEvent as FatEvent},
     network::{
         p2p::{self, extract_block_num, OutputEvent as P2pEvent},
-        rpc, Network,
+        rpc::{self, OutputEvent as RpcEvent},
+        Network,
     },
     shutdown::Controller,
     telemetry::{self, otlp::Metrics, MetricCounter, MetricValue},
@@ -82,10 +83,6 @@ async fn run(config: Config, db: DB, shutdown: Controller<String>) -> Result<()> {
     let partition_size = format!("{}/{}", partition.number, partition.fraction);
     let identity_cfg = IdentityConfig::from_suri("//Alice".to_string(), None)?;
 
-    let metrics =
-        telemetry::otlp::initialize("avail".to_string(), &Origin::FatClient, config.otel.clone())
-            .wrap_err("Unable to initialize OpenTelemetry service")?;
-
     let (p2p_client, p2p_event_loop, p2p_event_receiver) = p2p::init(
         config.libp2p.clone(),
         "avail".to_string(),
@@ -118,16 +115,18 @@ async fn run(config: Config, db: DB, shutdown: Controller<String>) -> Result<()> {
         }
     }));
 
-    let (rpc_client, rpc_events, rpc_subscriptions) = rpc::init(
+    let (rpc_event_sender, rpc_event_receiver) = broadcast::channel(1000);
+    let (rpc_client, rpc_subscriptions) = rpc::init(
         db.clone(),
         &config.genesis_hash,
         &config.rpc,
         shutdown.clone(),
+        rpc_event_sender.clone(),
     )
     .await?;
 
-    let first_header_rpc_event_receiver = rpc_events.subscribe();
-    let client_rpc_event_receiver = rpc_events.subscribe();
+    let first_header_rpc_event_receiver = rpc_event_sender.subscribe();
+    let client_rpc_event_receiver = rpc_event_sender.subscribe();
 
     let rpc_subscriptions_handle = spawn_in_span(shutdown.with_cancel(shutdown.with_trigger(
         "Subscription loop failure triggered shutdown".to_string(),
@@ -206,11 +205,25 @@ async fn run(config: Config, db: DB, shutdown: Controller<String>) -> Result<()> {
         ("operating_mode".to_string(), "client".to_string()),
     ];
 
-    let mut state = FatState::new(metrics, String::default(), metric_attributes);
+    let metrics =
+        telemetry::otlp::initialize("avail".to_string(), &Origin::FatClient, config.otel.clone())
+            .wrap_err("Unable to initialize OpenTelemetry service")?;
+
+    let rpc_host = db
+        .get(RpcNodeKey)
+        .map(|node| node.host)
+        .ok_or_else(|| eyre!("No connected host found"))?;
+
+    let mut state = FatState::new(metrics, String::default(), rpc_host, metric_attributes);
 
     spawn_in_span(shutdown.with_cancel(async move {
         state
-            .handle_events(p2p_event_receiver, maintenance_receiver, fat_receiver)
+            .handle_events(
+                p2p_event_receiver,
+                maintenance_receiver,
+                fat_receiver,
+                rpc_event_receiver,
+            )
             .await;
     }));
 
@@ -265,6 +278,7 @@ impl BlockStat {
 struct FatState {
     metrics: Metrics,
     multiaddress: String,
+    rpc_host: String,
     metric_attributes: Vec<(String, String)>,
     active_blocks: HashMap<u32, BlockStat>,
 }
 
@@ -273,11 +287,13 @@ impl FatState {
     fn new(
         metrics: Metrics,
         multiaddress: String,
+        rpc_host: String,
         metric_attributes: Vec<(String, String)>,
     ) -> Self {
         FatState {
             metrics,
             multiaddress,
+            rpc_host,
             metric_attributes,
             active_blocks: Default::default(),
         }
     }
@@ -287,8 +303,15 @@ impl FatState {
         self.multiaddress = value;
     }
 
+    fn update_rpc_host(&mut self, value: String) {
+        self.rpc_host = value;
+    }
+
     fn attributes(&self) -> Vec<(String, String)> {
-        let mut attrs = vec![("multiaddress".to_string(), self.multiaddress.clone())];
+        let mut attrs = vec![
("multiaddress".to_string(), self.multiaddress.clone()), + ("rpc_host".to_string(), self.rpc_host.to_string()), + ]; attrs.extend(self.metric_attributes.clone()); attrs @@ -374,6 +397,7 @@ impl FatState { mut p2p_receiver: UnboundedReceiver, mut maintenance_receiver: UnboundedReceiver, mut fat_receiver: UnboundedReceiver, + mut rpc_receiver: broadcast::Receiver, ) { self.metrics.count(MetricCounter::Starts, self.attributes()); loop { @@ -464,6 +488,12 @@ impl FatState { } } } + + Ok(rpc_event) = rpc_receiver.recv() => { + if let RpcEvent::ConnectedHost(host) = rpc_event { + self.update_rpc_host(host); + } + } // break the loop if all channels are closed else => break, }