Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(serverless): better error handling #237

Merged
merged 2 commits into from
Nov 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/short-scissors-explode.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@lagon/serverless': patch
---

Improve error handling
7 changes: 4 additions & 3 deletions packages/serverless/src/deployments/assets.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use std::{collections::HashMap, env, fs, io, path::Path};
use std::{collections::HashMap, env, fs, path::Path};

use anyhow::Result;
use hyper::body::Bytes;
use lagon_runtime::http::Response;

use super::Deployment;

pub fn handle_asset(deployment: &Deployment, asset: &String) -> io::Result<Response> {
let path = Path::new(env::current_dir().unwrap().as_path())
pub fn handle_asset(deployment: &Deployment, asset: &String) -> Result<Response> {
let path = Path::new(env::current_dir()?.as_path())
.join("deployments")
.join(deployment.id.clone())
.join(asset);
Expand Down
15 changes: 8 additions & 7 deletions packages/serverless/src/deployments/filesystem.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use std::io::Write;
use std::path::PathBuf;
use std::{env, fs, io, path::Path};
use std::{env, fs, path::Path};

use anyhow::Result;
use log::info;

use super::Deployment;

pub fn create_deployments_folder() -> io::Result<()> {
pub fn create_deployments_folder() -> Result<()> {
let path = Path::new("deployments");

if !path.exists() {
Expand All @@ -23,16 +24,16 @@ pub fn has_deployment_code(deployment: &Deployment) -> bool {
path.exists()
}

pub fn get_deployment_code(deployment: &Deployment) -> io::Result<String> {
let path = Path::new(env::current_dir().unwrap().as_path())
/// Reads the bundled JavaScript source for the given deployment from disk.
///
/// The code is expected at `<cwd>/deployments/<deployment id>.js`. Failure to
/// resolve the current directory or to read the file is propagated to the
/// caller via `anyhow::Result`.
pub fn get_deployment_code(deployment: &Deployment) -> Result<String> {
    let file = Path::new(env::current_dir()?.as_path())
        .join("deployments")
        .join(format!("{}.js", deployment.id));

    let code = fs::read_to_string(file)?;
    Ok(code)
}

pub fn write_deployment(deployment_id: &str, buf: &[u8]) -> io::Result<()> {
pub fn write_deployment(deployment_id: &str, buf: &[u8]) -> Result<()> {
let mut file =
fs::File::create(Path::new("deployments").join(deployment_id.to_owned() + ".js"))?;

Expand All @@ -42,7 +43,7 @@ pub fn write_deployment(deployment_id: &str, buf: &[u8]) -> io::Result<()> {
Ok(())
}

pub fn write_deployment_asset(deployment_id: &str, asset: &str, buf: &[u8]) -> io::Result<()> {
pub fn write_deployment_asset(deployment_id: &str, asset: &str, buf: &[u8]) -> Result<()> {
let asset = asset.replace("public/", "");
let asset = asset.as_str();

Expand All @@ -60,7 +61,7 @@ pub fn write_deployment_asset(deployment_id: &str, asset: &str, buf: &[u8]) -> i
Ok(())
}

pub fn rm_deployment(deployment_id: &str) -> io::Result<()> {
pub fn rm_deployment(deployment_id: &str) -> Result<()> {
fs::remove_file(Path::new("deployments").join(deployment_id.to_owned() + ".js"))?;

// It's possible that the folder doesn't exists if the deployment has no assets
Expand Down
36 changes: 17 additions & 19 deletions packages/serverless/src/deployments/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use std::{
collections::{HashMap, HashSet},
fs, io,
fs,
path::Path,
sync::Arc,
};

use anyhow::{anyhow, Result};
use log::{error, info};
use mysql::{prelude::Queryable, PooledConn};
use s3::Bucket;
Expand Down Expand Up @@ -57,7 +58,7 @@ impl Deployment {
pub async fn get_deployments(
mut conn: PooledConn,
bucket: Bucket,
) -> Arc<RwLock<HashMap<String, Deployment>>> {
) -> Result<Arc<RwLock<HashMap<String, Deployment>>>> {
let deployments = Arc::new(RwLock::new(HashMap::new()));

let mut deployments_list: HashMap<String, Deployment> = HashMap::new();
Expand Down Expand Up @@ -124,8 +125,7 @@ pub async fn get_deployments(
timeout,
});
},
)
.unwrap();
)?;

let deployments_list: Vec<Deployment> = deployments_list.values().cloned().collect();

Expand Down Expand Up @@ -158,16 +158,15 @@ pub async fn get_deployments(
}
}

deployments
Ok(deployments)
}

async fn delete_old_deployments(deployments: &[Deployment]) -> io::Result<()> {
async fn delete_old_deployments(deployments: &[Deployment]) -> Result<()> {
info!("Deleting old deployments");
let local_deployments_files = fs::read_dir(Path::new("deployments"))?;

for local_deployment_file in local_deployments_files {
let local_deployment_file = local_deployment_file.unwrap();
let local_deployment_file_name = local_deployment_file
let local_deployment_file_name = local_deployment_file?
.file_name()
.into_string()
.unwrap_or_else(|_| "".into());
Expand All @@ -191,28 +190,27 @@ async fn delete_old_deployments(deployments: &[Deployment]) -> io::Result<()> {
Ok(())
}

pub async fn download_deployment(deployment: &Deployment, bucket: &Bucket) -> io::Result<()> {
pub async fn download_deployment(deployment: &Deployment, bucket: &Bucket) -> Result<()> {
match bucket.get_object(deployment.id.clone() + ".js").await {
Ok(object) => {
write_deployment(&deployment.id, object.bytes())?;

if !deployment.assets.is_empty() {
for asset in &deployment.assets {
let object = bucket
match bucket
.get_object(deployment.id.clone() + "/" + asset.as_str())
.await;

if let Err(error) = object {
return Err(io::Error::new(io::ErrorKind::Other, error));
}

let object = object.unwrap();
write_deployment_asset(&deployment.id, asset, object.bytes())?;
.await
{
Ok(object) => {
write_deployment_asset(&deployment.id, asset, object.bytes())?
}
Err(error) => return Err(anyhow!(error)),
};
}
}

Ok(())
}
Err(error) => Err(io::Error::new(io::ErrorKind::Other, error)),
Err(error) => Err(anyhow!(error)),
}
}
17 changes: 9 additions & 8 deletions packages/serverless/src/deployments/pubsub.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::{collections::HashMap, sync::Arc};

use anyhow::Result;
use log::error;
use s3::Bucket;
use serde_json::Value;
Expand All @@ -10,23 +11,23 @@ use super::{download_deployment, filesystem::rm_deployment, Deployment};
pub fn listen_pub_sub(
bucket: Bucket,
deployments: Arc<RwLock<HashMap<String, Deployment>>>,
) -> JoinHandle<()> {
) -> JoinHandle<Result<()>> {
tokio::spawn(async move {
let url = dotenv::var("REDIS_URL").expect("REDIS_URL must be set");
let client = redis::Client::open(url).unwrap();
let mut conn = client.get_connection().unwrap();
let client = redis::Client::open(url)?;
let mut conn = client.get_connection()?;
let mut pub_sub = conn.as_pubsub();

pub_sub.subscribe("deploy").unwrap();
pub_sub.subscribe("undeploy").unwrap();
pub_sub.subscribe("deploy")?;
pub_sub.subscribe("undeploy")?;
// TODO: current, domains

loop {
let msg = pub_sub.get_message().unwrap();
let msg = pub_sub.get_message()?;
let channel = msg.get_channel_name();
let payload: String = msg.get_payload().unwrap();
let payload: String = msg.get_payload()?;

let value: Value = serde_json::from_str(&payload).unwrap();
let value: Value = serde_json::from_str(&payload)?;

let deployment = Deployment {
id: value["deploymentId"].as_str().unwrap().to_string(),
Expand Down
10 changes: 5 additions & 5 deletions packages/serverless/src/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,16 @@ impl SimpleLogger {
match Client::new() {
Ok(axiom_client) => {
tokio::spawn(async move {
axiom_client
if let Err(error) = axiom_client
.datasets
.ingest_stream("serverless", rx.into_stream())
.await
.unwrap();
{
eprintln!("Error ingesting into Axiom: {}", error);
}
});
}
Err(e) => {
println!("Axiom is not configured: {}", e);
}
Err(error) => println!("Axiom is not configured: {}", error),
}

Self {
Expand Down
39 changes: 18 additions & 21 deletions packages/serverless/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,10 @@ async fn handle_request(
// Remove the leading '/' from the url
url.remove(0);

let hostname = req.headers().get(HOST).unwrap().to_str()?.to_string();
let hostname = match req.headers().get(HOST) {
Some(hostname) => hostname.to_str()?.to_string(),
None => return Ok(Builder::new().status(404).body(PAGE_404.into())?),
};

let thread_ids_reader = thread_ids.read().await;

Expand Down Expand Up @@ -237,23 +240,16 @@ async fn handle_request(

Ok(hyper_response)
}
RunResult::Timeout | RunResult::MemoryLimit => Ok(HyperResponse::builder()
.status(502)
.body(PAGE_502.into())
.unwrap()),
RunResult::Error(_) => Ok(HyperResponse::builder()
.status(500)
.body(PAGE_500.into())
.unwrap()),
RunResult::NotFound => Ok(HyperResponse::builder()
.status(404)
.body(PAGE_404.into())
.unwrap()),
RunResult::Timeout | RunResult::MemoryLimit => {
Ok(HyperResponse::builder().status(502).body(PAGE_502.into())?)
}
RunResult::Error(_) => Ok(HyperResponse::builder().status(500).body(PAGE_500.into())?),
RunResult::NotFound => Ok(HyperResponse::builder().status(404).body(PAGE_404.into())?),
}
}

#[tokio::main]
async fn main() {
async fn main() -> Result<()> {
dotenv::dotenv().expect("Failed to load .env file");
let _flush_guard = init_logger().expect("Failed to init logger");

Expand All @@ -270,23 +266,22 @@ async fn main() {
let opts = OptsBuilder::from_opts(opts).ssl_opts(Some(
SslOpts::default().with_danger_accept_invalid_certs(true),
));
let pool = Pool::new(opts).unwrap();
let conn = pool.get_conn().unwrap();
let pool = Pool::new(opts)?;
let conn = pool.get_conn()?;

let bucket_name = dotenv::var("S3_BUCKET").expect("S3_BUCKET must be set");
let region = "eu-west-3".parse().unwrap();
let region = "eu-west-3".parse()?;
let credentials = Credentials::new(
Some(&dotenv::var("S3_ACCESS_KEY_ID").expect("S3_ACCESS_KEY_ID must be set")),
Some(&dotenv::var("S3_SECRET_ACCESS_KEY").expect("S3_SECRET_ACCESS_KEY must be set")),
None,
None,
None,
)
.unwrap();
)?;

let bucket = Bucket::new(&bucket_name, region, credentials).unwrap();
let bucket = Bucket::new(&bucket_name, region, credentials)?;

let deployments = get_deployments(conn, bucket.clone()).await;
let deployments = get_deployments(conn, bucket.clone()).await?;
let redis = listen_pub_sub(bucket.clone(), deployments.clone());

let pool = LocalPoolHandle::new(POOL_SIZE);
Expand Down Expand Up @@ -324,4 +319,6 @@ async fn main() {
}

runtime.dispose();

Ok(())
}