Skip to content

Commit

Permalink
added more error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
nickjiang2378 committed Oct 23, 2024
1 parent be1ccf2 commit b0f4158
Show file tree
Hide file tree
Showing 8 changed files with 82 additions and 34 deletions.
3 changes: 3 additions & 0 deletions .cargo/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,8 @@ linker = "rust-lld"
[target.x86_64-unknown-linux-musl]
linker = "rust-lld"

[target.aarch64-unknown-linux-gnu]
linker = "rust-lld"

[target.x86_64-pc-windows-msvc]
linker = "rust-lld.exe"
6 changes: 6 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,9 @@ inherits = "release"
debug = 2
lto = "off"
strip = "none"

[profile.dev.package.website_playground]
debug-assertions = false

[profile.release.package.website_playground]
opt-level = "s"
8 changes: 7 additions & 1 deletion hydro_deploy/core/src/azure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ pub struct AzureHost {
project: String,
os_type: String, // linux or windows
machine_size: String,
architecture: Option<String>, // default to x86_64
image: Option<HashMap<String, String>>,
region: String,
user: Option<String>,
Expand All @@ -63,6 +64,7 @@ impl AzureHost {
project: String,
os_type: String, // linux or windows
machine_size: String,
architecture: Option<String>, // default to x86_64
image: Option<HashMap<String, String>>,
region: String,
user: Option<String>,
Expand All @@ -73,6 +75,7 @@ impl AzureHost {
os_type,
machine_size,
image,
architecture,
region,
user,
launched: OnceLock::new(),
Expand All @@ -84,7 +87,10 @@ impl AzureHost {
#[async_trait]
impl Host for AzureHost {
fn target_type(&self) -> HostTargetType {
HostTargetType::Linux(crate::LinuxArchitecture::AARCH64)
match self.architecture.as_deref() {
Some("aarch64") => HostTargetType::Linux(crate::LinuxArchitecture::AARCH64),
_ => HostTargetType::Linux(crate::LinuxArchitecture::X86_64),
}
}

fn request_port(&self, bind_type: &ServerStrategy) {
Expand Down
3 changes: 2 additions & 1 deletion hydro_deploy/core/src/deployment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,11 +234,12 @@ impl Deployment {
project: String,
os_type: String, // linux or windows
machine_size: String,
architecture: Option<String>,
image: Option<HashMap<String, String>>,
region: String,
user: Option<String>,
) -> Arc<AzureHost> {
self.add_host(|id| AzureHost::new(id, project, os_type, machine_size, image, region, user))
self.add_host(|id| AzureHost::new(id, project, os_type, machine_size, architecture, image, region, user))
}

#[allow(clippy::too_many_arguments)]
Expand Down
16 changes: 11 additions & 5 deletions hydro_deploy/core/src/hydroflow_crate/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,16 +90,20 @@ pub async fn build_crate_memoized(params: BuildParams) -> Result<&'static BuildO
ProgressTracker::rich_leaf("build", move |set_msg| async move {
tokio::task::spawn_blocking(move || {
let mut command = Command::new("cargo");
command.args(["build"]);
command.args([
"build".to_string(),
"--profile".to_string(),
params.profile.unwrap_or("release".to_string()),
]);
// command.args([
// "zigbuild".to_string(),
// "--profile".to_string(),
// profile.unwrap_or("release".to_string()),
// params.profile.unwrap_or("release".to_string()),
// ]);

if let Some(profile) = params.profile.as_ref() {
command.args(["--profile", profile]);
}
// if let Some(profile) = params.profile.as_ref() {
// command.args(["--profile", profile]);
// }

if let Some(bin) = params.bin.as_ref() {
command.args(["--bin", bin]);
Expand Down Expand Up @@ -137,6 +141,8 @@ pub async fn build_crate_memoized(params: BuildParams) -> Result<&'static BuildO
command.env("CARGO_TARGET_DIR", target_dir);
}

ProgressTracker::println(&format!("Command to be executed: {:?}", command));

let mut spawned = command
.current_dir(&params.src)
.stdout(Stdio::piped())
Expand Down
Binary file not shown.
77 changes: 51 additions & 26 deletions hydro_deploy/core/src/kubernetes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ use super::progress::ProgressTracker;
pub mod launched_binary;
pub use launched_binary::*;

const ALPHABET: [char; 36] = [
'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's',
't', 'u', 'v', 'w', 'x', 'y', 'z', '1', '2', '3', '4', '5', '6', '7', '8', '9', '0',
];

#[derive(Debug)]
pub struct PodHost {
pub id: usize,
Expand Down Expand Up @@ -80,10 +85,7 @@ impl Host for PodHost {
async fn provision(&self, _resource_result: &Arc<ResourceResult>) -> Arc<dyn LaunchedHost> {
if self.launched.get().is_none() {
let client = Client::try_default().await.unwrap();
let alphabet = [
'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p','q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', '1', '2', '3', '4', '5', '6', '7', '8', '9', '0'
];
let pod_id = nanoid!(10, &alphabet); // pod names can only contain alphanumeric characters
let pod_id = nanoid!(10, &ALPHABET); // pod names can only contain alphanumeric characters
let pod_name = format!("hydro-{}", pod_id);

// Blank template for a new pod
Expand All @@ -106,37 +108,55 @@ impl Host for PodHost {
// Check if pod_name has already been created. If not, create it.
let lp = ListParams::default().fields(&format!("metadata.name={}", pod_name)); // only want results for our pod
let mut found_existing_pod = false;
for p in pods.list(&lp).await.unwrap() {
// ProgressTracker::println(format!("Found Pod: {}", p.name_any()).as_str());
if p.name_any() == pod_name {
found_existing_pod = true;
match pods.list(&lp).await {
Ok(pod_list) => {
for p in pod_list {
if p.name_any() == pod_name {
found_existing_pod = true;
}
}
}
Err(e) => {
ProgressTracker::println(format!("Error listing pods: {:?}. Maybe your kubernetes cluster is not up?", e).as_str());
}
}
if !found_existing_pod {
// ProgressTracker::println(format!("Creating new pod {:?}", pod_name).as_str());
ProgressTracker::println(format!("Creating new pod {:?}", pod_name).as_str());
let res = pods.create(&PostParams::default(), &p).await;
match res {
Err(e) => ProgressTracker::println(format!("{:?}", e).as_str()),
Ok(_) => (),
}
}
ProgressTracker::println("Check 1");

// Wait until the pod is running, otherwise we get 500 error.
let wp = WatchParams::default().fields(format!("metadata.name={}", pod_name).as_str()).timeout(10);
let mut stream = pods.watch(&wp, "0").await.unwrap().boxed();
while let Some(status) = stream.try_next().await.unwrap() {
match status {
WatchEvent::Added(o) => {
ProgressTracker::println(&format!("Got {}", o.name_any()));
}
WatchEvent::Modified(o) => {
let s = o.status.as_ref().expect("status exists on pod");
if s.phase.clone().unwrap_or_default() == "Running" {
ProgressTracker::println(&format!("Ready to attach to {}", o.name_any()));
break;
loop {
let status_result = stream.try_next().await;
match status_result {
Ok(Some(status)) => match status {
WatchEvent::Added(o) => {
ProgressTracker::println(&format!("Found pod {}", o.name_any()));
}
WatchEvent::Modified(o) => {
let s = o.status.as_ref().expect("status exists on pod");
if s.phase.clone().unwrap_or_default() == "Running" {
ProgressTracker::println(&format!("Ready to attach to {}", o.name_any()));
break;
}
}
_ => {}
},
Ok(None) => {
// Pod still being created, likely -- restart the watch stream
stream = pods.watch(&wp, "0").await.unwrap().boxed();
}
Err(e) => {
ProgressTracker::println(&format!("Error watching pod events: {:?}", e));
break;
}
_ => {}
}
}

Expand All @@ -148,8 +168,6 @@ impl Host for PodHost {
})).unwrap();
}

ProgressTracker::println("finished provisioning");

self.launched.get().unwrap().clone()
}

Expand Down Expand Up @@ -262,15 +280,23 @@ impl LaunchedHost for LaunchedPod {
.exec(self.pod_name.as_str(), vec!["tar", "xf", "-", "-C", "/"], &ap)
.await?;
let mut tar_stdin = tar.stdin().unwrap();
tar_stdin.write_all(&data).await?;
// tar_stdin.write_all(&data).await?;
if let Err(e) = tar_stdin.write_all(&data).await {
ProgressTracker::println(&format!("Error writing to stdin: {:?}", e));
return Err(e.into());
}
ProgressTracker::println("Wrote all the stdin");

// Flush the stdin to finish sending the file through
tar_stdin.flush().await?;
if let Err(e) = tar_stdin.flush().await {
ProgressTracker::println(&format!("Error flushing stdin: {:?}", e));
return Err(e.into());
}

ProgressTracker::println("Flushed!");
drop(tar_stdin); // Ensure stdin is closed before joining
let result = tar.join().await;
// tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
match result {
Ok(_) => ProgressTracker::println("Successfully copied binary to pod"),
Err(e) => ProgressTracker::println(&format!("Failed to copy binary to pod: {:?}", e)),
Expand Down Expand Up @@ -301,6 +327,7 @@ impl LaunchedHost for LaunchedPod {

// Execute binary inside the new pod
let ap = AttachParams::default().stdin(true).stdout(true).stderr(true);
tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
let mut launch_binary = match pods.exec(pod_name, args_list, &ap).await {
Ok(exec) => exec,
Err(e) => {
Expand All @@ -309,8 +336,6 @@ impl LaunchedHost for LaunchedPod {
}
};

// ProgressTracker::println(&format!("Launched binary in pod {:?}", pod_name));

Ok(Box::new(LaunchedPodBinary::new(
&mut launch_binary, id,
)))
Expand Down
3 changes: 2 additions & 1 deletion hydro_deploy/hydro_cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,11 +234,12 @@ impl Deployment {
os_type: String, // linux or windows
machine_size: String,
region: String,
architecture: Option<String>,
image: Option<HashMap<String, String>>,
user: Option<String>,
) -> PyResult<Py<PyAny>> {
let arc = self.underlying.blocking_write().add_host(|id| {
core::AzureHost::new(id, project, os_type, machine_size, image, region, user)
core::AzureHost::new(id, project, os_type, machine_size, architecture, image, region, user)
});

Ok(Py::new(
Expand Down

0 comments on commit b0f4158

Please sign in to comment.