Skip to content

Commit

Permalink
RPC client updates (#727)
Browse files Browse the repository at this point in the history
* unified events from rpc client

* add RPC endpoint to metric attributes 

---------

Co-authored-by: momosh <[email protected]>
  • Loading branch information
vbhattaccmu and momosh authored Oct 29, 2024
1 parent a9e1c61 commit db5eb49
Show file tree
Hide file tree
Showing 15 changed files with 250 additions and 119 deletions.
70 changes: 50 additions & 20 deletions client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,18 @@

use crate::cli::CliOpts;
use avail_light_core::{
api,
api::{self, v2::types::ApiData},
data::{
self, ClientIdKey, Database, IsFinalitySyncedKey, IsSyncedKey, LatestHeaderKey,
self, ClientIdKey, Database, IsFinalitySyncedKey, IsSyncedKey, LatestHeaderKey, RpcNodeKey,
SignerNonceKey, DB,
},
light_client::{self, OutputEvent as LcEvent},
maintenance::{self, OutputEvent as MaintenanceEvent},
network::{
self,
p2p::{self, extract_block_num, OutputEvent as P2pEvent, BOOTSTRAP_LIST_EMPTY_MESSAGE},
rpc, Network,
rpc::{self, OutputEvent as RpcEvent},
Network,
},
shutdown::Controller,
sync_client::SyncClient,
Expand All @@ -24,11 +25,7 @@ use avail_light_core::{
},
utils::{default_subscriber, install_panic_hooks, json_subscriber, spawn_in_span},
};
use avail_rust::{
avail_core::AppId,
kate_recovery::{com::AppData, couscous},
sp_core::blake2_128,
};
use avail_rust::{avail_core::AppId, kate_recovery::couscous, sp_core::blake2_128};
use clap::Parser;
use color_eyre::{
eyre::{eyre, WrapErr},
Expand Down Expand Up @@ -59,6 +56,9 @@ use tikv_jemallocator::Jemalloc;
#[global_allocator]
static GLOBAL: Jemalloc = Jemalloc;

mod cli;
mod config;

/// Light Client for Avail Blockchain
async fn run(
Expand Down Expand Up @@ -132,18 +132,25 @@ async fn run(
let public_params_len = hex::encode(raw_pp).len();
trace!("Public params ({public_params_len}): hash: {public_params_hash}");

let (rpc_client, rpc_events, rpc_subscriptions) =
rpc::init(db.clone(), &cfg.genesis_hash, &cfg.rpc, shutdown.clone()).await?;
let (rpc_event_sender, rpc_event_receiver) = broadcast::channel(1000);
let (rpc_client, rpc_subscriptions) = rpc::init(
db.clone(),
&cfg.genesis_hash,
&cfg.rpc,
shutdown.clone(),
rpc_event_sender.clone(),
)
.await?;

let account_id = identity_cfg.avail_key_pair.public_key().to_account_id();
let client = rpc_client.current_client().await;
let nonce = client.api.tx().account_nonce(&account_id).await?;
db.put(SignerNonceKey, nonce);

// Subscribing to RPC events before first event is published
let publish_rpc_event_receiver = rpc_events.subscribe();
let first_header_rpc_event_receiver = rpc_events.subscribe();
let client_rpc_event_receiver = rpc_events.subscribe();
let publish_rpc_event_receiver = rpc_event_sender.subscribe();
let first_header_rpc_event_receiver = rpc_event_sender.subscribe();
let client_rpc_event_receiver = rpc_event_sender.subscribe();

// spawn the RPC Network task for Event Loop to run in the background
// and shut it down, without delays
Expand Down Expand Up @@ -199,7 +206,7 @@ async fn run(
let (block_tx, block_rx) = broadcast::channel::<avail_light_core::types::BlockVerified>(1 << 7);

let data_rx = cfg.app_id.map(AppId).map(|app_id| {
let (data_tx, data_rx) = broadcast::channel::<(u32, AppData)>(1 << 7);
let (data_tx, data_rx) = broadcast::channel::<ApiData>(1 << 7);
spawn_in_span(shutdown.with_cancel(avail_light_core::app_client::run(
(&cfg).into(),
db.clone(),
Expand Down Expand Up @@ -315,25 +322,33 @@ async fn run(
telemetry::otlp::initialize(cfg.project_name.clone(), &cfg.origin, cfg.otel.clone())
.wrap_err("Unable to initialize OpenTelemetry service")?;

let rpc_host = db
.get(RpcNodeKey)
.map(|node| node.host)
.ok_or_else(|| eyre!("No connected host found"))?;

let mut state = ClientState::new(
metrics,
cfg.libp2p.kademlia.operation_mode.into(),
rpc_host,
Multiaddr::empty(),
metric_attributes,
);

spawn_in_span(shutdown.with_cancel(async move {
state
.handle_events(p2p_event_receiver, maintenance_receiver, lc_receiver)
.handle_events(
p2p_event_receiver,
maintenance_receiver,
lc_receiver,
rpc_event_receiver,
)
.await;
}));

Ok(())
}

mod cli;
mod config;

pub fn load_runtime_config(opts: &CliOpts) -> Result<RuntimeConfig> {
let mut cfg = if let Some(config_path) = &opts.config {
fs::metadata(config_path).map_err(|_| eyre!("Provided config file doesn't exist."))?;
Expand Down Expand Up @@ -441,6 +456,7 @@ struct ClientState {
metrics: Metrics,
kad_mode: Mode,
multiaddress: Multiaddr,
rpc_host: String,
metric_attributes: Vec<(String, String)>,
active_blocks: HashMap<u32, BlockStat>,
}
Expand All @@ -449,13 +465,15 @@ impl ClientState {
fn new(
metrics: Metrics,
kad_mode: Mode,
rpc_host: String,
multiaddress: Multiaddr,
metric_attributes: Vec<(String, String)>,
) -> Self {
ClientState {
metrics,
kad_mode,
multiaddress,
rpc_host,
metric_attributes,
active_blocks: Default::default(),
}
Expand All @@ -469,10 +487,15 @@ impl ClientState {
self.kad_mode = value;
}

fn update_rpc_host(&mut self, value: String) {
self.rpc_host = value;
}

fn attributes(&self) -> Vec<(String, String)> {
let mut attrs = vec![
("operating_mode".to_string(), self.kad_mode.to_string()),
("multiaddress".to_string(), self.multiaddress.to_string()),
("rpc_host".to_string(), self.rpc_host.to_string()),
];

attrs.extend(self.metric_attributes.clone());
Expand Down Expand Up @@ -559,6 +582,7 @@ impl ClientState {
mut p2p_receiver: UnboundedReceiver<P2pEvent>,
mut maintenance_receiver: UnboundedReceiver<MaintenanceEvent>,
mut lc_receiver: UnboundedReceiver<LcEvent>,
mut rpc_receiver: broadcast::Receiver<RpcEvent>,
) {
self.metrics.count(MetricCounter::Starts, self.attributes());
loop {
Expand Down Expand Up @@ -667,10 +691,16 @@ impl ClientState {
},
LcEvent::RecordRPCFetchDuration(duration) => {
self.metrics.record(MetricValue::RPCFetchDuration(duration));
}
},
LcEvent::RecordBlockConfidence(confidence) => {
self.metrics.record(MetricValue::BlockConfidence(confidence));
}
},
}
}

Ok(rpc_event) = rpc_receiver.recv() => {
if let RpcEvent::ConnectedHost(host) = rpc_event {
self.update_rpc_host(host);
}
}
// break the loop if all channels are closed
Expand Down
4 changes: 3 additions & 1 deletion compatibility-tests/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use avail_rust::kate_recovery::matrix::Position;
use clap::Parser;
use color_eyre::Result;
use std::time::Duration;
use tokio::sync::broadcast;

#[derive(Parser)]
struct CommandArgs {
Expand Down Expand Up @@ -40,7 +41,8 @@ async fn main() -> Result<()> {
};

let shutdown = Controller::new();
let (rpc_client, _, subscriptions) = rpc::init(db, "DEV", &rpc_cfg, shutdown).await?;
let (rpc_sender, _) = broadcast::channel(1000);
let (rpc_client, subscriptions) = rpc::init(db, "DEV", &rpc_cfg, shutdown, rpc_sender).await?;
tokio::spawn(subscriptions.run());

let mut correct: bool = true;
Expand Down
32 changes: 17 additions & 15 deletions core/src/api/v2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,12 +170,12 @@ fn ws_route(
.and_then(handlers::ws)
}

pub async fn publish<T: Clone + TryInto<PublishMessage>>(
pub async fn publish<T: Clone + TryInto<Option<PublishMessage>>>(
topic: Topic,
mut receiver: broadcast::Receiver<T>,
clients: WsClients,
) where
<T as TryInto<PublishMessage>>::Error: Display,
<T as TryInto<Option<PublishMessage>>>::Error: Display,
{
loop {
let message = match receiver.recv().await {
Expand All @@ -185,25 +185,27 @@ pub async fn publish<T: Clone + TryInto<PublishMessage>>(
return;
},
};

let message: PublishMessage = match message.try_into() {
Ok(message) => message,
let message: Option<PublishMessage> = match message.try_into() {
Ok(Some(message)) => Some(message),
Ok(None) => continue, // Silently skip
Err(error) => {
error!(?topic, "Cannot create message: {error}");
continue;
},
};

match clients.publish(&topic, message).await {
Ok(results) => {
let published = results.iter().filter(|&result| result.is_ok()).count();
let failed = results.iter().filter(|&result| result.is_err()).count();
info!(?topic, published, failed, "Message published to clients");
for error in results.into_iter().filter_map(Result::err) {
debug!(?topic, "Cannot publish message to client: {error}")
}
},
Err(error) => error!(?topic, "Cannot publish message: {error}"),
if let Some(message) = message {
match clients.publish(&topic, message).await {
Ok(results) => {
let published = results.iter().filter(|&result| result.is_ok()).count();
let failed = results.iter().filter(|&result| result.is_err()).count();
info!(?topic, published, failed, "Message published to clients");
for error in results.into_iter().filter_map(Result::err) {
debug!(?topic, "Cannot publish message to client: {error}")
}
},
Err(error) => error!(?topic, "Cannot publish message: {error}"),
}
}
}
}
Expand Down
32 changes: 20 additions & 12 deletions core/src/api/v2/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use crate::{
LatestSyncKey, RpcNodeKey, VerifiedDataKey, VerifiedHeaderKey, VerifiedSyncDataKey,
VerifiedSyncHeaderKey,
},
network::rpc::Event as RpcEvent,
network::rpc::OutputEvent as RpcEvent,
types::{self, BlockVerified},
utils::{decode_app_data, OptionalExtension},
};
Expand Down Expand Up @@ -517,15 +517,17 @@ impl TryFrom<ApiDigest> for Digest {
}
}

impl TryFrom<RpcEvent> for PublishMessage {
impl TryFrom<RpcEvent> for Option<PublishMessage> {
type Error = Report;

fn try_from(value: RpcEvent) -> Result<Self, Self::Error> {
match value {
RpcEvent::HeaderUpdate { header, .. } => header
.try_into()
.map(Box::new)
.map(PublishMessage::HeaderVerified),
.map(PublishMessage::HeaderVerified)
.map(Some),
RpcEvent::ConnectedHost(_) => Ok(None), // silently skip ConnectedHost event
}
}
}
Expand All @@ -537,14 +539,16 @@ pub struct ConfidenceMessage {
confidence: Option<f64>,
}

impl TryFrom<BlockVerified> for PublishMessage {
impl TryFrom<BlockVerified> for Option<PublishMessage> {
type Error = Report;

fn try_from(value: BlockVerified) -> Result<Self, Self::Error> {
Ok(PublishMessage::ConfidenceAchieved(ConfidenceMessage {
block_number: value.block_num,
confidence: value.confidence,
}))
Ok(Some(PublishMessage::ConfidenceAchieved(
ConfidenceMessage {
block_number: value.block_num,
confidence: value.confidence,
},
)))
}
}

Expand Down Expand Up @@ -620,18 +624,22 @@ pub fn filter_fields(data_transactions: &mut [DataTransaction], fields: &HashSet
}
}

impl TryFrom<(u32, AppData)> for PublishMessage {
#[derive(Clone)]
pub struct ApiData(pub u32, pub AppData);

impl TryFrom<ApiData> for Option<PublishMessage> {
type Error = Report;

fn try_from((block_number, app_data): (u32, AppData)) -> Result<Self, Self::Error> {
fn try_from(ApiData(block_number, app_data): ApiData) -> Result<Self, Self::Error> {
let data_transactions = app_data
.into_iter()
.map(TryFrom::try_from)
.collect::<Result<Vec<_>>>()?;
Ok(PublishMessage::DataVerified(DataMessage {

Ok(Some(PublishMessage::DataVerified(DataMessage {
block_number,
data_transactions,
}))
})))
}
}

Expand Down
5 changes: 3 additions & 2 deletions core/src/app_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ use tokio::sync::broadcast;
use tracing::{debug, error, info, instrument};

use crate::{
api::v2::types::ApiData,
data::{AppDataKey, Database, IsSyncedKey, RecordKey, VerifiedDataKey, VerifiedSyncDataKey},
network::{p2p::Client as P2pClient, rpc::Client as RpcClient},
proof,
Expand Down Expand Up @@ -428,7 +429,7 @@ pub async fn run(
mut block_receive: broadcast::Receiver<BlockVerified>,
pp: Arc<PublicParameters>,
sync_range: Range<u32>,
data_verified_sender: broadcast::Sender<(u32, AppData)>,
data_verified_sender: broadcast::Sender<ApiData>,
shutdown: Controller<String>,
) {
info!("Starting for app {app_id}...");
Expand Down Expand Up @@ -502,7 +503,7 @@ pub async fn run(
},
};
set_data_verified_state(db.clone(), &sync_range, block_number);
if let Err(error) = data_verified_sender.send((block_number, data)) {
if let Err(error) = data_verified_sender.send(ApiData(block_number, data)) {
error!("Cannot send data verified message: {error}");
let _ =
shutdown.trigger_shutdown(format!("Cannot send data verified message: {error:#}"));
Expand Down
9 changes: 3 additions & 6 deletions core/src/crawl_client.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
use crate::{
network::{
p2p::Client,
rpc::{self, Event},
},
network::{p2p::Client, rpc},
telemetry::{otlp::Record, MetricName, Value},
types::{self, BlockVerified, Delay, Origin},
};
Expand Down Expand Up @@ -67,7 +64,7 @@ impl Value for CrawlMetricValue {
}

pub async fn run(
mut message_rx: broadcast::Receiver<Event>,
mut message_rx: broadcast::Receiver<rpc::OutputEvent>,
network_client: Client,
delay: u64,
mode: CrawlMode,
Expand All @@ -79,7 +76,7 @@ pub async fn run(

let delay = Delay(Some(Duration::from_secs(delay)));

while let Ok(rpc::Event::HeaderUpdate {
while let Ok(rpc::OutputEvent::HeaderUpdate {
header,
received_at,
}) = message_rx.recv().await
Expand Down
Loading

0 comments on commit db5eb49

Please sign in to comment.