Skip to content

Commit

Permalink
PP <-> Bifrost lag observability
Browse files Browse the repository at this point in the history
This PR improves only on the PP <-> Bifrost observability.
We already collect meterics for both PP and Bifrost but I think
those extra metrics can be useful

- Utilization of the PP requests queue. Once full
the PP will start dropping requests as 'Busy' and I thought
it's important to measure how utilized this queue is

- Write to read latency of records. Measures the
latency between the moment a record is created until
it's read by the PP

Fixes restatedev#2756
  • Loading branch information
muhamadazmy committed Feb 20, 2025
1 parent 739b130 commit ea35079
Show file tree
Hide file tree
Showing 6 changed files with 143 additions and 51 deletions.
2 changes: 2 additions & 0 deletions crates/core/src/network/partition_processor_rpc_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
// by the Apache License, Version 2.0.

use assert2::let_assert;
use restate_types::time::MillisSinceEpoch;
use tracing::trace;

use restate_types::identifiers::{
Expand Down Expand Up @@ -367,6 +368,7 @@ where
request_id,
partition_id,
inner: inner_request,
created_at: MillisSinceEpoch::now(),
},
)
.await?
Expand Down
1 change: 1 addition & 0 deletions crates/types/src/net/partition_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ pub struct PartitionProcessorRpcRequest {
pub request_id: PartitionProcessorRpcRequestId,
pub partition_id: PartitionId,
pub inner: PartitionProcessorRpcRequestInner,
pub created_at: MillisSinceEpoch,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
Expand Down
45 changes: 39 additions & 6 deletions crates/worker/src/metric_definitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
/// the metrics' sink.
use metrics::{describe_counter, describe_gauge, describe_histogram, Unit};

pub const PARTITION_LABEL: &str = "partition";

pub const PARTITION_APPLY_COMMAND: &str = "restate.partition.apply_command.seconds";
pub const PARTITION_ACTUATOR_HANDLED: &str = "restate.partition.actuator_handled.total";
pub const PARTITION_STORAGE_TX_CREATED: &str = "restate.partition.storage_tx_created.total";
Expand All @@ -27,14 +29,21 @@ pub const PARTITION_LAST_PERSISTED_LOG_LSN: &str = "restate.partition.last_persi
pub const PARTITION_IS_EFFECTIVE_LEADER: &str = "restate.partition.is_effective_leader";
pub const PARTITION_IS_ACTIVE: &str = "restate.partition.is_active";

pub const PP_APPLY_COMMAND_DURATION: &str = "restate.partition.apply_command_duration.seconds";
pub const PP_APPLY_COMMAND_BATCH_SIZE: &str = "restate.partition.apply_command_batch_size";
pub const PARTITION_APPLY_COMMAND_DURATION: &str =
"restate.partition.apply_command_duration.seconds";
pub const PARTITION_APPLY_COMMAND_BATCH_SIZE: &str = "restate.partition.apply_command_batch_size";
pub const PARTITION_LEADER_HANDLE_ACTION_BATCH_DURATION: &str =
"restate.partition.handle_action_batch_duration.seconds";
pub const PARTITION_HANDLE_INVOKER_EFFECT_COMMAND: &str =
"restate.partition.handle_invoker_effect.seconds";

pub const PARTITION_LABEL: &str = "partition";
pub const PARTITION_RPC_QUEUE_UTILIZATION_PERCENT: &str =
"restate.partition.rpc_queue.utilization.percent";
pub const PARTITION_RECORD_WRITE_TO_READ_LATENCY_SECONDS: &str =
"restate.partition.record_write_to_read_latency.seconds";
pub const PARTITION_REQUEST_TO_SUBMITTED_DURATION: &str =
"restate.partition.invocation_to_submitted.seconds";
pub const PARTITION_REQUEST_TO_OUTPUT_DURATION: &str =
"restate.partition.invocation_to_output.seconds";

pub(crate) fn describe_metrics() {
describe_histogram!(
Expand All @@ -58,12 +67,12 @@ pub(crate) fn describe_metrics() {
"Storage transactions committed by applying partition state machine commands"
);
describe_histogram!(
PP_APPLY_COMMAND_DURATION,
PARTITION_APPLY_COMMAND_DURATION,
Unit::Seconds,
"Time spent processing a single bifrost message"
);
describe_histogram!(
PP_APPLY_COMMAND_BATCH_SIZE,
PARTITION_APPLY_COMMAND_BATCH_SIZE,
Unit::Count,
"Size of the applied command batch"
);
Expand All @@ -83,6 +92,24 @@ pub(crate) fn describe_metrics() {
"Time spent handling an invoker effect command"
);

describe_histogram!(
PARTITION_RECORD_WRITE_TO_READ_LATENCY_SECONDS,
Unit::Seconds,
"Duration between the time a record was first created until it was read for processing by the partition processor"
);

describe_histogram!(
PARTITION_REQUEST_TO_SUBMITTED_DURATION,
Unit::Seconds,
"Duration between the time an ingress request was first created until it was submitted"
);

describe_histogram!(
PARTITION_REQUEST_TO_OUTPUT_DURATION,
Unit::Seconds,
"Duration between the time an ingress request was first created until it produced an output"
);

describe_gauge!(
NUM_ACTIVE_PARTITIONS,
Unit::Count,
Expand Down Expand Up @@ -124,4 +151,10 @@ pub(crate) fn describe_metrics() {
Unit::Seconds,
"Number of seconds since the last record was applied"
);

describe_gauge!(
PARTITION_RPC_QUEUE_UTILIZATION_PERCENT,
Unit::Percent,
"Percent of PP requests queue utilization, 0 to 100%"
)
}
93 changes: 61 additions & 32 deletions crates/worker/src/partition/leadership/leader_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use crate::metric_definitions::{PARTITION_ACTUATOR_HANDLED, PARTITION_HANDLE_LEADER_ACTIONS};
use crate::metric_definitions::{
PARTITION_ACTUATOR_HANDLED, PARTITION_HANDLE_LEADER_ACTIONS, PARTITION_LABEL,
PARTITION_REQUEST_TO_OUTPUT_DURATION, PARTITION_REQUEST_TO_SUBMITTED_DURATION,
};
use crate::partition::invoker_storage_reader::InvokerStorageReader;
use crate::partition::leadership::self_proposer::SelfProposer;
use crate::partition::leadership::{ActionEffect, Error, InvokerStream, TimerService};
Expand All @@ -18,7 +21,7 @@ use crate::partition::{respond_to_rpc, shuffle};
use futures::future::OptionFuture;
use futures::stream::FuturesUnordered;
use futures::{stream, FutureExt, StreamExt};
use metrics::{counter, Counter};
use metrics::{counter, histogram, Counter, Histogram};
use restate_bifrost::CommitToken;
use restate_core::network::Reciprocal;
use restate_core::{TaskCenter, TaskHandle, TaskId};
Expand Down Expand Up @@ -46,6 +49,30 @@ use tracing::{debug, trace};

const BATCH_READY_UP_TO: usize = 10;

struct RpcReciprocal {
inner: Reciprocal<Result<PartitionProcessorRpcResponse, PartitionProcessorRpcError>>,
// only used for metrics
request_created_at: MillisSinceEpoch,
}

impl RpcReciprocal {
fn new(
request_created_at: MillisSinceEpoch,
reciprocal: Reciprocal<Result<PartitionProcessorRpcResponse, PartitionProcessorRpcError>>,
) -> Self {
Self {
request_created_at,
inner: reciprocal,
}
}

fn into_inner(
self,
) -> Reciprocal<Result<PartitionProcessorRpcResponse, PartitionProcessorRpcError>> {
self.inner
}
}

pub struct LeaderState {
partition_id: PartitionId,
pub leader_epoch: LeaderEpoch,
Expand All @@ -61,16 +88,16 @@ pub struct LeaderState {
pub timer_service: Pin<Box<TimerService>>,
self_proposer: SelfProposer,

pub awaiting_rpc_actions: HashMap<
PartitionProcessorRpcRequestId,
Reciprocal<Result<PartitionProcessorRpcResponse, PartitionProcessorRpcError>>,
>,
awaiting_rpc_actions: HashMap<PartitionProcessorRpcRequestId, RpcReciprocal>,
awaiting_rpc_self_propose: FuturesUnordered<SelfAppendFuture>,

invoker_stream: InvokerStream,
shuffle_stream: ReceiverStream<shuffle::OutboxTruncation>,
pub pending_cleanup_timers_to_schedule: VecDeque<(InvocationId, Duration)>,
cleaner_task_id: TaskId,

submitted_histogram: Histogram,
output_histogram: Histogram,
}

impl LeaderState {
Expand Down Expand Up @@ -102,6 +129,9 @@ impl LeaderState {
invoker_stream: invoker_rx,
shuffle_stream: ReceiverStream::new(shuffle_rx),
pending_cleanup_timers_to_schedule: Default::default(),

submitted_histogram: histogram!(PARTITION_REQUEST_TO_SUBMITTED_DURATION, PARTITION_LABEL=>partition_id.to_string()),
output_histogram: histogram!(PARTITION_REQUEST_TO_OUTPUT_DURATION, PARTITION_LABEL=>partition_id.to_string()),
}
}

Expand Down Expand Up @@ -209,11 +239,9 @@ impl LeaderState {
%request_id,
"Failing rpc because I lost leadership",
);
respond_to_rpc(
reciprocal.prepare(Err(PartitionProcessorRpcError::LostLeadership(
self.partition_id,
))),
);
respond_to_rpc(reciprocal.into_inner().prepare(Err(
PartitionProcessorRpcError::LostLeadership(self.partition_id),
)));
}
for fut in self.awaiting_rpc_self_propose.iter_mut() {
fut.fail_with_lost_leadership(self.partition_id);
Expand Down Expand Up @@ -277,19 +305,17 @@ impl LeaderState {
reciprocal: Reciprocal<Result<PartitionProcessorRpcResponse, PartitionProcessorRpcError>>,
partition_key: PartitionKey,
cmd: Command,
request_created_at: MillisSinceEpoch,
) {
match self.awaiting_rpc_actions.entry(request_id) {
Entry::Occupied(o) => {
Entry::Occupied(mut o) => {
// In this case, someone already proposed this command,
// let's just replace the reciprocal and fail the old one to avoid keeping it dangling
let old_reciprocal = o.remove();
let old_reciprocal = o.insert(RpcReciprocal::new(request_created_at, reciprocal));
trace!(%request_id, "Replacing rpc with newer request");
respond_to_rpc(
old_reciprocal.prepare(Err(PartitionProcessorRpcError::Internal(
"retried".to_string(),
))),
);
self.awaiting_rpc_actions.insert(request_id, reciprocal);
respond_to_rpc(old_reciprocal.into_inner().prepare(Err(
PartitionProcessorRpcError::Internal("retried".to_string()),
)));
}
Entry::Vacant(v) => {
// In this case, no one proposed this command yet, let's try to propose it
Expand All @@ -299,7 +325,7 @@ impl LeaderState {
.prepare(Err(PartitionProcessorRpcError::Internal(e.to_string()))),
);
} else {
v.insert(reciprocal);
v.insert(RpcReciprocal::new(request_created_at, reciprocal));
}
}
}
Expand Down Expand Up @@ -407,16 +433,16 @@ impl LeaderState {
..
} => {
if let Some(response_tx) = self.awaiting_rpc_actions.remove(&request_id) {
respond_to_rpc(
response_tx.prepare(Ok(PartitionProcessorRpcResponse::Output(
InvocationOutput {
request_id,
invocation_id,
completion_expiry_time,
response,
},
))),
);
self.output_histogram
.record(response_tx.request_created_at.elapsed());
respond_to_rpc(response_tx.into_inner().prepare(Ok(
PartitionProcessorRpcResponse::Output(InvocationOutput {
request_id,
invocation_id,
completion_expiry_time,
response,
}),
)));
} else {
debug!(%request_id, "Ignoring sending ingress response because there is no awaiting rpc");
}
Expand All @@ -428,7 +454,10 @@ impl LeaderState {
..
} => {
if let Some(response_tx) = self.awaiting_rpc_actions.remove(&request_id) {
respond_to_rpc(response_tx.prepare(Ok(
self.submitted_histogram
.record(response_tx.request_created_at.elapsed());

respond_to_rpc(response_tx.into_inner().prepare(Ok(
PartitionProcessorRpcResponse::Submitted(SubmittedInvocationNotification {
request_id,
execution_time,
Expand Down Expand Up @@ -507,7 +536,7 @@ impl Future for SelfAppendFuture {
let append_result = ready!(self.commit_token.poll_unpin(cx));

if append_result.is_err() {
self.get_mut().fail_with_internal();
self.fail_with_internal();
return Poll::Ready(());
}
self.succeed_with_appended();
Expand Down
10 changes: 9 additions & 1 deletion crates/worker/src/partition/leadership/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::ops::RangeInclusive;
use std::time::Duration;

use futures::{stream, StreamExt, TryStreamExt};
use restate_types::time::MillisSinceEpoch;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tracing::{debug, instrument, warn};
Expand Down Expand Up @@ -522,6 +523,7 @@ where
reciprocal: Reciprocal<Result<PartitionProcessorRpcResponse, PartitionProcessorRpcError>>,
partition_key: PartitionKey,
cmd: Command,
request_created_at: MillisSinceEpoch,
) {
match &mut self.state {
State::Follower | State::Candidate { .. } => {
Expand All @@ -534,7 +536,13 @@ where
}
State::Leader(leader_state) => {
leader_state
.handle_rpc_proposal_command(request_id, reciprocal, partition_key, cmd)
.handle_rpc_proposal_command(
request_id,
reciprocal,
partition_key,
cmd,
request_created_at,
)
.await;
}
}
Expand Down
Loading

0 comments on commit ea35079

Please sign in to comment.