Skip to content

Commit

Permalink
Update hyper examples
Browse files Browse the repository at this point in the history
  • Loading branch information
khvzak committed Mar 22, 2024
1 parent 849206e commit 94cef89
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 101 deletions.
5 changes: 2 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,9 @@ libloading = { version = "0.8", optional = true }
[dev-dependencies]
trybuild = "1.0"
futures = "0.3.5"
hyper = { version = "1", features = ["client", "server"] }
hyper-util = { version = "0.1.3", features = ["server", "client", "client-legacy", "tokio", "http1"] }
hyper = { version = "1.2", features = ["full"] }
hyper-util = { version = "0.1.3", features = ["full"] }
http-body-util = "0.1.1"
bytes = "1.5.0"
reqwest = { version = "0.12", features = ["json"] }
tokio = { version = "1.0", features = ["macros", "rt", "time"] }
serde = { version = "1.0", features = ["derive"] }
Expand Down
15 changes: 9 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,14 +85,17 @@ This works using Lua [coroutines](https://www.lua.org/manual/5.3/manual.html#2.6
- [TCP Server](examples/async_tcp_server.rs)


**shell command example**:
``` shell
# async http client
cargo run --example async_http_client --features="async macros lua54"
**shell command examples**:
```shell
# async http client (hyper)
foo@bar:~$ cargo run --example async_http_client --features=lua54,async,macros

# async http client (reqwest)
foo@bar:~$ cargo run --example async_http_reqwest --features=lua54,async,macros,serialize

# async http server
cargo run --example async_http_server --features="async macros lua54"
curl http://localhost:3000
foo@bar:~$ cargo run --example async_http_server --features=lua54,async,macros
foo@bar:~$ curl -v http://localhost:3000
```

### Serialization (serde) support
Expand Down
53 changes: 19 additions & 34 deletions examples/async_http_client.rs
Original file line number Diff line number Diff line change
@@ -1,38 +1,24 @@
use bytes::Bytes;
use http_body_util::BodyExt;
use http_body_util::Empty;
use std::collections::HashMap;

use http_body_util::BodyExt as _;
use hyper::body::Incoming;
use hyper_util::client::legacy::Client as HyperClient;
use std::collections::HashMap;
use hyper_util::rt::TokioExecutor;

use mlua::{chunk, ExternalResult, Lua, Result, UserData, UserDataMethods};

struct BodyReader(Incoming);

impl UserData for BodyReader {
fn add_methods<'lua, M: UserDataMethods<'lua, Self>>(methods: &mut M) {
// Every call returns a next chunk
methods.add_async_method_mut("read", |lua, reader, ()| async move {
let mut summarize = Vec::new(); // Create a vector to accumulate the bytes

loop {
match reader.0.frame().await {
Some(Ok(bytes)) => {
if let Ok(data) = bytes.into_data() {
summarize.extend(data); // Append the bytes to the summarize variable
}
}
Some(Err(_)) => break, // Break on error
None => break, // Break if no more frames
if let Some(bytes) = reader.0.frame().await {
if let Some(bytes) = bytes.into_lua_err()?.data_ref() {
return Some(lua.create_string(&bytes)).transpose();
}
}

if !summarize.is_empty() {
// If summarize has collected data, return it as a Lua string
Ok(Some(lua.create_string(&summarize)?))
} else {
// Return None if no data was collected
Ok(None)
}
Ok(None)
});
}
}
Expand All @@ -42,8 +28,7 @@ async fn main() -> Result<()> {
let lua = Lua::new();

let fetch_url = lua.create_async_function(|lua, uri: String| async move {
let client =
HyperClient::builder(hyper_util::rt::TokioExecutor::new()).build_http::<Empty<Bytes>>();
let client = HyperClient::builder(TokioExecutor::new()).build_http::<String>();
let uri = uri.parse().into_lua_err()?;
let resp = client.get(uri).await.into_lua_err()?;

Expand All @@ -66,19 +51,19 @@ async fn main() -> Result<()> {

let f = lua
.load(chunk! {
local res = $fetch_url(...)
local res = $fetch_url(...)
print("status: "..res.status)
for key, vals in pairs(res.headers) do
for _, val in ipairs(vals) do
print(key..": "..val)
end
for _, val in ipairs(vals) do
print(key..": "..val)
end
end
repeat
local body = res.body:read()
if body then
print(body)
end
until not body
local chunk = res.body:read()
if chunk then
print(chunk)
end
until not chunk
})
.into_function()?;

Expand Down
137 changes: 79 additions & 58 deletions examples/async_http_server.rs
Original file line number Diff line number Diff line change
@@ -1,35 +1,63 @@
use bytes::Bytes;
use http_body_util::{combinators::BoxBody, BodyExt, Empty, Full};
use hyper::{body::Incoming, service::Service, Request, Response};
use hyper_util::{rt::TokioIo, server::conn::auto};
use std::convert::Infallible;
use std::future::Future;
use std::net::SocketAddr;
use std::rc::Rc;

use futures::future::LocalBoxFuture;
use http_body_util::{combinators::BoxBody, BodyExt as _, Empty, Full};
use hyper::body::{Bytes, Incoming};
use hyper::{Request, Response};
use hyper_util::rt::TokioIo;
use hyper_util::server::conn::auto::Builder as ServerConnBuilder;
use tokio::net::TcpListener;
use tokio::task::LocalSet;

use mlua::{
chunk, Error as LuaError, Function, Lua, String as LuaString, Table, UserData, UserDataMethods,
chunk, Error as LuaError, Function, Lua, RegistryKey, String as LuaString, Table, UserData,
UserDataMethods,
};
use std::{future::Future, net::SocketAddr, pin::Pin, rc::Rc};
use tokio::{net::TcpListener, task::LocalSet};

/// Wrapper around incoming request that implements UserData
struct LuaRequest(SocketAddr, Request<Incoming>);

impl UserData for LuaRequest {
fn add_methods<'lua, M: UserDataMethods<'lua, Self>>(methods: &mut M) {
methods.add_method("remote_addr", |_lua, req, ()| Ok((req.0).to_string()));
methods.add_method("method", |_lua, req, ()| Ok((req.1).method().to_string()));
methods.add_method("remote_addr", |_, req, ()| Ok((req.0).to_string()));
methods.add_method("method", |_, req, ()| Ok((req.1).method().to_string()));
methods.add_method("path", |_, req, ()| Ok(req.1.uri().path().to_string()));
}
}

pub struct Svc(Rc<Lua>, SocketAddr);
/// Service that handles incoming requests
#[derive(Clone)]
pub struct Svc {
lua: Rc<Lua>,
handler: Rc<RegistryKey>,
peer_addr: SocketAddr,
}

impl Svc {
pub fn new(lua: Rc<Lua>, handler: Rc<RegistryKey>, peer_addr: SocketAddr) -> Self {
Self {
lua,
handler,
peer_addr,
}
}
}

impl Service<Request<Incoming>> for Svc {
type Response = Response<BoxBody<Bytes, hyper::Error>>;
impl hyper::service::Service<Request<Incoming>> for Svc {
type Response = Response<BoxBody<Bytes, Infallible>>;
type Error = LuaError;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>>;
type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;

fn call(&self, req: Request<Incoming>) -> Self::Future {
// If handler returns an error then generate 5xx response
let lua = self.0.clone();
let lua_req = LuaRequest(self.1, req);
let lua = self.lua.clone();
let handler_key = self.handler.clone();
let lua_req = LuaRequest(self.peer_addr, req);
Box::pin(async move {
let handler: Function = lua.named_registry_value("http_handler")?;
let handler: Function = lua.registry_value(&handler_key)?;
match handler.call_async::<_, Table>(lua_req).await {
Ok(lua_resp) => {
let status = lua_resp.get::<_, Option<u16>>("status")?.unwrap_or(200);
Expand All @@ -43,30 +71,19 @@ impl Service<Request<Incoming>> for Svc {
}
}

// Set body
let body = lua_resp
.get::<_, Option<LuaString>>("body")?
.map(|b| {
Full::new(Bytes::copy_from_slice(b.clone().as_bytes()))
.map_err(|never| match never {})
.boxed()
})
.unwrap_or_else(|| {
Empty::<Bytes>::new()
.map_err(|never| match never {})
.boxed()
});
.map(|b| Full::new(Bytes::copy_from_slice(b.as_bytes())).boxed())
.unwrap_or_else(|| Empty::<Bytes>::new().boxed());

Ok(resp.body(body).unwrap())
}
Err(err) => {
eprintln!("{}", err);
Ok(Response::builder()
.status(500)
.body(
Full::new(Bytes::from("Internal Server Error".as_bytes()))
.map_err(|never| match never {})
.boxed(),
)
.body(Full::new(Bytes::from("Internal Server Error")).boxed())
.unwrap())
}
}
Expand All @@ -79,43 +96,47 @@ async fn main() {
let lua = Rc::new(Lua::new());

// Create Lua handler function
let handler: Function = lua
let handler: RegistryKey = lua
.load(chunk! {
function(req)
return {
status = 200,
headers = {
["X-Req-Method"] = req:method(),
["X-Remote-Addr"] = req:remote_addr(),
},
body = "Hello from Lua!\n"
}
end
function(req)
return {
status = 200,
headers = {
["X-Req-Method"] = req:method(),
["X-Req-Path"] = req:path(),
["X-Remote-Addr"] = req:remote_addr(),
},
body = "Hello from Lua!\n"
}
end
})
.eval()
.expect("cannot create Lua handler");
.expect("Failed to create Lua handler");
let handler = Rc::new(handler);

// Store it in the Registry
lua.set_named_registry_value("http_handler", handler)
.expect("cannot store Lua handler");

let addr = "127.0.0.1:3000";
let listen_addr = "127.0.0.1:3000";
let listener = TcpListener::bind(listen_addr).await.unwrap();
println!("Listening on http://{listen_addr}");

let local = LocalSet::new();
let listener = TcpListener::bind(addr).await.unwrap();
loop {
let (stream, peer_addr) = listener.accept().await.unwrap();
let io = TokioIo::new(stream);
let (stream, peer_addr) = match listener.accept().await {
Ok(x) => x,
Err(err) => {
eprintln!("Failed to accept connection: {err}");
continue;
}
};

let svc = Svc(lua.clone(), peer_addr);
let svc = Svc::new(lua.clone(), handler.clone(), peer_addr);
local
.run_until(async move {
if let Err(err) = auto::Builder::new(LocalExec)
let result = ServerConnBuilder::new(LocalExec)
.http1()
.serve_connection(io, svc)
.await
{
println!("Error serving connection: {:?}", err);
.serve_connection(TokioIo::new(stream), svc)
.await;
if let Err(err) = result {
eprintln!("Error serving connection: {err:?}");
}
})
.await;
Expand All @@ -127,7 +148,7 @@ struct LocalExec;

impl<F> hyper::rt::Executor<F> for LocalExec
where
F: std::future::Future + 'static, // not requiring `Send`
F: Future + 'static, // not requiring `Send`
{
fn execute(&self, fut: F) {
tokio::task::spawn_local(fut);
Expand Down

0 comments on commit 94cef89

Please sign in to comment.