-
Notifications
You must be signed in to change notification settings - Fork 195
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
refactor(torii-server): server proxy handlers #2708
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change | ||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,46 @@ | ||||||||||||
use std::net::{IpAddr, SocketAddr}; | ||||||||||||
|
||||||||||||
use http::StatusCode; | ||||||||||||
use hyper::{Body, Request, Response}; | ||||||||||||
use tracing::error; | ||||||||||||
|
||||||||||||
use super::Handler; | ||||||||||||
|
||||||||||||
pub(crate) const LOG_TARGET: &str = "torii::server::handlers::graphql"; | ||||||||||||
|
||||||||||||
pub struct GraphQLHandler { | ||||||||||||
client_ip: IpAddr, | ||||||||||||
graphql_addr: Option<SocketAddr>, | ||||||||||||
} | ||||||||||||
|
||||||||||||
impl GraphQLHandler { | ||||||||||||
pub fn new(client_ip: IpAddr, graphql_addr: Option<SocketAddr>) -> Self { | ||||||||||||
Self { client_ip, graphql_addr } | ||||||||||||
} | ||||||||||||
} | ||||||||||||
|
||||||||||||
#[async_trait::async_trait] | ||||||||||||
impl Handler for GraphQLHandler { | ||||||||||||
fn can_handle(&self, req: &Request<Body>) -> bool { | ||||||||||||
req.uri().path().starts_with("/graphql") | ||||||||||||
} | ||||||||||||
|
||||||||||||
async fn handle(&self, req: Request<Body>) -> Response<Body> { | ||||||||||||
if let Some(addr) = self.graphql_addr { | ||||||||||||
let graphql_addr = format!("http://{}", addr); | ||||||||||||
match crate::proxy::GRAPHQL_PROXY_CLIENT.call(self.client_ip, &graphql_addr, req).await | ||||||||||||
{ | ||||||||||||
Ok(response) => response, | ||||||||||||
Err(_error) => { | ||||||||||||
error!(target: LOG_TARGET, "GraphQL proxy error: {:?}", _error); | ||||||||||||
Response::builder() | ||||||||||||
.status(StatusCode::INTERNAL_SERVER_ERROR) | ||||||||||||
.body(Body::empty()) | ||||||||||||
.unwrap() | ||||||||||||
} | ||||||||||||
} | ||||||||||||
Larkooo marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||
} else { | ||||||||||||
Response::builder().status(StatusCode::NOT_FOUND).body(Body::empty()).unwrap() | ||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Handle Response builder error case, sensei! The - Response::builder().status(StatusCode::NOT_FOUND).body(Body::empty()).unwrap()
+ Response::builder()
+ .status(StatusCode::NOT_FOUND)
+ .body(Body::empty())
+ .unwrap_or_else(|_| Response::new(Body::empty())) 📝 Committable suggestion
Suggested change
|
||||||||||||
} | ||||||||||||
} | ||||||||||||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
use std::net::{IpAddr, SocketAddr}; | ||
|
||
use http::header::CONTENT_TYPE; | ||
use hyper::{Body, Request, Response, StatusCode}; | ||
use tracing::error; | ||
|
||
use super::Handler; | ||
|
||
pub(crate) const LOG_TARGET: &str = "torii::server::handlers::grpc"; | ||
|
||
pub struct GrpcHandler { | ||
client_ip: IpAddr, | ||
grpc_addr: Option<SocketAddr>, | ||
} | ||
|
||
impl GrpcHandler { | ||
pub fn new(client_ip: IpAddr, grpc_addr: Option<SocketAddr>) -> Self { | ||
Self { client_ip, grpc_addr } | ||
} | ||
} | ||
|
||
#[async_trait::async_trait] | ||
impl Handler for GrpcHandler { | ||
fn can_handle(&self, req: &Request<Body>) -> bool { | ||
req.headers() | ||
.get(CONTENT_TYPE) | ||
.and_then(|ct| ct.to_str().ok()) | ||
.map(|ct| ct.starts_with("application/grpc")) | ||
.unwrap_or(false) | ||
} | ||
|
||
async fn handle(&self, req: Request<Body>) -> Response<Body> { | ||
if let Some(grpc_addr) = self.grpc_addr { | ||
let grpc_addr = format!("http://{}", grpc_addr); | ||
match crate::proxy::GRPC_PROXY_CLIENT.call(self.client_ip, &grpc_addr, req).await { | ||
Ok(response) => response, | ||
Err(_error) => { | ||
error!(target: LOG_TARGET, "{:?}", _error); | ||
Response::builder() | ||
.status(StatusCode::INTERNAL_SERVER_ERROR) | ||
.body(Body::empty()) | ||
.unwrap() | ||
} | ||
} | ||
} else { | ||
Response::builder().status(StatusCode::NOT_FOUND).body(Body::empty()).unwrap() | ||
} | ||
} | ||
Larkooo marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
pub mod graphql; | ||
pub mod grpc; | ||
pub mod sql; | ||
pub mod static_files; | ||
|
||
use hyper::{Body, Request, Response}; | ||
|
||
#[async_trait::async_trait] | ||
pub trait Handler: Send + Sync { | ||
// Check if this handler can handle the given request | ||
fn can_handle(&self, req: &Request<Body>) -> bool; | ||
|
||
// Handle the request | ||
async fn handle(&self, req: Request<Body>) -> Response<Body>; | ||
} | ||
Larkooo marked this conversation as resolved.
Show resolved
Hide resolved
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,109 @@ | ||
use std::sync::Arc; | ||
|
||
use base64::engine::general_purpose::STANDARD; | ||
use base64::Engine; | ||
use http::header::CONTENT_TYPE; | ||
use hyper::{Body, Method, Request, Response, StatusCode}; | ||
use sqlx::{Column, Row, SqlitePool, TypeInfo}; | ||
|
||
use super::Handler; | ||
|
||
pub struct SqlHandler { | ||
pool: Arc<SqlitePool>, | ||
} | ||
|
||
impl SqlHandler { | ||
pub fn new(pool: Arc<SqlitePool>) -> Self { | ||
Self { pool } | ||
} | ||
|
||
pub async fn execute_query(&self, query: String) -> Response<Body> { | ||
match sqlx::query(&query).fetch_all(&*self.pool).await { | ||
Ok(rows) => { | ||
let result: Vec<_> = rows | ||
.iter() | ||
.map(|row| { | ||
let mut obj = serde_json::Map::new(); | ||
for (i, column) in row.columns().iter().enumerate() { | ||
let value: serde_json::Value = match column.type_info().name() { | ||
"TEXT" => row | ||
.get::<Option<String>, _>(i) | ||
.map_or(serde_json::Value::Null, serde_json::Value::String), | ||
"INTEGER" | "NULL" => row | ||
.get::<Option<i64>, _>(i) | ||
.map_or(serde_json::Value::Null, |n| { | ||
serde_json::Value::Number(n.into()) | ||
}), | ||
"REAL" => row.get::<Option<f64>, _>(i).map_or( | ||
serde_json::Value::Null, | ||
|f| { | ||
serde_json::Number::from_f64(f).map_or( | ||
serde_json::Value::Null, | ||
serde_json::Value::Number, | ||
) | ||
}, | ||
), | ||
"BLOB" => row | ||
.get::<Option<Vec<u8>>, _>(i) | ||
.map_or(serde_json::Value::Null, |bytes| { | ||
serde_json::Value::String(STANDARD.encode(bytes)) | ||
}), | ||
_ => row | ||
.get::<Option<String>, _>(i) | ||
.map_or(serde_json::Value::Null, serde_json::Value::String), | ||
}; | ||
obj.insert(column.name().to_string(), value); | ||
} | ||
serde_json::Value::Object(obj) | ||
}) | ||
.collect(); | ||
|
||
let json = serde_json::to_string(&result).unwrap(); | ||
|
||
Larkooo marked this conversation as resolved.
Show resolved
Hide resolved
|
||
Response::builder() | ||
.status(StatusCode::OK) | ||
.header(CONTENT_TYPE, "application/json") | ||
.body(Body::from(json)) | ||
.unwrap() | ||
} | ||
Err(e) => Response::builder() | ||
.status(StatusCode::BAD_REQUEST) | ||
.body(Body::from(format!("Query error: {:?}", e))) | ||
.unwrap(), | ||
Larkooo marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
} | ||
Larkooo marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
fn extract_query(&self, req: Request<Body>) -> Result<String, Response<Body>> { | ||
match *req.method() { | ||
Method::GET => { | ||
let params = req.uri().query().unwrap_or_default(); | ||
Ok(form_urlencoded::parse(params.as_bytes()) | ||
.find(|(key, _)| key == "q") | ||
.map(|(_, value)| value.to_string()) | ||
.unwrap_or_default()) | ||
} | ||
Method::POST => { | ||
// Note: This would need to be adjusted to handle the async body reading | ||
Ok(String::new()) // Placeholder | ||
} | ||
_ => Err(Response::builder() | ||
Larkooo marked this conversation as resolved.
Show resolved
Hide resolved
|
||
.status(StatusCode::METHOD_NOT_ALLOWED) | ||
.body(Body::from("Only GET and POST methods are allowed")) | ||
.unwrap()), | ||
} | ||
} | ||
} | ||
|
||
#[async_trait::async_trait] | ||
impl Handler for SqlHandler { | ||
fn can_handle(&self, req: &Request<Body>) -> bool { | ||
req.uri().path().starts_with("/sql") | ||
} | ||
|
||
async fn handle(&self, req: Request<Body>) -> Response<Body> { | ||
match self.extract_query(req) { | ||
Ok(query) => self.execute_query(query).await, | ||
Err(response) => response, | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,47 @@ | ||
use std::net::{IpAddr, SocketAddr}; | ||
|
||
use hyper::{Body, Request, Response, StatusCode}; | ||
use tracing::error; | ||
|
||
use super::Handler; | ||
|
||
pub(crate) const LOG_TARGET: &str = "torii::server::handlers::static"; | ||
|
||
pub struct StaticHandler { | ||
client_ip: IpAddr, | ||
artifacts_addr: Option<SocketAddr>, | ||
} | ||
|
||
impl StaticHandler { | ||
pub fn new(client_ip: IpAddr, artifacts_addr: Option<SocketAddr>) -> Self { | ||
Self { client_ip, artifacts_addr } | ||
} | ||
} | ||
|
||
#[async_trait::async_trait] | ||
impl Handler for StaticHandler { | ||
fn can_handle(&self, req: &Request<Body>) -> bool { | ||
req.uri().path().starts_with("/static") | ||
} | ||
|
||
async fn handle(&self, req: Request<Body>) -> Response<Body> { | ||
if let Some(artifacts_addr) = self.artifacts_addr { | ||
let artifacts_addr = format!("http://{}", artifacts_addr); | ||
match crate::proxy::GRAPHQL_PROXY_CLIENT | ||
.call(self.client_ip, &artifacts_addr, req) | ||
.await | ||
{ | ||
Ok(response) => response, | ||
Err(_error) => { | ||
error!(target: LOG_TARGET, "{:?}", _error); | ||
Response::builder() | ||
.status(StatusCode::INTERNAL_SERVER_ERROR) | ||
.body(Body::empty()) | ||
.unwrap() | ||
} | ||
} | ||
} else { | ||
Response::builder().status(StatusCode::NOT_FOUND).body(Body::empty()).unwrap() | ||
} | ||
} | ||
} | ||
Larkooo marked this conversation as resolved.
Show resolved
Hide resolved
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,2 +1,3 @@ | ||
pub mod artifacts; | ||
pub(crate) mod handlers; | ||
pub mod proxy; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Strengthen the path matching logic, sensei!
The current path check might match unintended paths like "/graphql-wrong". Consider using exact matching or a more specific pattern.
📝 Committable suggestion