Skip to content

Commit

Permalink
upgrade grpc example deps, fixing #180 (#181)
Browse files Browse the repository at this point in the history
  • Loading branch information
LeonHartley authored Aug 23, 2024
1 parent 18ec30a commit dda269f
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 59 deletions.
17 changes: 9 additions & 8 deletions examples/grpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,19 @@ publish = false


[dependencies]
tonic = "0.8"
prost = "0.11"
hyper = "0.14"
tonic = "0.12"
prost = "0.13"
hyper = "1.4"
async-stream = "0.3"
turmoil = { path = "../.." }
tracing = "0.1"
tracing-subscriber = "0.3"
tokio = "1"
tower = "0.4"
tower = "0.5"
hyper-util = "0.1"

[build-dependencies]
tonic-build = "0.8"
prost = "0.11"
prost-build = "0.11"
protox = "0.2"
tonic-build = "0.12"
prost = "0.13"
prost-build = "0.13"
protox = "0.7.0"
120 changes: 69 additions & 51 deletions examples/grpc/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,20 @@
use hyper::server::accept::from_stream;
use hyper::Server;
use std::net::{IpAddr, Ipv4Addr};
use tonic::transport::Endpoint;
use tonic::transport::{Endpoint, Server};
use tonic::Status;
use tonic::{Request, Response};
use tower::make::Shared;
use tracing::{info_span, Instrument};
use turmoil::{net, Builder};
use turmoil::Builder;

#[allow(non_snake_case)]
mod proto {
tonic::include_proto!("helloworld");
}

use crate::connector::{TurmoilTcpConnector, TurmoilTcpStream};
use crate::proto::greeter_client::GreeterClient;
use proto::greeter_server::{Greeter, GreeterServer};
use proto::{HelloReply, HelloRequest};

use crate::proto::greeter_client::GreeterClient;
use turmoil::net::TcpListener;

fn main() {
configure_tracing();
Expand All @@ -30,15 +28,16 @@ fn main() {
sim.host("server", move || {
let greeter = greeter.clone();
async move {
Server::builder(from_stream(async_stream::stream! {
let listener = net::TcpListener::bind(addr).await?;
loop {
yield listener.accept().await.map(|(s, _)| s);
}
}))
.serve(Shared::new(greeter))
.await
.unwrap();
Server::builder()
.add_service(greeter)
.serve_with_incoming(async_stream::stream! {
let listener = TcpListener::bind(addr).await?;
loop {
yield listener.accept().await.map(|(s, _)| TurmoilTcpStream(s));
}
})
.await
.unwrap();

Ok(())
}
Expand All @@ -49,7 +48,7 @@ fn main() {
"client",
async move {
let ch = Endpoint::new("http://server:9999")?
.connect_with_connector(connector::connector())
.connect_with_connector(TurmoilTcpConnector)
.await?;
let mut greeter_client = GreeterClient::new(ch);

Expand Down Expand Up @@ -111,68 +110,87 @@ impl Greeter for MyGreeter {
}

mod connector {
use hyper::Uri;
use hyper_util::client::legacy::connect::Connected;
use hyper_util::rt::TokioIo;
use std::task::{Context, Poll};
use std::{future::Future, pin::Pin};

use hyper::{
client::connect::{Connected, Connection},
Uri,
};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::io;
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tonic::transport::server::TcpConnectInfo;
use tower::Service;
use turmoil::net::TcpStream;

type Fut = Pin<Box<dyn Future<Output = Result<TurmoilConnection, std::io::Error>> + Send>>;
#[derive(Clone)]
pub struct TurmoilTcpConnector;

impl Service<Uri> for TurmoilTcpConnector {
type Response = TokioIo<TurmoilTcpStream>;
type Error = io::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

pub fn connector(
) -> impl Service<Uri, Response = TurmoilConnection, Error = std::io::Error, Future = Fut> + Clone
{
tower::service_fn(|uri: Uri| {
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}

fn call(&mut self, uri: Uri) -> Self::Future {
Box::pin(async move {
let conn = TcpStream::connect(uri.authority().unwrap().as_str()).await?;
Ok::<_, std::io::Error>(TurmoilConnection(conn))
}) as Fut
})
let stream = TcpStream::connect(uri.authority().unwrap().as_str()).await?;
Ok(TokioIo::new(TurmoilTcpStream(stream)))
})
}
}

pub struct TurmoilConnection(turmoil::net::TcpStream);
pub struct TurmoilTcpStream(pub TcpStream);

impl AsyncRead for TurmoilConnection {
impl hyper_util::client::legacy::connect::Connection for TurmoilTcpStream {
fn connected(&self) -> Connected {
Connected::new()
}
}

impl tonic::transport::server::Connected for TurmoilTcpStream {
type ConnectInfo = TcpConnectInfo;

fn connect_info(&self) -> Self::ConnectInfo {
TcpConnectInfo {
local_addr: self.0.local_addr().ok(),
remote_addr: self.0.peer_addr().ok(),
}
}
}

impl AsyncRead for TurmoilTcpStream {
fn poll_read(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> std::task::Poll<std::io::Result<()>> {
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<Result<(), io::Error>> {
Pin::new(&mut self.0).poll_read(cx, buf)
}
}

impl AsyncWrite for TurmoilConnection {
impl AsyncWrite for TurmoilTcpStream {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
cx: &mut Context<'_>,
buf: &[u8],
) -> std::task::Poll<Result<usize, std::io::Error>> {
) -> Poll<Result<usize, io::Error>> {
Pin::new(&mut self.0).poll_write(cx, buf)
}

fn poll_flush(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), std::io::Error>> {
cx: &mut Context<'_>,
) -> Poll<Result<(), io::Error>> {
Pin::new(&mut self.0).poll_flush(cx)
}

fn poll_shutdown(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), std::io::Error>> {
cx: &mut Context<'_>,
) -> Poll<Result<(), io::Error>> {
Pin::new(&mut self.0).poll_shutdown(cx)
}
}

impl Connection for TurmoilConnection {
fn connected(&self) -> hyper::client::connect::Connected {
Connected::new()
}
}
}

0 comments on commit dda269f

Please sign in to comment.