From ea3507974e6b5e4d64541dfe1f77e732ddf5c51b Mon Sep 17 00:00:00 2001 From: Muhamad Awad Date: Thu, 20 Feb 2025 16:16:18 +0100 Subject: [PATCH] PP <-> Bifrost lag observability This PR improves only on the PP <-> Bifrost observability. We already collect metrics for both PP and Bifrost but I think those extra metrics can be useful - Utilization of the PP requests queue. Once full the PP will start dropping requests as 'Busy' and I thought it's important to measure how utilized this queue is - Write to read latency of records. Measures the latency between the moment a record is created and the moment it's read by the PP Fixes #2756 --- .../network/partition_processor_rpc_client.rs | 2 + crates/types/src/net/partition_processor.rs | 1 + crates/worker/src/metric_definitions.rs | 45 +++++++-- .../src/partition/leadership/leader_state.rs | 93 ++++++++++++------- crates/worker/src/partition/leadership/mod.rs | 10 +- crates/worker/src/partition/mod.rs | 43 ++++++--- 6 files changed, 143 insertions(+), 51 deletions(-) diff --git a/crates/core/src/network/partition_processor_rpc_client.rs b/crates/core/src/network/partition_processor_rpc_client.rs index e3b8bb4910..6b2706f0b2 100644 --- a/crates/core/src/network/partition_processor_rpc_client.rs +++ b/crates/core/src/network/partition_processor_rpc_client.rs @@ -9,6 +9,7 @@ // by the Apache License, Version 2.0. use assert2::let_assert; +use restate_types::time::MillisSinceEpoch; use tracing::trace; use restate_types::identifiers::{ @@ -367,6 +368,7 @@ where request_id, partition_id, inner: inner_request, + created_at: MillisSinceEpoch::now(), }, ) .await? 
diff --git a/crates/types/src/net/partition_processor.rs b/crates/types/src/net/partition_processor.rs index 234d15a11c..5529ca8222 100644 --- a/crates/types/src/net/partition_processor.rs +++ b/crates/types/src/net/partition_processor.rs @@ -33,6 +33,7 @@ pub struct PartitionProcessorRpcRequest { pub request_id: PartitionProcessorRpcRequestId, pub partition_id: PartitionId, pub inner: PartitionProcessorRpcRequestInner, + pub created_at: MillisSinceEpoch, } #[derive(Debug, Clone, Serialize, Deserialize)] diff --git a/crates/worker/src/metric_definitions.rs b/crates/worker/src/metric_definitions.rs index be6a44832a..20b045f384 100644 --- a/crates/worker/src/metric_definitions.rs +++ b/crates/worker/src/metric_definitions.rs @@ -12,6 +12,8 @@ /// the metrics' sink. use metrics::{describe_counter, describe_gauge, describe_histogram, Unit}; +pub const PARTITION_LABEL: &str = "partition"; + pub const PARTITION_APPLY_COMMAND: &str = "restate.partition.apply_command.seconds"; pub const PARTITION_ACTUATOR_HANDLED: &str = "restate.partition.actuator_handled.total"; pub const PARTITION_STORAGE_TX_CREATED: &str = "restate.partition.storage_tx_created.total"; @@ -27,14 +29,21 @@ pub const PARTITION_LAST_PERSISTED_LOG_LSN: &str = "restate.partition.last_persi pub const PARTITION_IS_EFFECTIVE_LEADER: &str = "restate.partition.is_effective_leader"; pub const PARTITION_IS_ACTIVE: &str = "restate.partition.is_active"; -pub const PP_APPLY_COMMAND_DURATION: &str = "restate.partition.apply_command_duration.seconds"; -pub const PP_APPLY_COMMAND_BATCH_SIZE: &str = "restate.partition.apply_command_batch_size"; +pub const PARTITION_APPLY_COMMAND_DURATION: &str = + "restate.partition.apply_command_duration.seconds"; +pub const PARTITION_APPLY_COMMAND_BATCH_SIZE: &str = "restate.partition.apply_command_batch_size"; pub const PARTITION_LEADER_HANDLE_ACTION_BATCH_DURATION: &str = "restate.partition.handle_action_batch_duration.seconds"; pub const PARTITION_HANDLE_INVOKER_EFFECT_COMMAND: &str 
= "restate.partition.handle_invoker_effect.seconds"; - -pub const PARTITION_LABEL: &str = "partition"; +pub const PARTITION_RPC_QUEUE_UTILIZATION_PERCENT: &str = + "restate.partition.rpc_queue.utilization.percent"; +pub const PARTITION_RECORD_WRITE_TO_READ_LATENCY_SECONDS: &str = + "restate.partition.record_write_to_read_latency.seconds"; +pub const PARTITION_REQUEST_TO_SUBMITTED_DURATION: &str = + "restate.partition.invocation_to_submitted.seconds"; +pub const PARTITION_REQUEST_TO_OUTPUT_DURATION: &str = + "restate.partition.invocation_to_output.seconds"; pub(crate) fn describe_metrics() { describe_histogram!( @@ -58,12 +67,12 @@ pub(crate) fn describe_metrics() { "Storage transactions committed by applying partition state machine commands" ); describe_histogram!( - PP_APPLY_COMMAND_DURATION, + PARTITION_APPLY_COMMAND_DURATION, Unit::Seconds, "Time spent processing a single bifrost message" ); describe_histogram!( - PP_APPLY_COMMAND_BATCH_SIZE, + PARTITION_APPLY_COMMAND_BATCH_SIZE, Unit::Count, "Size of the applied command batch" ); @@ -83,6 +92,24 @@ pub(crate) fn describe_metrics() { "Time spent handling an invoker effect command" ); + describe_histogram!( + PARTITION_RECORD_WRITE_TO_READ_LATENCY_SECONDS, + Unit::Seconds, + "Duration between the time a record was first created until it was read for processing by the partition processor" + ); + + describe_histogram!( + PARTITION_REQUEST_TO_SUBMITTED_DURATION, + Unit::Seconds, + "Duration between the time an ingress request was first created until it was submitted" + ); + + describe_histogram!( + PARTITION_REQUEST_TO_OUTPUT_DURATION, + Unit::Seconds, + "Duration between the time an ingress request was first created until it produced an output" + ); + describe_gauge!( NUM_ACTIVE_PARTITIONS, Unit::Count, @@ -124,4 +151,10 @@ pub(crate) fn describe_metrics() { Unit::Seconds, "Number of seconds since the last record was applied" ); + + describe_gauge!( + PARTITION_RPC_QUEUE_UTILIZATION_PERCENT, + Unit::Percent, + 
"Percent of PP requests queue utilization, 0 to 100%" + ) } diff --git a/crates/worker/src/partition/leadership/leader_state.rs b/crates/worker/src/partition/leadership/leader_state.rs index b2e8f216b5..aebfbdc28a 100644 --- a/crates/worker/src/partition/leadership/leader_state.rs +++ b/crates/worker/src/partition/leadership/leader_state.rs @@ -8,7 +8,10 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use crate::metric_definitions::{PARTITION_ACTUATOR_HANDLED, PARTITION_HANDLE_LEADER_ACTIONS}; +use crate::metric_definitions::{ + PARTITION_ACTUATOR_HANDLED, PARTITION_HANDLE_LEADER_ACTIONS, PARTITION_LABEL, + PARTITION_REQUEST_TO_OUTPUT_DURATION, PARTITION_REQUEST_TO_SUBMITTED_DURATION, +}; use crate::partition::invoker_storage_reader::InvokerStorageReader; use crate::partition::leadership::self_proposer::SelfProposer; use crate::partition::leadership::{ActionEffect, Error, InvokerStream, TimerService}; @@ -18,7 +21,7 @@ use crate::partition::{respond_to_rpc, shuffle}; use futures::future::OptionFuture; use futures::stream::FuturesUnordered; use futures::{stream, FutureExt, StreamExt}; -use metrics::{counter, Counter}; +use metrics::{counter, histogram, Counter, Histogram}; use restate_bifrost::CommitToken; use restate_core::network::Reciprocal; use restate_core::{TaskCenter, TaskHandle, TaskId}; @@ -46,6 +49,30 @@ use tracing::{debug, trace}; const BATCH_READY_UP_TO: usize = 10; +struct RpcReciprocal { + inner: Reciprocal>, + // only used for metrics + request_created_at: MillisSinceEpoch, +} + +impl RpcReciprocal { + fn new( + request_created_at: MillisSinceEpoch, + reciprocal: Reciprocal>, + ) -> Self { + Self { + request_created_at, + inner: reciprocal, + } + } + + fn into_inner( + self, + ) -> Reciprocal> { + self.inner + } +} + pub struct LeaderState { partition_id: PartitionId, pub leader_epoch: LeaderEpoch, @@ -61,16 +88,16 @@ pub struct LeaderState { pub timer_service: Pin>, self_proposer: 
SelfProposer, - pub awaiting_rpc_actions: HashMap< - PartitionProcessorRpcRequestId, - Reciprocal>, - >, + awaiting_rpc_actions: HashMap, awaiting_rpc_self_propose: FuturesUnordered, invoker_stream: InvokerStream, shuffle_stream: ReceiverStream, pub pending_cleanup_timers_to_schedule: VecDeque<(InvocationId, Duration)>, cleaner_task_id: TaskId, + + submitted_histogram: Histogram, + output_histogram: Histogram, } impl LeaderState { @@ -102,6 +129,9 @@ impl LeaderState { invoker_stream: invoker_rx, shuffle_stream: ReceiverStream::new(shuffle_rx), pending_cleanup_timers_to_schedule: Default::default(), + + submitted_histogram: histogram!(PARTITION_REQUEST_TO_SUBMITTED_DURATION, PARTITION_LABEL=>partition_id.to_string()), + output_histogram: histogram!(PARTITION_REQUEST_TO_OUTPUT_DURATION, PARTITION_LABEL=>partition_id.to_string()), } } @@ -209,11 +239,9 @@ impl LeaderState { %request_id, "Failing rpc because I lost leadership", ); - respond_to_rpc( - reciprocal.prepare(Err(PartitionProcessorRpcError::LostLeadership( - self.partition_id, - ))), - ); + respond_to_rpc(reciprocal.into_inner().prepare(Err( + PartitionProcessorRpcError::LostLeadership(self.partition_id), + ))); } for fut in self.awaiting_rpc_self_propose.iter_mut() { fut.fail_with_lost_leadership(self.partition_id); @@ -277,19 +305,17 @@ impl LeaderState { reciprocal: Reciprocal>, partition_key: PartitionKey, cmd: Command, + request_created_at: MillisSinceEpoch, ) { match self.awaiting_rpc_actions.entry(request_id) { - Entry::Occupied(o) => { + Entry::Occupied(mut o) => { // In this case, someone already proposed this command, // let's just replace the reciprocal and fail the old one to avoid keeping it dangling - let old_reciprocal = o.remove(); + let old_reciprocal = o.insert(RpcReciprocal::new(request_created_at, reciprocal)); trace!(%request_id, "Replacing rpc with newer request"); - respond_to_rpc( - old_reciprocal.prepare(Err(PartitionProcessorRpcError::Internal( - "retried".to_string(), - ))), - ); - 
self.awaiting_rpc_actions.insert(request_id, reciprocal); + respond_to_rpc(old_reciprocal.into_inner().prepare(Err( + PartitionProcessorRpcError::Internal("retried".to_string()), + ))); } Entry::Vacant(v) => { // In this case, no one proposed this command yet, let's try to propose it @@ -299,7 +325,7 @@ impl LeaderState { .prepare(Err(PartitionProcessorRpcError::Internal(e.to_string()))), ); } else { - v.insert(reciprocal); + v.insert(RpcReciprocal::new(request_created_at, reciprocal)); } } } @@ -407,16 +433,16 @@ impl LeaderState { .. } => { if let Some(response_tx) = self.awaiting_rpc_actions.remove(&request_id) { - respond_to_rpc( - response_tx.prepare(Ok(PartitionProcessorRpcResponse::Output( - InvocationOutput { - request_id, - invocation_id, - completion_expiry_time, - response, - }, - ))), - ); + self.output_histogram + .record(response_tx.request_created_at.elapsed()); + respond_to_rpc(response_tx.into_inner().prepare(Ok( + PartitionProcessorRpcResponse::Output(InvocationOutput { + request_id, + invocation_id, + completion_expiry_time, + response, + }), + ))); } else { debug!(%request_id, "Ignoring sending ingress response because there is no awaiting rpc"); } @@ -428,7 +454,10 @@ impl LeaderState { .. 
} => { if let Some(response_tx) = self.awaiting_rpc_actions.remove(&request_id) { - respond_to_rpc(response_tx.prepare(Ok( + self.submitted_histogram + .record(response_tx.request_created_at.elapsed()); + + respond_to_rpc(response_tx.into_inner().prepare(Ok( PartitionProcessorRpcResponse::Submitted(SubmittedInvocationNotification { request_id, execution_time, @@ -507,7 +536,7 @@ impl Future for SelfAppendFuture { let append_result = ready!(self.commit_token.poll_unpin(cx)); if append_result.is_err() { - self.get_mut().fail_with_internal(); + self.fail_with_internal(); return Poll::Ready(()); } self.succeed_with_appended(); diff --git a/crates/worker/src/partition/leadership/mod.rs b/crates/worker/src/partition/leadership/mod.rs index 6d8dc48423..a01b1c8f4f 100644 --- a/crates/worker/src/partition/leadership/mod.rs +++ b/crates/worker/src/partition/leadership/mod.rs @@ -18,6 +18,7 @@ use std::ops::RangeInclusive; use std::time::Duration; use futures::{stream, StreamExt, TryStreamExt}; +use restate_types::time::MillisSinceEpoch; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; use tracing::{debug, instrument, warn}; @@ -522,6 +523,7 @@ where reciprocal: Reciprocal>, partition_key: PartitionKey, cmd: Command, + request_created_at: MillisSinceEpoch, ) { match &mut self.state { State::Follower | State::Candidate { .. 
} => { @@ -534,7 +536,13 @@ where } State::Leader(leader_state) => { leader_state - .handle_rpc_proposal_command(request_id, reciprocal, partition_key, cmd) + .handle_rpc_proposal_command( + request_id, + reciprocal, + partition_key, + cmd, + request_created_at, + ) .await; } } diff --git a/crates/worker/src/partition/mod.rs b/crates/worker/src/partition/mod.rs index 006ef722a1..e422d71082 100644 --- a/crates/worker/src/partition/mod.rs +++ b/crates/worker/src/partition/mod.rs @@ -17,7 +17,7 @@ use anyhow::Context; use assert2::let_assert; use enumset::EnumSet; use futures::{FutureExt, Stream, StreamExt, TryStreamExt as _}; -use metrics::histogram; +use metrics::{gauge, histogram}; use tokio::sync::{mpsc, watch}; use tokio::time::MissedTickBehavior; use tracing::{debug, error, info, instrument, trace, warn, Span}; @@ -65,8 +65,9 @@ use restate_wal_protocol::control::AnnounceLeader; use restate_wal_protocol::{Command, Destination, Envelope, Header, Source}; use crate::metric_definitions::{ - PARTITION_LABEL, PARTITION_LEADER_HANDLE_ACTION_BATCH_DURATION, PP_APPLY_COMMAND_BATCH_SIZE, - PP_APPLY_COMMAND_DURATION, + PARTITION_APPLY_COMMAND_BATCH_SIZE, PARTITION_APPLY_COMMAND_DURATION, PARTITION_LABEL, + PARTITION_LEADER_HANDLE_ACTION_BATCH_DURATION, PARTITION_RECORD_WRITE_TO_READ_LATENCY_SECONDS, + PARTITION_RPC_QUEUE_UTILIZATION_PERCENT, }; use crate::partition::invoker_storage_reader::InvokerStorageReader; use crate::partition::leadership::{LeadershipState, PartitionProcessorMetadata}; @@ -367,6 +368,18 @@ where ); } + // Telemetry setup + let partition_id_str: &'static str = Box::leak(Box::new(self.partition_id.to_string())); + let apply_command_latency = + histogram!(PARTITION_APPLY_COMMAND_DURATION, PARTITION_LABEL => partition_id_str); + let record_actions_latency = histogram!(PARTITION_LEADER_HANDLE_ACTION_BATCH_DURATION); + let record_write_to_read_latencty = histogram!(PARTITION_RECORD_WRITE_TO_READ_LATENCY_SECONDS, PARTITION_LABEL => partition_id_str); + let 
command_batch_size = + histogram!(PARTITION_APPLY_COMMAND_BATCH_SIZE, PARTITION_LABEL => partition_id_str); + + let rpc_queue_utilization = + gauge!(PARTITION_RPC_QUEUE_UTILIZATION_PERCENT, PARTITION_LABEL => partition_id_str); + // Start reading after the last applied lsn let key_query = KeyFilter::Within(self.partition_key_range.clone()); let mut record_stream = self @@ -382,6 +395,9 @@ where trace!(?entry, "Read entry"); let lsn = entry.sequence_number(); if entry.is_data_record() { + entry.as_record().inspect(|record| { + record_write_to_read_latencty.record(record.created_at().elapsed()); + }); entry .try_decode_arc::() .map(|envelope| Ok((lsn, envelope?))) @@ -411,20 +427,17 @@ where tokio::time::interval(Duration::from_millis(500 + rand::random::() % 524)); status_update_timer.set_missed_tick_behavior(MissedTickBehavior::Skip); - let partition_id_str: &'static str = Box::leak(Box::new(self.partition_id.to_string())); - // Telemetry setup - let apply_command_latency = - histogram!(PP_APPLY_COMMAND_DURATION, PARTITION_LABEL => partition_id_str); - let record_actions_latency = histogram!(PARTITION_LEADER_HANDLE_ACTION_BATCH_DURATION); - let command_batch_size = - histogram!(PP_APPLY_COMMAND_BATCH_SIZE, PARTITION_LABEL => partition_id_str); - let mut action_collector = ActionCollector::default(); let mut command_buffer = Vec::with_capacity(self.max_command_batch_size); info!("Partition {} started", self.partition_id); loop { + let utilization = + (self.rpc_rx.len() as f64 * 100.0) / self.rpc_rx.max_capacity() as f64; + + rpc_queue_utilization.set(utilization); + tokio::select! { Some(command) = self.control_rx.recv() => { if let Err(err) = self.on_command(command).await { @@ -553,7 +566,10 @@ where let ( response_tx, PartitionProcessorRpcRequest { - request_id, inner, .. + request_id, + inner, + created_at, + .. 
}, ) = rpc.split(); match inner { @@ -591,6 +607,7 @@ where response_tx, service_invocation.partition_key(), Command::Invoke(service_invocation), + created_at, ) .await } @@ -611,6 +628,7 @@ where response_tx, service_invocation.partition_key(), Command::Invoke(service_invocation), + created_at, ) .await } @@ -641,6 +659,7 @@ where block_on_inflight: true, response_sink: ServiceInvocationResponseSink::Ingress { request_id }, }), + created_at, ) .await }