Nexus notifications have different importance (#1621)
Even if it's best effort, notifications about progress should be low
priority, and not starve out notifications related to processes starting
and stopping. Otherwise, we see:

00:16:18.781Z INFO propolis-server (vm_state_driver): live-repair completed successfully
     = downstairs
    session_id = 67a91355-4dd1-4e8d-9631-15f5fed073d9
00:16:18.781Z WARN propolis-server (vm_state_driver): could not send notify "Full(..)"; queue is full
    job = notify_queue
    session_id = 67a91355-4dd1-4e8d-9631-15f5fed073d9

Store high-priority messages and retry them 3 times.

Importantly, remove `retry_until_known_result`: if Nexus disappears,
then the task will be stuck trying to notify it indefinitely, and _of
course_ queues will fill up!
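
To make the fix concrete, here is a minimal, self-contained sketch of the pattern this commit adopts. It is illustrative only, not the crucible code below: it uses plain `String` messages, a `high: bool` flag in place of `NotifyQos`, and a `try_notify` stub in place of the Nexus client. A `biased` `tokio::select!` polls a stashed retry first, then the high-priority channel, then the low-priority one, and a high-priority message that keeps failing is dropped after more than 3 retries instead of being retried forever.

use tokio::sync::mpsc;

struct Pending {
    message: String,
    high: bool,
    retries: usize,
}

// Stand-in for the Nexus client call; always fails here so the retry
// bookkeeping is exercised.
async fn try_notify(_message: &str) -> Result<(), ()> {
    Err(())
}

async fn drain(
    mut rx_high: mpsc::Receiver<String>,
    mut rx_low: mpsc::Receiver<String>,
) {
    let mut stored: Option<Pending> = None;
    loop {
        let next = tokio::select! {
            biased;

            // A stashed high-priority retry is polled before fresh messages.
            Some(p) = async { stored.take() } => p,
            Some(m) = rx_high.recv() => Pending { message: m, high: true, retries: 0 },
            Some(m) = rx_low.recv() => Pending { message: m, high: false, retries: 0 },

            // All arms disabled: nothing stashed and both channels closed.
            else => break,
        };

        if try_notify(&next.message).await.is_err() && next.high {
            if next.retries > 3 {
                // Bounded retries mean a vanished Nexus can't wedge the task.
                eprintln!("retries > 3, dropping {:?}", next.message);
            } else {
                stored = Some(Pending { retries: next.retries + 1, ..next });
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx_high, rx_high) = mpsc::channel(128);
    let (tx_low, rx_low) = mpsc::channel(128);

    // Even though the progress message is queued first, the biased select
    // delivers the high-priority stop message ahead of it.
    tx_low.try_send("progress".to_string()).unwrap();
    tx_high.try_send("stopped".to_string()).unwrap();

    // Close the channels so drain() terminates once the queues are empty.
    drop((tx_high, tx_low));
    drain(rx_high, rx_low).await;
}

The `biased` keyword is what keeps a flood of low-priority progress messages from starving start/stop notifications, and the bounded retry is exactly what the removed `retry_until_known_result` loop lacked.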
jmpesp authored Feb 5, 2025
1 parent 86a2ce1 commit 03f940b
Showing 2 changed files with 146 additions and 58 deletions.
1 change: 1 addition & 0 deletions pantry/src/server.rs
@@ -324,6 +324,7 @@ async fn bulk_write(
 
     Ok(HttpResponseUpdatedNoContent())
 }
+
 #[derive(Deserialize, JsonSchema)]
 struct BulkReadRequest {
     pub offset: u64,
203 changes: 145 additions & 58 deletions upstairs/src/notify.rs
@@ -18,6 +18,12 @@ use nexus_client::types::{
 };
 use omicron_uuid_kinds::{GenericUuid, TypedUuid};
 
+#[derive(Debug)]
+pub(crate) enum NotifyQos {
+    High,
+    Low,
+}
+
 #[derive(Debug)]
 pub(crate) enum NotifyRequest {
     ClientTaskStopped {
@@ -65,42 +71,111 @@ pub(crate) enum NotifyRequest {
     },
 }
 
+impl NotifyRequest {
+    pub(crate) fn qos(&self) -> NotifyQos {
+        match &self {
+            NotifyRequest::LiveRepairStart { .. }
+            | NotifyRequest::LiveRepairFinish { .. }
+            | NotifyRequest::ReconcileStart { .. }
+            | NotifyRequest::ReconcileFinish { .. } => NotifyQos::High,
+
+            NotifyRequest::ClientTaskStopped { .. }
+            | NotifyRequest::LiveRepairProgress { .. }
+            | NotifyRequest::ReconcileProgress { .. } => NotifyQos::Low,
+        }
+    }
+}
+
 pub(crate) struct NotifyQueue {
-    tx: mpsc::Sender<(DateTime<Utc>, NotifyRequest)>,
+    tx_high: mpsc::Sender<(DateTime<Utc>, NotifyRequest)>,
+    tx_low: mpsc::Sender<(DateTime<Utc>, NotifyRequest)>,
     log: Logger,
 }
 
 impl NotifyQueue {
     /// Insert a time-stamped request into the queue
     pub fn send(&self, r: NotifyRequest) {
         let now = Utc::now();
-        if let Err(r) = self.tx.try_send((now, r)) {
-            warn!(self.log, "could not send notify {r:?}; queue is full");
+        let qos = r.qos();
+        let queue = match &qos {
+            NotifyQos::High => &self.tx_high,
+            NotifyQos::Low => &self.tx_low,
+        };
+
+        if let Err(e) = queue.try_send((now, r)) {
+            warn!(self.log, "could not send {qos:?} notify: {e}",);
         }
     }
 }
 
 pub(crate) fn spawn_notify_task(addr: Ipv6Addr, log: &Logger) -> NotifyQueue {
-    let (tx, rx) = mpsc::channel(128);
+    let (tx_high, rx_high) = mpsc::channel(128);
+    let (tx_low, rx_low) = mpsc::channel(128);
     let task_log = log.new(slog::o!("job" => "notify"));
-    tokio::spawn(async move { notify_task_nexus(addr, rx, task_log).await });
+
+    tokio::spawn(async move {
+        notify_task_nexus(addr, rx_high, rx_low, task_log).await
+    });
 
     NotifyQueue {
-        tx,
+        tx_high,
+        tx_low,
         log: log.new(o!("job" => "notify_queue")),
    }
 }
 
+struct Notification {
+    message: (DateTime<Utc>, NotifyRequest),
+    qos: NotifyQos,
+    retries: usize,
+}
+
 async fn notify_task_nexus(
     addr: Ipv6Addr,
-    mut rx: mpsc::Receiver<(DateTime<Utc>, NotifyRequest)>,
+    mut rx_high: mpsc::Receiver<(DateTime<Utc>, NotifyRequest)>,
+    mut rx_low: mpsc::Receiver<(DateTime<Utc>, NotifyRequest)>,
     log: Logger,
 ) {
     info!(log, "notify_task started");
 
+    // Store high QoS messages if they can't be sent
+    let mut stored_notification: Option<Notification> = None;
+
     let reqwest_client = reqwest::ClientBuilder::new()
         .connect_timeout(std::time::Duration::from_secs(15))
         .timeout(std::time::Duration::from_secs(15))
         .build()
         .unwrap();
-    while let Some((time, m)) = rx.recv().await {
+
+    loop {
+        let r = tokio::select! {
+            biased;
+
+            Some(n) = async { stored_notification.take() } => Some(n),
+
+            i = rx_high.recv() => i.map(|message| Notification {
+                message,
+                qos: NotifyQos::High,
+                retries: 0,
+            }),
+
+            i = rx_low.recv() => i.map(|message| Notification {
+                message,
+                qos: NotifyQos::Low,
+                retries: 0,
+            }),
+        };
+
+        let Some(Notification {
+            message: (time, m),
+            qos,
+            retries,
+        }) = r
+        else {
+            error!(log, "one of the notify channels was closed!");
+            break;
+        };
+
         debug!(log, "notify {m:?}");
         let client = reqwest_client.clone();
         let Some(nexus_client) = get_nexus_client(&log, client, addr).await
@@ -114,21 +189,23 @@ async fn notify_task_nexus(
             );
             continue;
         };
-        let (r, s) = match m {
+
+        let (r, s) = match &m {
             NotifyRequest::ClientTaskStopped {
                 upstairs_id,
                 downstairs_id,
                 reason,
             } => {
-                let upstairs_id = TypedUuid::from_untyped_uuid(upstairs_id);
-                let downstairs_id = TypedUuid::from_untyped_uuid(downstairs_id);
+                let upstairs_id = TypedUuid::from_untyped_uuid(*upstairs_id);
+                let downstairs_id =
+                    TypedUuid::from_untyped_uuid(*downstairs_id);
                 let reason = match reason {
                     ClientRunResult::ConnectionTimeout => {
                         DownstairsClientStoppedReason::ConnectionTimeout
                     }
                     ClientRunResult::ConnectionFailed(_) => {
-                        // skip this notification, it's too noisy during connection
-                        // retries
+                        // skip this notification, it's too noisy during
+                        // connection retries
                         //DownstairsClientStoppedReason::ConnectionFailed
                         continue;
                     }
@@ -159,16 +236,13 @@
                 };
 
                 (
-                    omicron_common::retry_until_known_result(&log, || async {
-                        nexus_client
-                            .cpapi_downstairs_client_stopped(
-                                &upstairs_id,
-                                &downstairs_id,
-                                &DownstairsClientStopped { time, reason },
-                            )
-                            .await
-                    })
-                    .await,
+                    nexus_client
+                        .cpapi_downstairs_client_stopped(
+                            &upstairs_id,
+                            &downstairs_id,
+                            &DownstairsClientStopped { time, reason },
+                        )
+                        .await,
                     "client stopped",
                 )
             }
Expand All @@ -184,7 +258,7 @@ async fn notify_task_nexus(
session_id,
ref repairs,
} => {
let upstairs_id = TypedUuid::from_untyped_uuid(upstairs_id);
let upstairs_id = TypedUuid::from_untyped_uuid(*upstairs_id);
let (description, repair_type) =
if matches!(m, NotifyRequest::LiveRepairStart { .. }) {
("live repair start", UpstairsRepairType::Live)
@@ -193,9 +267,9 @@
                     };
                 let info = RepairStartInfo {
                     time,
-                    repair_id: TypedUuid::from_untyped_uuid(repair_id),
+                    repair_id: TypedUuid::from_untyped_uuid(*repair_id),
                     repair_type,
-                    session_id: TypedUuid::from_untyped_uuid(session_id),
+                    session_id: TypedUuid::from_untyped_uuid(*session_id),
                     repairs: repairs
                         .iter()
                         .map(|(region_uuid, target_addr)| {
Expand All @@ -208,12 +282,9 @@ async fn notify_task_nexus(
};

(
omicron_common::retry_until_known_result(&log, || async {
nexus_client
.cpapi_upstairs_repair_start(&upstairs_id, &info)
.await
})
.await,
nexus_client
.cpapi_upstairs_repair_start(&upstairs_id, &info)
.await,
description,
)
}
@@ -229,8 +300,8 @@
                 current_item,
                 total_items,
             } => {
-                let upstairs_id = TypedUuid::from_untyped_uuid(upstairs_id);
-                let repair_id = TypedUuid::from_untyped_uuid(repair_id);
+                let upstairs_id = TypedUuid::from_untyped_uuid(*upstairs_id);
+                let repair_id = TypedUuid::from_untyped_uuid(*repair_id);
                 let description =
                     if matches!(m, NotifyRequest::LiveRepairProgress { .. }) {
                         "live repair progress"
Expand All @@ -239,20 +310,17 @@ async fn notify_task_nexus(
};

(
omicron_common::retry_until_known_result(&log, || async {
nexus_client
.cpapi_upstairs_repair_progress(
&upstairs_id,
&repair_id,
&RepairProgress {
current_item,
total_items,
time,
},
)
.await
})
.await,
nexus_client
.cpapi_upstairs_repair_progress(
&upstairs_id,
&repair_id,
&RepairProgress {
current_item: *current_item,
total_items: *total_items,
time,
},
)
.await,
description,
)
}
Expand All @@ -270,7 +338,7 @@ async fn notify_task_nexus(
aborted,
ref repairs,
} => {
let upstairs_id = TypedUuid::from_untyped_uuid(upstairs_id);
let upstairs_id = TypedUuid::from_untyped_uuid(*upstairs_id);
let (description, repair_type) =
if matches!(m, NotifyRequest::LiveRepairFinish { .. }) {
("live repair finish", UpstairsRepairType::Live)
@@ -279,9 +347,9 @@
                     };
                 let info = RepairFinishInfo {
                     time,
-                    repair_id: TypedUuid::from_untyped_uuid(repair_id),
+                    repair_id: TypedUuid::from_untyped_uuid(*repair_id),
                     repair_type,
-                    session_id: TypedUuid::from_untyped_uuid(session_id),
+                    session_id: TypedUuid::from_untyped_uuid(*session_id),
                     repairs: repairs
                         .iter()
                         .map(|(region_uuid, target_addr)| {
@@ -291,30 +359,49 @@
                             }
                         })
                         .collect(),
-                    aborted,
+                    aborted: *aborted,
                 };
 
                 (
-                    omicron_common::retry_until_known_result(&log, || async {
-                        nexus_client
-                            .cpapi_upstairs_repair_finish(&upstairs_id, &info)
-                            .await
-                    })
-                    .await,
+                    nexus_client
+                        .cpapi_upstairs_repair_finish(&upstairs_id, &info)
+                        .await,
                     description,
                 )
             }
         };
 
         match r {
             Ok(_) => {
                 info!(log, "notified Nexus of {s}");
             }
+
             Err(e) => {
                 error!(log, "failed to notify Nexus of {s}: {e}");
+
+                // If there's a problem notifying Nexus, it could be due to
+                // Nexus being gone before the DNS was updated. If this is the
+                // case, then retrying should eventually pick a different Nexus
+                // and succeed. Store high priority messages so they can be
+                // resent.
+                if matches!(qos, NotifyQos::High) {
+                    // If we've retried too many times, then drop this message.
+                    // Unfortunately if this is true then other notifications
+                    // will also likely fail.
+                    if retries > 3 {
+                        warn!(log, "retries > 3, dropping {m:?}");
+                    } else {
+                        stored_notification = Some(Notification {
+                            message: (time, m),
+                            qos,
+                            retries: retries + 1,
+                        });
+                    }
+                }
             }
         }
     }
 
     info!(log, "notify_task exiting");
 }

