Skip to content

Commit

Permalink
refactor(serverless): better error handling (#237)
Browse files Browse the repository at this point in the history
* refactor(serverless): better error handling

* chore: add changeset
  • Loading branch information
QuiiBz authored Nov 4, 2022
1 parent 6b44882 commit 747774b
Show file tree
Hide file tree
Showing 7 changed files with 66 additions and 63 deletions.
5 changes: 5 additions & 0 deletions .changeset/short-scissors-explode.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@lagon/serverless': patch
---

Improve error handling
7 changes: 4 additions & 3 deletions packages/serverless/src/deployments/assets.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use std::{collections::HashMap, env, fs, io, path::Path};
use std::{collections::HashMap, env, fs, path::Path};

use anyhow::Result;
use hyper::body::Bytes;
use lagon_runtime::http::Response;

use super::Deployment;

pub fn handle_asset(deployment: &Deployment, asset: &String) -> io::Result<Response> {
let path = Path::new(env::current_dir().unwrap().as_path())
pub fn handle_asset(deployment: &Deployment, asset: &String) -> Result<Response> {
let path = Path::new(env::current_dir()?.as_path())
.join("deployments")
.join(deployment.id.clone())
.join(asset);
Expand Down
15 changes: 8 additions & 7 deletions packages/serverless/src/deployments/filesystem.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use std::io::Write;
use std::path::PathBuf;
use std::{env, fs, io, path::Path};
use std::{env, fs, path::Path};

use anyhow::Result;
use log::info;

use super::Deployment;

pub fn create_deployments_folder() -> io::Result<()> {
pub fn create_deployments_folder() -> Result<()> {
let path = Path::new("deployments");

if !path.exists() {
Expand All @@ -23,16 +24,16 @@ pub fn has_deployment_code(deployment: &Deployment) -> bool {
path.exists()
}

pub fn get_deployment_code(deployment: &Deployment) -> io::Result<String> {
let path = Path::new(env::current_dir().unwrap().as_path())
pub fn get_deployment_code(deployment: &Deployment) -> Result<String> {
let path = Path::new(env::current_dir()?.as_path())
.join("deployments")
.join(deployment.id.clone() + ".js");
let code = fs::read_to_string(path)?;

Ok(code)
}

pub fn write_deployment(deployment_id: &str, buf: &[u8]) -> io::Result<()> {
pub fn write_deployment(deployment_id: &str, buf: &[u8]) -> Result<()> {
let mut file =
fs::File::create(Path::new("deployments").join(deployment_id.to_owned() + ".js"))?;

Expand All @@ -42,7 +43,7 @@ pub fn write_deployment(deployment_id: &str, buf: &[u8]) -> io::Result<()> {
Ok(())
}

pub fn write_deployment_asset(deployment_id: &str, asset: &str, buf: &[u8]) -> io::Result<()> {
pub fn write_deployment_asset(deployment_id: &str, asset: &str, buf: &[u8]) -> Result<()> {
let asset = asset.replace("public/", "");
let asset = asset.as_str();

Expand All @@ -60,7 +61,7 @@ pub fn write_deployment_asset(deployment_id: &str, asset: &str, buf: &[u8]) -> i
Ok(())
}

pub fn rm_deployment(deployment_id: &str) -> io::Result<()> {
pub fn rm_deployment(deployment_id: &str) -> Result<()> {
fs::remove_file(Path::new("deployments").join(deployment_id.to_owned() + ".js"))?;

// It's possible that the folder doesn't exists if the deployment has no assets
Expand Down
36 changes: 17 additions & 19 deletions packages/serverless/src/deployments/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use std::{
collections::{HashMap, HashSet},
fs, io,
fs,
path::Path,
sync::Arc,
};

use anyhow::{anyhow, Result};
use log::{error, info};
use mysql::{prelude::Queryable, PooledConn};
use s3::Bucket;
Expand Down Expand Up @@ -57,7 +58,7 @@ impl Deployment {
pub async fn get_deployments(
mut conn: PooledConn,
bucket: Bucket,
) -> Arc<RwLock<HashMap<String, Deployment>>> {
) -> Result<Arc<RwLock<HashMap<String, Deployment>>>> {
let deployments = Arc::new(RwLock::new(HashMap::new()));

let mut deployments_list: HashMap<String, Deployment> = HashMap::new();
Expand Down Expand Up @@ -124,8 +125,7 @@ pub async fn get_deployments(
timeout,
});
},
)
.unwrap();
)?;

let deployments_list: Vec<Deployment> = deployments_list.values().cloned().collect();

Expand Down Expand Up @@ -158,16 +158,15 @@ pub async fn get_deployments(
}
}

deployments
Ok(deployments)
}

async fn delete_old_deployments(deployments: &[Deployment]) -> io::Result<()> {
async fn delete_old_deployments(deployments: &[Deployment]) -> Result<()> {
info!("Deleting old deployments");
let local_deployments_files = fs::read_dir(Path::new("deployments"))?;

for local_deployment_file in local_deployments_files {
let local_deployment_file = local_deployment_file.unwrap();
let local_deployment_file_name = local_deployment_file
let local_deployment_file_name = local_deployment_file?
.file_name()
.into_string()
.unwrap_or_else(|_| "".into());
Expand All @@ -191,28 +190,27 @@ async fn delete_old_deployments(deployments: &[Deployment]) -> io::Result<()> {
Ok(())
}

pub async fn download_deployment(deployment: &Deployment, bucket: &Bucket) -> io::Result<()> {
pub async fn download_deployment(deployment: &Deployment, bucket: &Bucket) -> Result<()> {
match bucket.get_object(deployment.id.clone() + ".js").await {
Ok(object) => {
write_deployment(&deployment.id, object.bytes())?;

if !deployment.assets.is_empty() {
for asset in &deployment.assets {
let object = bucket
match bucket
.get_object(deployment.id.clone() + "/" + asset.as_str())
.await;

if let Err(error) = object {
return Err(io::Error::new(io::ErrorKind::Other, error));
}

let object = object.unwrap();
write_deployment_asset(&deployment.id, asset, object.bytes())?;
.await
{
Ok(object) => {
write_deployment_asset(&deployment.id, asset, object.bytes())?
}
Err(error) => return Err(anyhow!(error)),
};
}
}

Ok(())
}
Err(error) => Err(io::Error::new(io::ErrorKind::Other, error)),
Err(error) => Err(anyhow!(error)),
}
}
17 changes: 9 additions & 8 deletions packages/serverless/src/deployments/pubsub.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::{collections::HashMap, sync::Arc};

use anyhow::Result;
use log::error;
use s3::Bucket;
use serde_json::Value;
Expand All @@ -10,23 +11,23 @@ use super::{download_deployment, filesystem::rm_deployment, Deployment};
pub fn listen_pub_sub(
bucket: Bucket,
deployments: Arc<RwLock<HashMap<String, Deployment>>>,
) -> JoinHandle<()> {
) -> JoinHandle<Result<()>> {
tokio::spawn(async move {
let url = dotenv::var("REDIS_URL").expect("REDIS_URL must be set");
let client = redis::Client::open(url).unwrap();
let mut conn = client.get_connection().unwrap();
let client = redis::Client::open(url)?;
let mut conn = client.get_connection()?;
let mut pub_sub = conn.as_pubsub();

pub_sub.subscribe("deploy").unwrap();
pub_sub.subscribe("undeploy").unwrap();
pub_sub.subscribe("deploy")?;
pub_sub.subscribe("undeploy")?;
// TODO: current, domains

loop {
let msg = pub_sub.get_message().unwrap();
let msg = pub_sub.get_message()?;
let channel = msg.get_channel_name();
let payload: String = msg.get_payload().unwrap();
let payload: String = msg.get_payload()?;

let value: Value = serde_json::from_str(&payload).unwrap();
let value: Value = serde_json::from_str(&payload)?;

let deployment = Deployment {
id: value["deploymentId"].as_str().unwrap().to_string(),
Expand Down
10 changes: 5 additions & 5 deletions packages/serverless/src/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,16 @@ impl SimpleLogger {
match Client::new() {
Ok(axiom_client) => {
tokio::spawn(async move {
axiom_client
if let Err(error) = axiom_client
.datasets
.ingest_stream("serverless", rx.into_stream())
.await
.unwrap();
{
eprintln!("Error ingesting into Axiom: {}", error);
}
});
}
Err(e) => {
println!("Axiom is not configured: {}", e);
}
Err(error) => println!("Axiom is not configured: {}", error),
}

Self {
Expand Down
39 changes: 18 additions & 21 deletions packages/serverless/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,10 @@ async fn handle_request(
// Remove the leading '/' from the url
url.remove(0);

let hostname = req.headers().get(HOST).unwrap().to_str()?.to_string();
let hostname = match req.headers().get(HOST) {
Some(hostname) => hostname.to_str()?.to_string(),
None => return Ok(Builder::new().status(404).body(PAGE_404.into())?),
};

let thread_ids_reader = thread_ids.read().await;

Expand Down Expand Up @@ -237,23 +240,16 @@ async fn handle_request(

Ok(hyper_response)
}
RunResult::Timeout | RunResult::MemoryLimit => Ok(HyperResponse::builder()
.status(502)
.body(PAGE_502.into())
.unwrap()),
RunResult::Error(_) => Ok(HyperResponse::builder()
.status(500)
.body(PAGE_500.into())
.unwrap()),
RunResult::NotFound => Ok(HyperResponse::builder()
.status(404)
.body(PAGE_404.into())
.unwrap()),
RunResult::Timeout | RunResult::MemoryLimit => {
Ok(HyperResponse::builder().status(502).body(PAGE_502.into())?)
}
RunResult::Error(_) => Ok(HyperResponse::builder().status(500).body(PAGE_500.into())?),
RunResult::NotFound => Ok(HyperResponse::builder().status(404).body(PAGE_404.into())?),
}
}

#[tokio::main]
async fn main() {
async fn main() -> Result<()> {
dotenv::dotenv().expect("Failed to load .env file");
let _flush_guard = init_logger().expect("Failed to init logger");

Expand All @@ -270,23 +266,22 @@ async fn main() {
let opts = OptsBuilder::from_opts(opts).ssl_opts(Some(
SslOpts::default().with_danger_accept_invalid_certs(true),
));
let pool = Pool::new(opts).unwrap();
let conn = pool.get_conn().unwrap();
let pool = Pool::new(opts)?;
let conn = pool.get_conn()?;

let bucket_name = dotenv::var("S3_BUCKET").expect("S3_BUCKET must be set");
let region = "eu-west-3".parse().unwrap();
let region = "eu-west-3".parse()?;
let credentials = Credentials::new(
Some(&dotenv::var("S3_ACCESS_KEY_ID").expect("S3_ACCESS_KEY_ID must be set")),
Some(&dotenv::var("S3_SECRET_ACCESS_KEY").expect("S3_SECRET_ACCESS_KEY must be set")),
None,
None,
None,
)
.unwrap();
)?;

let bucket = Bucket::new(&bucket_name, region, credentials).unwrap();
let bucket = Bucket::new(&bucket_name, region, credentials)?;

let deployments = get_deployments(conn, bucket.clone()).await;
let deployments = get_deployments(conn, bucket.clone()).await?;
let redis = listen_pub_sub(bucket.clone(), deployments.clone());

let pool = LocalPoolHandle::new(POOL_SIZE);
Expand Down Expand Up @@ -324,4 +319,6 @@ async fn main() {
}

runtime.dispose();

Ok(())
}

0 comments on commit 747774b

Please sign in to comment.