Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Give names to channels (#5626)
Browse files Browse the repository at this point in the history
* Give names to channels

* Fix

* A couple more changes

* More minor tweaks

* Fix test
  • Loading branch information
tomaka authored Apr 14, 2020
1 parent 482f488 commit 0132d52
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 24 deletions.
2 changes: 1 addition & 1 deletion bin/node/cli/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ macro_rules! new_full {
service.spawn_essential_task("babe-proposer", babe);

let network = service.network();
let dht_event_stream = network.event_stream().filter_map(|e| async move { match e {
let dht_event_stream = network.event_stream("authority-discovery").filter_map(|e| async move { match e {
Event::Dht(e) => Some(e),
_ => None,
}}).boxed();
Expand Down
2 changes: 1 addition & 1 deletion client/network-gossip/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ pub trait Network<B: BlockT> {

impl<B: BlockT, H: ExHashT> Network<B> for Arc<NetworkService<B, H>> {
fn event_stream(&self) -> Pin<Box<dyn Stream<Item = Event> + Send>> {
Box::pin(NetworkService::event_stream(self))
Box::pin(NetworkService::event_stream(self, "network-gossip"))
}

fn report_peer(&self, peer_id: PeerId, reputation: ReputationChange) {
Expand Down
8 changes: 6 additions & 2 deletions client/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -575,9 +575,13 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkService<B, H> {
/// If this method is called multiple times, the events are duplicated.
///
/// The stream never ends (unless the `NetworkWorker` gets shut down).
pub fn event_stream(&self) -> impl Stream<Item = Event> {
///
/// The name passed is used to identify the channel in the Prometheus metrics. Note that the
/// parameter is a `&'static str`, and not a `String`, in order to avoid accidentally having
/// an unbounded set of Prometheus metrics, which would be quite bad in terms of memory
pub fn event_stream(&self, name: &'static str) -> impl Stream<Item = Event> {
// Note: when transitioning to stable futures, remove the `Error` entirely
let (tx, rx) = out_events::channel();
let (tx, rx) = out_events::channel(name);
let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::EventStream(tx));
rx
}
Expand Down
44 changes: 25 additions & 19 deletions client/network/src/service/out_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,13 @@ use std::{
};

/// Creates a new channel that can be associated to a [`OutChannels`].
pub fn channel() -> (Sender, Receiver) {
///
/// The name is used in Prometheus reports.
pub fn channel(name: &'static str) -> (Sender, Receiver) {
let (tx, rx) = mpsc::unbounded();
let metrics = Arc::new(Mutex::new(None));
let tx = Sender { inner: tx, metrics: metrics.clone() };
let rx = Receiver { inner: rx, metrics };
let tx = Sender { inner: tx, name, metrics: metrics.clone() };
let rx = Receiver { inner: rx, name, metrics };
(tx, rx)
}

Expand All @@ -60,6 +62,7 @@ pub fn channel() -> (Sender, Receiver) {
/// sync on drop. If someone adds a `#[derive(Clone)]` below, it is **wrong**.
pub struct Sender {
inner: mpsc::UnboundedSender<Event>,
name: &'static str,
/// Clone of [`Receiver::metrics`].
metrics: Arc<Mutex<Option<Arc<Option<Metrics>>>>>,
}
Expand All @@ -82,6 +85,7 @@ impl Drop for Sender {
/// Receiving side of a channel.
pub struct Receiver {
inner: mpsc::UnboundedReceiver<Event>,
name: &'static str,
/// Initially contains `None`, and will be set to a value once the corresponding [`Sender`]
/// is assigned to an instance of [`OutChannels`].
metrics: Arc<Mutex<Option<Arc<Option<Metrics>>>>>,
Expand All @@ -94,7 +98,7 @@ impl Stream for Receiver {
if let Some(ev) = ready!(Pin::new(&mut self.inner).poll_next(cx)) {
let metrics = self.metrics.lock().clone();
if let Some(Some(metrics)) = metrics.as_ref().map(|m| &**m) {
metrics.event_out(&ev);
metrics.event_out(&ev, self.name);
} else {
log::warn!("Inconsistency in out_events: event happened before sender associated");
}
Expand Down Expand Up @@ -161,7 +165,9 @@ impl OutChannels {
});

if let Some(metrics) = &*self.metrics {
metrics.event_in(&event, self.event_streams.len() as u64);
for ev in &self.event_streams {
metrics.event_in(&event, 1, ev.name);
}
}
}
}
Expand Down Expand Up @@ -190,15 +196,15 @@ impl Metrics {
"Number of broadcast network events that have been sent or received across all \
channels"
),
&["event_name", "action"]
&["event_name", "action", "name"]
)?, registry)?,
notifications_sizes: register(CounterVec::new(
Opts::new(
"sub_libp2p_out_events_notifications_sizes",
"Size of notification events that have been sent or received across all \
channels"
),
&["protocol", "action"]
&["protocol", "action", "name"]
)?, registry)?,
num_channels: register(Gauge::new(
"sub_libp2p_out_events_num_channels",
Expand All @@ -207,60 +213,60 @@ impl Metrics {
})
}

fn event_in(&self, event: &Event, num: u64) {
fn event_in(&self, event: &Event, num: u64, name: &str) {
match event {
Event::Dht(_) => {
self.events_total
.with_label_values(&["dht", "sent"])
.with_label_values(&["dht", "sent", name])
.inc_by(num);
}
Event::NotificationStreamOpened { engine_id, .. } => {
self.events_total
.with_label_values(&[&format!("notif-open-{:?}", engine_id), "sent"])
.with_label_values(&[&format!("notif-open-{:?}", engine_id), "sent", name])
.inc_by(num);
},
Event::NotificationStreamClosed { engine_id, .. } => {
self.events_total
.with_label_values(&[&format!("notif-closed-{:?}", engine_id), "sent"])
.with_label_values(&[&format!("notif-closed-{:?}", engine_id), "sent", name])
.inc_by(num);
},
Event::NotificationsReceived { messages, .. } => {
for (engine_id, message) in messages {
self.events_total
.with_label_values(&[&format!("notif-{:?}", engine_id), "sent"])
.with_label_values(&[&format!("notif-{:?}", engine_id), "sent", name])
.inc_by(num);
self.notifications_sizes
.with_label_values(&[&engine_id_to_string(engine_id), "sent"])
.with_label_values(&[&engine_id_to_string(engine_id), "sent", name])
.inc_by(num.saturating_mul(u64::try_from(message.len()).unwrap_or(u64::max_value())));
}
},
}
}

fn event_out(&self, event: &Event) {
fn event_out(&self, event: &Event, name: &str) {
match event {
Event::Dht(_) => {
self.events_total
.with_label_values(&["dht", "received"])
.with_label_values(&["dht", "received", name])
.inc();
}
Event::NotificationStreamOpened { engine_id, .. } => {
self.events_total
.with_label_values(&[&format!("notif-open-{:?}", engine_id), "received"])
.with_label_values(&[&format!("notif-open-{:?}", engine_id), "received", name])
.inc();
},
Event::NotificationStreamClosed { engine_id, .. } => {
self.events_total
.with_label_values(&[&format!("notif-closed-{:?}", engine_id), "received"])
.with_label_values(&[&format!("notif-closed-{:?}", engine_id), "received", name])
.inc();
},
Event::NotificationsReceived { messages, .. } => {
for (engine_id, message) in messages {
self.events_total
.with_label_values(&[&format!("notif-{:?}", engine_id), "received"])
.with_label_values(&[&format!("notif-{:?}", engine_id), "received", name])
.inc();
self.notifications_sizes
.with_label_values(&[&engine_id_to_string(engine_id), "received"])
.with_label_values(&[&engine_id_to_string(engine_id), "received", name])
.inc_by(u64::try_from(message.len()).unwrap_or(u64::max_value()));
}
},
Expand Down
2 changes: 1 addition & 1 deletion client/network/src/service/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ fn build_test_full_node(config: config::NetworkConfiguration)
.unwrap();

let service = worker.service().clone();
let event_stream = service.event_stream();
let event_stream = service.event_stream("test");

async_std::task::spawn(async move {
futures::pin_mut!(worker);
Expand Down

0 comments on commit 0132d52

Please sign in to comment.