From 40636326fc75318a88cb790faa51e5fb006fc66a Mon Sep 17 00:00:00 2001 From: Graham King Date: Thu, 14 Dec 2023 15:06:21 -0500 Subject: [PATCH 1/2] fix(subscriber): Don't save poll_ops if no-one is receiving them Do not record poll_ops if there are no current connected clients (watchers). Without this `Aggregator::poll_ops` would grow forever. Follow up to https://github.com/tokio-rs/console/pull/311 and fix for these two: - https://github.com/tokio-rs/console/issues/184 - https://github.com/tokio-rs/console/pull/500 --- console-subscriber/src/aggregator/mod.rs | 31 +++++++++++++----------- 1 file changed, 17 insertions(+), 14 deletions(-) diff --git a/console-subscriber/src/aggregator/mod.rs b/console-subscriber/src/aggregator/mod.rs index 88d89ca1f..93b93fe29 100644 --- a/console-subscriber/src/aggregator/mod.rs +++ b/console-subscriber/src/aggregator/mod.rs @@ -1,12 +1,3 @@ -use super::{Command, Event, Shared, Watch}; -use crate::{ - stats::{self, Unsent}, - ToProto, WatchRequest, -}; -use console_api as proto; -use proto::resources::resource; -use tokio::sync::{mpsc, Notify}; - use std::{ sync::{ atomic::{AtomicBool, Ordering::*}, @@ -14,8 +5,18 @@ use std::{ }, time::{Duration, Instant}, }; + +use console_api as proto; +use proto::resources::resource; +use tokio::sync::{mpsc, Notify}; use tracing_core::{span::Id, Metadata}; +use super::{Command, Event, Shared, Watch}; +use crate::{ + stats::{self, Unsent}, + ToProto, WatchRequest, +}; + mod id_data; mod shrink; use self::id_data::{IdData, Include}; @@ -269,6 +270,9 @@ impl Aggregator { .drop_closed(&mut self.resource_stats, now, self.retention, has_watchers); self.async_ops .drop_closed(&mut self.async_op_stats, now, self.retention, has_watchers); + if !has_watchers && !self.poll_ops.is_empty() { + self.poll_ops.clear(); + } } /// Add the task subscription to the watchers after sending the first update @@ -305,14 +309,10 @@ impl Aggregator { } fn resource_update(&mut self, include: Include) -> proto::resources::ResourceUpdate { - let new_poll_ops = match include { - Include::All => self.poll_ops.clone(), - Include::UpdatedOnly => std::mem::take(&mut self.poll_ops), - }; proto::resources::ResourceUpdate { new_resources: self.resources.as_proto_list(include, &self.base_time), stats_update: self.resource_stats.as_proto(include, &self.base_time), - new_poll_ops, + new_poll_ops: std::mem::take(&mut self.poll_ops), dropped_events: self.shared.dropped_resources.swap(0, AcqRel) as u64, } } @@ -472,6 +472,9 @@ impl Aggregator { task_id, is_ready, } => { + if self.watchers.is_empty() { + return; + } let poll_op = proto::resources::PollOp { metadata: Some(metadata.into()), resource_id: Some(resource_id.into()), From edbce56e6b22c85f0ab38c3553288103aa3d2beb Mon Sep 17 00:00:00 2001 From: Graham King Date: Sat, 16 Dec 2023 15:35:11 -0500 Subject: [PATCH 2/2] Include feedback --- console-subscriber/src/aggregator/mod.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/console-subscriber/src/aggregator/mod.rs b/console-subscriber/src/aggregator/mod.rs index 93b93fe29..e3a1eb62a 100644 --- a/console-subscriber/src/aggregator/mod.rs +++ b/console-subscriber/src/aggregator/mod.rs @@ -270,7 +270,7 @@ impl Aggregator { .drop_closed(&mut self.resource_stats, now, self.retention, has_watchers); self.async_ops .drop_closed(&mut self.async_op_stats, now, self.retention, has_watchers); - if !has_watchers && !self.poll_ops.is_empty() { + if !has_watchers { self.poll_ops.clear(); } } @@ -472,6 +472,7 @@ impl Aggregator { task_id, is_ready, } => { + // CLI doesn't show historical poll ops, so don't save them if no-one is watching if self.watchers.is_empty() { return; }