Skip to content

Commit

Permalink
PP <-> Bifrost lag observability
Browse files Browse the repository at this point in the history
This PR improves only on the PP <-> Bifrost observability.
We already collect metrics for both PP and Bifrost, but I think
these extra metrics can be useful:

- Utilization of the PP requests queue. Once full
the PP will start dropping requests as 'Busy' and I thought
it's important to measure how utilized this queue is

- Write-to-read latency of records. Measures the
latency from the moment a record is created until
it is read by the PP

Fixes restatedev#2756
  • Loading branch information
muhamadazmy committed Feb 21, 2025
1 parent 739b130 commit 8854a68
Show file tree
Hide file tree
Showing 6 changed files with 182 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ impl PeriodicTailChecker {
"Started a background periodic tail checker for this loglet",
);
// Optimization. Don't run the check if the tail/seal has been updated recently.
// Unfortunately this requires a litte bit more setup in the TailOffsetWatch so we don't do
// Unfortunately this requires a little bit more setup in the TailOffsetWatch so we don't do
// it.
loop {
let Some(loglet) = loglet.upgrade() else {
Expand Down
36 changes: 30 additions & 6 deletions crates/worker/src/metric_definitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
/// the metrics' sink.
use metrics::{describe_counter, describe_gauge, describe_histogram, Unit};

pub const PARTITION_LABEL: &str = "partition";

pub const PARTITION_APPLY_COMMAND: &str = "restate.partition.apply_command.seconds";
pub const PARTITION_ACTUATOR_HANDLED: &str = "restate.partition.actuator_handled.total";
pub const PARTITION_STORAGE_TX_CREATED: &str = "restate.partition.storage_tx_created.total";
Expand All @@ -23,18 +25,22 @@ pub const PARTITION_TIME_SINCE_LAST_STATUS_UPDATE: &str =
"restate.partition.time_since_last_status_update";
pub const PARTITION_TIME_SINCE_LAST_RECORD: &str = "restate.partition.time_since_last_record";
pub const PARTITION_LAST_APPLIED_LOG_LSN: &str = "restate.partition.last_applied_lsn";
pub const PARTITION_LAST_APPLIED_LSN_LAG: &str = "restate.partition.applied_lsn_lag";
pub const PARTITION_LAST_PERSISTED_LOG_LSN: &str = "restate.partition.last_persisted_lsn";
pub const PARTITION_IS_EFFECTIVE_LEADER: &str = "restate.partition.is_effective_leader";
pub const PARTITION_IS_ACTIVE: &str = "restate.partition.is_active";

pub const PP_APPLY_COMMAND_DURATION: &str = "restate.partition.apply_command_duration.seconds";
pub const PP_APPLY_COMMAND_BATCH_SIZE: &str = "restate.partition.apply_command_batch_size";
pub const PARTITION_APPLY_COMMAND_DURATION: &str =
"restate.partition.apply_command_duration.seconds";
pub const PARTITION_APPLY_COMMAND_BATCH_SIZE: &str = "restate.partition.apply_command_batch_size";
pub const PARTITION_LEADER_HANDLE_ACTION_BATCH_DURATION: &str =
"restate.partition.handle_action_batch_duration.seconds";
pub const PARTITION_HANDLE_INVOKER_EFFECT_COMMAND: &str =
"restate.partition.handle_invoker_effect.seconds";

pub const PARTITION_LABEL: &str = "partition";
pub const PARTITION_RPC_QUEUE_UTILIZATION_PERCENT: &str =
"restate.partition.rpc_queue.utilization.percent";
pub const PARTITION_RECORD_WRITE_TO_READ_LATENCY_SECONDS: &str =
"restate.partition.record_write_to_read_latency.seconds";

pub(crate) fn describe_metrics() {
describe_histogram!(
Expand All @@ -58,12 +64,12 @@ pub(crate) fn describe_metrics() {
"Storage transactions committed by applying partition state machine commands"
);
describe_histogram!(
PP_APPLY_COMMAND_DURATION,
PARTITION_APPLY_COMMAND_DURATION,
Unit::Seconds,
"Time spent processing a single bifrost message"
);
describe_histogram!(
PP_APPLY_COMMAND_BATCH_SIZE,
PARTITION_APPLY_COMMAND_BATCH_SIZE,
Unit::Count,
"Size of the applied command batch"
);
Expand All @@ -83,6 +89,12 @@ pub(crate) fn describe_metrics() {
"Time spent handling an invoker effect command"
);

describe_histogram!(
PARTITION_RECORD_WRITE_TO_READ_LATENCY_SECONDS,
Unit::Seconds,
"Duration between the time a record was first created until it was read for processing by the partition processor"
);

describe_gauge!(
NUM_ACTIVE_PARTITIONS,
Unit::Count,
Expand Down Expand Up @@ -113,6 +125,12 @@ pub(crate) fn describe_metrics() {
"Raw value of the last applied log LSN"
);

describe_gauge!(
PARTITION_LAST_APPLIED_LSN_LAG,
Unit::Count,
"Number of records between last applied lsn and the log tail"
);

describe_gauge!(
PARTITION_LAST_PERSISTED_LOG_LSN,
Unit::Count,
Expand All @@ -124,4 +142,10 @@ pub(crate) fn describe_metrics() {
Unit::Seconds,
"Number of seconds since the last record was applied"
);

describe_gauge!(
PARTITION_RPC_QUEUE_UTILIZATION_PERCENT,
Unit::Percent,
"Percent of PP requests queue utilization, 0 to 100%"
)
}
14 changes: 6 additions & 8 deletions crates/worker/src/partition/leadership/leader_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ use tracing::{debug, trace};

const BATCH_READY_UP_TO: usize = 10;

type RpcReciprocal = Reciprocal<Result<PartitionProcessorRpcResponse, PartitionProcessorRpcError>>;

pub struct LeaderState {
partition_id: PartitionId,
pub leader_epoch: LeaderEpoch,
Expand All @@ -61,10 +63,7 @@ pub struct LeaderState {
pub timer_service: Pin<Box<TimerService>>,
self_proposer: SelfProposer,

pub awaiting_rpc_actions: HashMap<
PartitionProcessorRpcRequestId,
Reciprocal<Result<PartitionProcessorRpcResponse, PartitionProcessorRpcError>>,
>,
awaiting_rpc_actions: HashMap<PartitionProcessorRpcRequestId, RpcReciprocal>,
awaiting_rpc_self_propose: FuturesUnordered<SelfAppendFuture>,

invoker_stream: InvokerStream,
Expand Down Expand Up @@ -279,17 +278,16 @@ impl LeaderState {
cmd: Command,
) {
match self.awaiting_rpc_actions.entry(request_id) {
Entry::Occupied(o) => {
Entry::Occupied(mut o) => {
// In this case, someone already proposed this command,
// let's just replace the reciprocal and fail the old one to avoid keeping it dangling
let old_reciprocal = o.remove();
let old_reciprocal = o.insert(reciprocal);
trace!(%request_id, "Replacing rpc with newer request");
respond_to_rpc(
old_reciprocal.prepare(Err(PartitionProcessorRpcError::Internal(
"retried".to_string(),
))),
);
self.awaiting_rpc_actions.insert(request_id, reciprocal);
}
Entry::Vacant(v) => {
// In this case, no one proposed this command yet, let's try to propose it
Expand Down Expand Up @@ -507,7 +505,7 @@ impl Future for SelfAppendFuture {
let append_result = ready!(self.commit_token.poll_unpin(cx));

if append_result.is_err() {
self.get_mut().fail_with_internal();
self.fail_with_internal();
return Poll::Ready(());
}
self.succeed_with_appended();
Expand Down
34 changes: 23 additions & 11 deletions crates/worker/src/partition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use anyhow::Context;
use assert2::let_assert;
use enumset::EnumSet;
use futures::{FutureExt, Stream, StreamExt, TryStreamExt as _};
use metrics::histogram;
use metrics::{gauge, histogram};
use tokio::sync::{mpsc, watch};
use tokio::time::MissedTickBehavior;
use tracing::{debug, error, info, instrument, trace, warn, Span};
Expand Down Expand Up @@ -65,8 +65,9 @@ use restate_wal_protocol::control::AnnounceLeader;
use restate_wal_protocol::{Command, Destination, Envelope, Header, Source};

use crate::metric_definitions::{
PARTITION_LABEL, PARTITION_LEADER_HANDLE_ACTION_BATCH_DURATION, PP_APPLY_COMMAND_BATCH_SIZE,
PP_APPLY_COMMAND_DURATION,
PARTITION_APPLY_COMMAND_BATCH_SIZE, PARTITION_APPLY_COMMAND_DURATION, PARTITION_LABEL,
PARTITION_LEADER_HANDLE_ACTION_BATCH_DURATION, PARTITION_RECORD_WRITE_TO_READ_LATENCY_SECONDS,
PARTITION_RPC_QUEUE_UTILIZATION_PERCENT,
};
use crate::partition::invoker_storage_reader::InvokerStorageReader;
use crate::partition::leadership::{LeadershipState, PartitionProcessorMetadata};
Expand Down Expand Up @@ -367,8 +368,19 @@ where
);
}

// Telemetry setup
let partition_id_str = self.partition_id.to_string();
let apply_command_latency = histogram!(PARTITION_APPLY_COMMAND_DURATION, PARTITION_LABEL => partition_id_str.clone());
let record_actions_latency = histogram!(PARTITION_LEADER_HANDLE_ACTION_BATCH_DURATION);
let record_write_to_read_latencty = histogram!(PARTITION_RECORD_WRITE_TO_READ_LATENCY_SECONDS, PARTITION_LABEL => partition_id_str.clone());
let command_batch_size = histogram!(PARTITION_APPLY_COMMAND_BATCH_SIZE, PARTITION_LABEL => partition_id_str.clone());

let rpc_queue_utilization =
gauge!(PARTITION_RPC_QUEUE_UTILIZATION_PERCENT, PARTITION_LABEL => partition_id_str);

// Start reading after the last applied lsn
let key_query = KeyFilter::Within(self.partition_key_range.clone());

let mut record_stream = self
.bifrost
.create_reader(
Expand All @@ -382,6 +394,9 @@ where
trace!(?entry, "Read entry");
let lsn = entry.sequence_number();
if entry.is_data_record() {
entry.as_record().inspect(|record| {
record_write_to_read_latencty.record(record.created_at().elapsed());
});
entry
.try_decode_arc::<Envelope>()
.map(|envelope| Ok((lsn, envelope?)))
Expand Down Expand Up @@ -411,20 +426,17 @@ where
tokio::time::interval(Duration::from_millis(500 + rand::random::<u64>() % 524));
status_update_timer.set_missed_tick_behavior(MissedTickBehavior::Skip);

let partition_id_str: &'static str = Box::leak(Box::new(self.partition_id.to_string()));
// Telemetry setup
let apply_command_latency =
histogram!(PP_APPLY_COMMAND_DURATION, PARTITION_LABEL => partition_id_str);
let record_actions_latency = histogram!(PARTITION_LEADER_HANDLE_ACTION_BATCH_DURATION);
let command_batch_size =
histogram!(PP_APPLY_COMMAND_BATCH_SIZE, PARTITION_LABEL => partition_id_str);

let mut action_collector = ActionCollector::default();
let mut command_buffer = Vec::with_capacity(self.max_command_batch_size);

info!("Partition {} started", self.partition_id);

loop {
let utilization =
(self.rpc_rx.len() as f64 * 100.0) / self.rpc_rx.max_capacity() as f64;

rpc_queue_utilization.set(utilization);

tokio::select! {
Some(command) = self.control_rx.recv() => {
if let Err(err) = self.on_command(command).await {
Expand Down
Loading

0 comments on commit 8854a68

Please sign in to comment.