Skip to content

Commit 4d41d0d

Browse files
authored
Add backend status stream endpoint (#844)
<!-- ELLIPSIS_HIDDEN --> > [!IMPORTANT] > Add backend status stream endpoint and refactor admin commands to utilize SSE for status updates. > > - **Behavior**: > - Adds `BackendStatus` command to `AdminCommand` in `admin.rs` to stream backend status updates. > - Introduces `print_status_stream()` in `admin.rs` to handle streaming of backend status until a specified status is reached. > - Refactors `Connect` and `Terminate` commands in `admin.rs` to use `print_status_stream()` for status updates. > - **Client**: > - Exposes `sse` module in `client/mod.rs` for public use. > - Adds `backend_status_stream()` in `PlaneClient` in `client/mod.rs` to initiate SSE requests for backend status updates. > - **Misc**: > - Minor import reordering in `admin.rs` for better organization. > > <sup>This description was created by </sup>[<img alt="Ellipsis" src="https://img.shields.io/badge/Ellipsis-blue?color=175173">](https://www.ellipsis.dev?ref=jamsocket%2Fplane&utm_source=github&utm_medium=referral)<sup> for 65c5a3f. It will automatically update as commits are pushed.</sup> <!-- ELLIPSIS_HIDDEN -->
1 parent 97d5748 commit 4d41d0d

File tree

2 files changed

+35
-32
lines changed

2 files changed

+35
-32
lines changed

plane/src/admin.rs

+34-31
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,19 @@
1-
use std::path::PathBuf;
2-
31
use crate::{
4-
client::{PlaneClient, PlaneClientError},
2+
client::{sse::SseStream, PlaneClient, PlaneClientError},
53
names::{BackendName, DroneName, Name, ProxyName},
64
protocol::{CertManagerRequest, CertManagerResponse, MessageFromProxy, MessageToProxy},
75
types::{
8-
BackendStatus, ClusterName, ClusterState, ConnectRequest, DockerExecutorConfig,
9-
DronePoolName, KeyConfig, Mount, NodeState, SpawnConfig, Subdomain,
6+
backend_state::BackendStatusStreamEntry, BackendStatus, ClusterName, ClusterState,
7+
ConnectRequest, DockerExecutorConfig, DronePoolName, KeyConfig, Mount, NodeState,
8+
SpawnConfig, Subdomain,
109
},
1110
PLANE_GIT_HASH, PLANE_VERSION,
1211
};
1312
use chrono::Duration;
1413
use clap::{Parser, Subcommand};
1514
use colored::Colorize;
15+
use futures_util::{Stream, StreamExt};
16+
use std::path::PathBuf;
1617
use url::Url;
1718

1819
fn show_error(error: &PlaneClientError) {
@@ -156,6 +157,9 @@ pub enum AdminCommand {
156157
ClusterState {
157158
cluster: ClusterName,
158159
},
160+
BackendStatus {
161+
backend: BackendName,
162+
},
159163
}
160164

161165
pub async fn run_admin_command(opts: AdminOpts) {
@@ -165,6 +169,23 @@ pub async fn run_admin_command(opts: AdminOpts) {
165169
}
166170
}
167171

172+
pub async fn print_status_stream(
173+
mut stream: SseStream<BackendStatusStreamEntry>,
174+
until: BackendStatus,
175+
) {
176+
while let Some(status) = stream.next().await {
177+
println!(
178+
"Status: {} at {}",
179+
status.status.to_string().magenta(),
180+
status.time.0.to_string().bright_cyan()
181+
);
182+
183+
if status.status >= until {
184+
break;
185+
}
186+
}
187+
}
188+
168189
pub async fn run_admin_command_inner(opts: AdminOpts) -> Result<(), PlaneClientError> {
169190
let client = PlaneClient::new(opts.controller);
170191

@@ -227,19 +248,8 @@ pub async fn run_admin_command_inner(opts: AdminOpts) -> Result<(), PlaneClientE
227248
}
228249

229250
if !immediate {
230-
let mut stream = client.backend_status_stream(&response.backend_id).await?;
231-
232-
while let Some(status) = stream.next().await {
233-
println!(
234-
"Status: {} at {}",
235-
status.status.to_string().magenta(),
236-
status.time.0.to_string().bright_cyan()
237-
);
238-
239-
if status.status >= BackendStatus::Ready {
240-
break;
241-
}
242-
}
251+
let stream = client.backend_status_stream(&response.backend_id).await?;
252+
print_status_stream(stream, BackendStatus::Ready).await;
243253
}
244254
}
245255
AdminCommand::Terminate {
@@ -259,19 +269,8 @@ pub async fn run_admin_command_inner(opts: AdminOpts) -> Result<(), PlaneClientE
259269
);
260270

261271
if !immediate {
262-
let mut stream = client.backend_status_stream(&backend).await?;
263-
264-
while let Some(status) = stream.next().await {
265-
println!(
266-
"Status: {} at {}",
267-
status.status.to_string().magenta(),
268-
status.time.0.to_string().bright_cyan()
269-
);
270-
271-
if status.status >= BackendStatus::Terminated {
272-
break;
273-
}
274-
}
272+
let stream = client.backend_status_stream(&backend).await?;
273+
print_status_stream(stream, BackendStatus::Terminated).await;
275274
}
276275
}
277276
AdminCommand::Drain { cluster, drone } => {
@@ -366,6 +365,10 @@ pub async fn run_admin_command_inner(opts: AdminOpts) -> Result<(), PlaneClientE
366365
let cluster_state = client.cluster_state(&cluster).await?;
367366
show_cluster_state(&cluster_state);
368367
}
368+
AdminCommand::BackendStatus { backend } => {
369+
let stream = client.backend_status_stream(&backend).await?;
370+
print_status_stream(stream, BackendStatus::Terminated).await;
371+
}
369372
};
370373

371374
Ok(())

plane/src/client/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use reqwest::{Response, StatusCode};
1313
use serde::de::DeserializeOwned;
1414
use url::{form_urlencoded, Url};
1515
pub mod controller_address;
16-
mod sse;
16+
pub mod sse;
1717

1818
#[derive(thiserror::Error, Debug)]
1919
pub enum PlaneClientError {

0 commit comments

Comments
 (0)