From 9e4e750a0dc1100134c607ce00141a1e5e05a213 Mon Sep 17 00:00:00 2001 From: Marko Petrlic <petrlic.marko@gmail.com> Date: Mon, 13 May 2024 13:30:06 +0200 Subject: [PATCH 1/4] Extended custom telemetry --- .../bin/node-template/node/src/service.rs | 10 +- .../client/telemetry/src/custom_telemetry.rs | 96 +++++++++++++------ 2 files changed, 76 insertions(+), 30 deletions(-) diff --git a/substrate/bin/node-template/node/src/service.rs b/substrate/bin/node-template/node/src/service.rs index ce07e61cc4dee..c39f363501e52 100644 --- a/substrate/bin/node-template/node/src/service.rs +++ b/substrate/bin/node-template/node/src/service.rs @@ -63,10 +63,16 @@ pub fn new_partial(config: &Configuration) -> Result<Service, ServiceError> { }); let telemetry_handle = telemetry.as_ref().map(|t| t.handle()); - let custom_telemetry_worker = CustomTelemetryWorker { handle: telemetry_handle, sampling_interval_ms: 6_000u128 }; + let custom_telemetry_worker = CustomTelemetryWorker + { + handle: telemetry_handle, + sampling_interval_ms: 6_000u128, + max_interval_buffer_size: 20, + max_block_request_buffer_size: 15, + }; task_manager .spawn_handle() - .spawn("custom_telemetry", None, custom_telemetry_worker.run()); + .spawn("custom_telemetry", None, custom_telemetry_worker.run(None, None)); let select_chain = sc_consensus::LongestChain::new(backend.clone()); diff --git a/substrate/client/telemetry/src/custom_telemetry.rs b/substrate/client/telemetry/src/custom_telemetry.rs index 615f6c12881bb..4085769a21a15 100644 --- a/substrate/client/telemetry/src/custom_telemetry.rs +++ b/substrate/client/telemetry/src/custom_telemetry.rs @@ -6,13 +6,9 @@ use std::{ use serde::Serialize; use wasm_timer::{SystemTime, UNIX_EPOCH}; +use crate::custom_telemetry::external::BlockIntervalFromNode; use crate::{telemetry, TelemetryHandle, SUBSTRATE_INFO}; -/// Maximum amount of intervals that we will keep in our storage. -pub const MAXIMUM_INTERVALS_LENGTH: usize = 50; -/// Maximum amount of block requests info that we will keep in our storage. -pub const MAXIMUM_BLOCK_REQUESTS_LENGTH: usize = 15; - /// #[repr(u8)] #[derive(Debug, Clone, Copy, Serialize, PartialEq, Eq)] @@ -63,12 +59,22 @@ pub struct BlockMetrics { partial_intervals: Vec<IntervalWithBlockInformation>, /// block_requests: Vec<BlockRequestsDetail>, + /// + max_interval_buffer_size: usize, + /// + max_block_request_buffer_size: usize, } impl BlockMetrics { /// pub const fn new() -> Self { - Self { intervals: Vec::new(), partial_intervals: Vec::new(), block_requests: Vec::new() } + Self { + intervals: Vec::new(), + partial_intervals: Vec::new(), + block_requests: Vec::new(), + max_interval_buffer_size: 0, + max_block_request_buffer_size: 0, + } } } @@ -81,7 +87,7 @@ impl BlockMetrics { return; }; - if lock.intervals.len() >= MAXIMUM_INTERVALS_LENGTH { + if lock.intervals.len() >= lock.max_interval_buffer_size { lock.intervals.remove(0); } @@ -100,12 +106,12 @@ impl BlockMetrics { let Ok(mut lock) = BLOCK_METRICS.lock() else { return; }; - + if is_start { - if lock.partial_intervals.len() >= MAXIMUM_INTERVALS_LENGTH { + if lock.partial_intervals.len() >= lock.max_interval_buffer_size { lock.partial_intervals.remove(0); } - + let value = IntervalWithBlockInformation { kind, block_number, @@ -113,19 +119,19 @@ impl BlockMetrics { start_timestamp: timestamp, end_timestamp: 0, }; - + lock.partial_intervals.push(value); return; } - + let existing_entry_pos = lock.partial_intervals.iter_mut().position(|v| { v.block_hash == block_hash && v.block_number == block_number && v.kind == kind }); - + let Some(pos) = existing_entry_pos else { return; }; - + lock.partial_intervals.remove(pos) }; @@ -140,7 +146,7 @@ impl BlockMetrics { return; }; - if lock.block_requests.len() >= MAXIMUM_BLOCK_REQUESTS_LENGTH { + if lock.block_requests.len() >= lock.max_block_request_buffer_size { lock.block_requests.remove(0); } @@ -169,7 +175,7 @@ impl BlockMetrics { } /// This will be send to the telemetry -mod external { +pub mod external { use super::*; #[derive(Debug, Serialize, Clone)] @@ -182,6 +188,7 @@ mod external { pub end_timestamp: u64, } + /// #[derive(Debug, Default, Serialize, Clone)] pub struct BlockIntervalFromNode { /// @@ -237,31 +244,64 @@ pub struct CustomTelemetryWorker { pub handle: Option<TelemetryHandle>, /// pub sampling_interval_ms: u128, + /// + pub max_interval_buffer_size: usize, + /// + pub max_block_request_buffer_size: usize, } impl CustomTelemetryWorker { /// - pub async fn run(self) { + pub async fn run( + self, + filter_intervals: Option<fn(Vec<BlockIntervalFromNode>) -> Vec<BlockIntervalFromNode>>, + filter_block_requests: Option<fn(Vec<BlockRequestsDetail>) -> Vec<BlockRequestsDetail>>, + ) { const SLEEP_DURATION: Duration = Duration::from_millis(250); + if let Ok(mut lock) = BLOCK_METRICS.lock() { + lock.max_interval_buffer_size = self.max_interval_buffer_size; + lock.max_block_request_buffer_size = self.max_block_request_buffer_size; + } + let mut start = std::time::Instant::now(); loop { if start.elapsed().as_millis() >= self.sampling_interval_ms { - let metrics = BlockMetrics::take_metrics().unwrap_or_default(); - let block_intervals = external::prepare_data(metrics.intervals); - - telemetry!( - self.handle; - SUBSTRATE_INFO; - "block.metrics"; - "block_intervals" => block_intervals, - "block_requests" => metrics.block_requests, - ); - + self.send_telemetry(filter_intervals, filter_block_requests); start = std::time::Instant::now(); } tokio::time::sleep(SLEEP_DURATION).await; } } + + pub async fn send_telemetry( + &self, + filter_intervals: Option<fn(Vec<BlockIntervalFromNode>) -> Vec<BlockIntervalFromNode>>, + filter_block_requests: Option<fn(Vec<BlockRequestsDetail>) -> Vec<BlockRequestsDetail>>, + ) { + let metrics = BlockMetrics::take_metrics().unwrap_or_default(); + + let block_intervals = external::prepare_data(metrics.intervals); + let block_intervals = match filter_intervals { + Some(f) => f(block_intervals), + _ => block_intervals, + }; + + let block_requests = metrics.block_requests; + let block_requests = match filter_block_requests { + Some(f) => f(block_requests), + _ => block_requests, + }; + + if block_intervals.len() > 0 || block_requests.len() > 0 { + telemetry!( + self.handle; + SUBSTRATE_INFO; + "block.metrics"; + "block_intervals" => block_intervals, + "block_requests" => block_requests, + ); + } + } } From 9fe897560cda4cd38020ac8dcace6182542c4338 Mon Sep 17 00:00:00 2001 From: Marko Petrlic <petrlic.marko@gmail.com> Date: Mon, 13 May 2024 13:56:57 +0200 Subject: [PATCH 2/4] Fixed send_telemetry not getting awaited --- substrate/client/telemetry/src/custom_telemetry.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/substrate/client/telemetry/src/custom_telemetry.rs b/substrate/client/telemetry/src/custom_telemetry.rs index 4085769a21a15..e5fcce197bc3b 100644 --- a/substrate/client/telemetry/src/custom_telemetry.rs +++ b/substrate/client/telemetry/src/custom_telemetry.rs @@ -267,7 +267,7 @@ impl CustomTelemetryWorker { let mut start = std::time::Instant::now(); loop { if start.elapsed().as_millis() >= self.sampling_interval_ms { - self.send_telemetry(filter_intervals, filter_block_requests); + self.send_telemetry(filter_intervals, filter_block_requests).await; start = std::time::Instant::now(); } From c8bcc19844638b4b3112a75db1c3575eed51426f Mon Sep 17 00:00:00 2001 From: Marko Petrlic <petrlic.marko@gmail.com> Date: Mon, 13 May 2024 14:08:02 +0200 Subject: [PATCH 3/4] removing non existing elements --- substrate/client/telemetry/src/custom_telemetry.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/substrate/client/telemetry/src/custom_telemetry.rs b/substrate/client/telemetry/src/custom_telemetry.rs index e5fcce197bc3b..d22a1679d2b4f 100644 --- a/substrate/client/telemetry/src/custom_telemetry.rs +++ b/substrate/client/telemetry/src/custom_telemetry.rs @@ -87,7 +87,7 @@ impl BlockMetrics { return; }; - if lock.intervals.len() >= lock.max_interval_buffer_size { + if lock.intervals.len() >= lock.max_interval_buffer_size && lock.intervals.len() > 0 { lock.intervals.remove(0); } @@ -108,7 +108,9 @@ impl BlockMetrics { }; if is_start { - if lock.partial_intervals.len() >= lock.max_interval_buffer_size { + if lock.partial_intervals.len() >= lock.max_interval_buffer_size + && lock.partial_intervals.len() > 0 + { lock.partial_intervals.remove(0); } @@ -160,6 +162,9 @@ impl BlockMetrics { }; let metrics = std::mem::take(&mut *lock); + lock.max_interval_buffer_size = metrics.max_interval_buffer_size; + lock.max_block_request_buffer_size = metrics.max_block_request_buffer_size; + Some(metrics) } From 1c4ef5fbdd2b5058ae00d00e04cc377ed797fe9e Mon Sep 17 00:00:00 2001 From: Marko Petrlic <petrlic.marko@gmail.com> Date: Tue, 14 May 2024 17:33:25 +0200 Subject: [PATCH 4/4] Minor changes. Added tests --- .../client/telemetry/src/custom_telemetry.rs | 265 ++++++++++++++++-- 1 file changed, 245 insertions(+), 20 deletions(-) diff --git a/substrate/client/telemetry/src/custom_telemetry.rs b/substrate/client/telemetry/src/custom_telemetry.rs index d22a1679d2b4f..d10263a5b2531 100644 --- a/substrate/client/telemetry/src/custom_telemetry.rs +++ b/substrate/client/telemetry/src/custom_telemetry.rs @@ -1,3 +1,9 @@ +// Execute the following command to run the tests: +// +// cd polkadot-sdk/substrate/client/telemetry/src +// cargo test -- --test-threads 1 +// + use std::{ sync::Mutex, time::{Duration, SystemTimeError}, @@ -87,11 +93,11 @@ impl BlockMetrics { return; }; - if lock.intervals.len() >= lock.max_interval_buffer_size && lock.intervals.len() > 0 { + lock.intervals.push(value); + + if lock.intervals.len() > lock.max_interval_buffer_size { lock.intervals.remove(0); } - - lock.intervals.push(value); } /// @@ -108,12 +114,6 @@ impl BlockMetrics { }; if is_start { - if lock.partial_intervals.len() >= lock.max_interval_buffer_size - && lock.partial_intervals.len() > 0 - { - lock.partial_intervals.remove(0); - } - let value = IntervalWithBlockInformation { kind, block_number, @@ -123,6 +123,11 @@ impl BlockMetrics { }; lock.partial_intervals.push(value); + + if lock.partial_intervals.len() > lock.max_interval_buffer_size { + lock.partial_intervals.remove(0); + } + return; } @@ -148,11 +153,11 @@ impl BlockMetrics { return; }; - if lock.block_requests.len() >= lock.max_block_request_buffer_size { + lock.block_requests.push(value); + + if lock.block_requests.len() > lock.max_block_request_buffer_size { lock.block_requests.remove(0); } - - lock.block_requests.push(value); } /// @@ -183,6 +188,7 @@ impl BlockMetrics { pub mod external { use super::*; + /// #[derive(Debug, Serialize, Clone)] pub struct IntervalFromNode { /// @@ -204,6 +210,7 @@ pub mod external { pub intervals: Vec<IntervalFromNode>, } + /// pub fn prepare_data( mut value: Vec<IntervalWithBlockInformation>, ) -> Vec<BlockIntervalFromNode> { @@ -280,11 +287,30 @@ impl CustomTelemetryWorker { } } + /// pub async fn send_telemetry( &self, filter_intervals: Option<fn(Vec<BlockIntervalFromNode>) -> Vec<BlockIntervalFromNode>>, filter_block_requests: Option<fn(Vec<BlockRequestsDetail>) -> Vec<BlockRequestsDetail>>, ) { + let (block_intervals, block_requests) = + Self::get_and_filter_data(filter_intervals, filter_block_requests); + + if block_intervals.len() > 0 || block_requests.len() > 0 { + telemetry!( + self.handle; + SUBSTRATE_INFO; + "block.metrics"; + "block_intervals" => block_intervals, + "block_requests" => block_requests, + ); + } + } + + pub(crate) fn get_and_filter_data( + filter_intervals: Option<fn(Vec<BlockIntervalFromNode>) -> Vec<BlockIntervalFromNode>>, + filter_block_requests: Option<fn(Vec<BlockRequestsDetail>) -> Vec<BlockRequestsDetail>>, + ) -> (Vec<BlockIntervalFromNode>, Vec<BlockRequestsDetail>) { let metrics = BlockMetrics::take_metrics().unwrap_or_default(); let block_intervals = external::prepare_data(metrics.intervals); @@ -299,14 +325,213 @@ impl CustomTelemetryWorker { _ => block_requests, }; - if block_intervals.len() > 0 || block_requests.len() > 0 { - telemetry!( - self.handle; - SUBSTRATE_INFO; - "block.metrics"; - "block_intervals" => block_intervals, - "block_requests" => block_requests, - ); + (block_intervals, block_requests) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn dummy_interval(block_number: Option<u64>) -> IntervalWithBlockInformation { + IntervalWithBlockInformation { + kind: IntervalKind::Import, + block_number: block_number.unwrap_or(0), + block_hash: "".to_string(), + start_timestamp: 0, + end_timestamp: 0, + } + } + + fn observe_block_request(time_frame: u64) { + let value = BlockRequestsDetail { current_queue_size: 0, requests_handled: 0, time_frame }; + BlockMetrics::observe_block_request(value); + } + + fn reset_global_variable(buffer_size: usize) { + { + let mut lock = BLOCK_METRICS.lock().unwrap(); + lock.max_interval_buffer_size = buffer_size; + lock.max_block_request_buffer_size = buffer_size; + } + + _ = BlockMetrics::take_metrics().unwrap(); + } + + #[test] + fn buffer_interval_0_buffer_size() { + reset_global_variable(0); + + BlockMetrics::observe_interval(dummy_interval(None)); + let metrics = BlockMetrics::take_metrics().unwrap(); + assert_eq!(metrics.intervals.len(), 0); + } + + #[test] + fn buffer_interval_1_buffer_size() { + reset_global_variable(1); + + BlockMetrics::observe_interval(dummy_interval(Some(0))); + let metrics = BlockMetrics::take_metrics().unwrap(); + assert_eq!(metrics.intervals.len(), 1); + assert_eq!(metrics.intervals[0].block_number, 0); + + BlockMetrics::observe_interval(dummy_interval(Some(1))); + let metrics = BlockMetrics::take_metrics().unwrap(); + assert_eq!(metrics.intervals.len(), 1); + assert_eq!(metrics.intervals[0].block_number, 1); + } + + #[test] + fn mem_take_works() { + const BUFFER_SIZE: usize = 10; + reset_global_variable(BUFFER_SIZE); + + BlockMetrics::observe_interval(dummy_interval(Some(0))); + BlockMetrics::observe_interval_partial(IntervalKind::Sync, 0, "".to_string(), 0, true); + BlockMetrics::observe_interval_partial(IntervalKind::Sync, 10, "".to_string(), 0, true); + BlockMetrics::observe_interval_partial(IntervalKind::Sync, 10, "".to_string(), 0, false); + + { + let lock = BLOCK_METRICS.lock().unwrap(); + assert_eq!(lock.max_interval_buffer_size, BUFFER_SIZE); + assert_eq!(lock.intervals.len(), 2); + assert_eq!(lock.partial_intervals.len(), 1); + assert_eq!(lock.block_requests.len(), 0); + } + + let old_metrics = BlockMetrics::take_metrics().unwrap(); + assert_eq!(old_metrics.max_interval_buffer_size, BUFFER_SIZE); + assert_eq!(old_metrics.intervals.len(), 2); + assert_eq!(old_metrics.partial_intervals.len(), 1); + assert_eq!(old_metrics.block_requests.len(), 0); + + let lock = BLOCK_METRICS.lock().unwrap(); + assert_eq!(lock.max_interval_buffer_size, BUFFER_SIZE); + assert_eq!(lock.intervals.len(), 0); + assert_eq!(lock.partial_intervals.len(), 0); + assert_eq!(lock.block_requests.len(), 0); + } + + #[test] + fn buffer_partial_interval_0_buffer_size() { + reset_global_variable(0); + + BlockMetrics::observe_interval_partial(IntervalKind::Sync, 0, "".to_string(), 0, true); + let metrics = BlockMetrics::take_metrics().unwrap(); + assert_eq!(metrics.partial_intervals.len(), 0); + } + + #[test] + fn buffer_partial_interval_1_buffer_size() { + reset_global_variable(1); + + BlockMetrics::observe_interval_partial(IntervalKind::Sync, 0, "".to_string(), 0, true); + let metrics = BlockMetrics::take_metrics().unwrap(); + assert_eq!(metrics.partial_intervals.len(), 1); + assert_eq!(metrics.partial_intervals[0].block_number, 0); + + BlockMetrics::observe_interval_partial(IntervalKind::Sync, 1, "".to_string(), 0, true); + let metrics = BlockMetrics::take_metrics().unwrap(); + assert_eq!(metrics.partial_intervals.len(), 1); + assert_eq!(metrics.partial_intervals[0].block_number, 1); + } + + #[test] + fn buffer_partial_interval_works() { + reset_global_variable(1); + + BlockMetrics::observe_interval_partial(IntervalKind::Sync, 25, "".to_string(), 0, true); + { + let lock = BLOCK_METRICS.lock().unwrap(); + assert_eq!(lock.partial_intervals.len(), 1); + assert_eq!(lock.partial_intervals[0].block_number, 25); + } + + BlockMetrics::observe_interval_partial(IntervalKind::Sync, 25, "".to_string(), 0, false); + { + let lock = BLOCK_METRICS.lock().unwrap(); + assert_eq!(lock.partial_intervals.len(), 0); + assert_eq!(lock.intervals.len(), 1); + assert_eq!(lock.intervals[0].block_number, 25); } } + + #[test] + fn get_and_filter_data_works() { + reset_global_variable(10); + use IntervalKind::*; + + let setup_scenartion = || { + BlockMetrics::observe_interval_partial(Import, 1, "".to_string(), 0, true); + BlockMetrics::observe_interval_partial(Import, 1, "".to_string(), 0, false); + BlockMetrics::observe_interval_partial(Sync, 1, "".to_string(), 0, true); + BlockMetrics::observe_interval_partial(Sync, 1, "".to_string(), 0, false); + BlockMetrics::observe_interval_partial(Proposal, 2, "".to_string(), 0, true); + BlockMetrics::observe_interval_partial(Proposal, 2, "".to_string(), 0, false); + BlockMetrics::observe_interval_partial(Proposal, 3, "".to_string(), 0, true); + observe_block_request(1); + observe_block_request(2); + observe_block_request(3); + }; + + setup_scenartion(); + + let (block_intervals, block_requests) = + CustomTelemetryWorker::get_and_filter_data(None, None); + + assert_eq!(block_intervals.len(), 2); + assert_eq!(block_intervals[0].block_number, 1); + assert_eq!(block_intervals[0].block_number, 1); + assert_eq!(block_intervals[0].intervals.len(), 2); + assert_eq!(block_intervals[1].intervals.len(), 1); + + assert_eq!(block_requests.len(), 3); + assert_eq!(block_requests[0].time_frame, 1); + assert_eq!(block_requests[1].time_frame, 2); + assert_eq!(block_requests[2].time_frame, 3); + + // Second test. Filter Interval data 1 + setup_scenartion(); + + let (block_intervals, block_requests) = + CustomTelemetryWorker::get_and_filter_data(Some(no_data_interval), Some(no_data_request)); + assert_eq!(block_intervals.len(), 0); + assert_eq!(block_requests.len(), 0); + + // Third test. Filter Interval data 2 + setup_scenartion(); + + let (block_intervals, block_requests) = + CustomTelemetryWorker::get_and_filter_data(Some(one_interval), Some(one_request)); + assert_eq!(block_intervals.len(), 1); + assert_eq!(block_intervals[0].block_number, 1); + assert_eq!(block_requests.len(), 1); + assert_eq!(block_requests[0].time_frame, 1); + } + + fn no_data_interval(_: Vec<BlockIntervalFromNode>) -> Vec<BlockIntervalFromNode> { + vec![] + } + + fn no_data_request(_: Vec<BlockRequestsDetail>) -> Vec<BlockRequestsDetail> { + vec![] + } + + fn one_interval(mut value: Vec<BlockIntervalFromNode>) -> Vec<BlockIntervalFromNode> { + while value.len() > 1 { + value.pop(); + } + + value + } + + fn one_request(mut value: Vec<BlockRequestsDetail>) -> Vec<BlockRequestsDetail> { + while value.len() > 1 { + value.pop(); + } + + + value + } }