Skip to content

Commit

Permalink
Upgrade to Tokio 1.0.0 ecosystem (#530)
Browse files Browse the repository at this point in the history
* Upgrade Tonic to Tokio 1.0

Work in progress for updating Tonic to Tokio 1.0. Since tower has not
been released to crates.io, a git dependency is taken instead.

* Upgrade Tonic to Tokio 1.0 phase 2

* tonic: remove tower-* deps

* Apply suggestions from code review

Co-authored-by: Ed Marshall <[email protected]>
Co-authored-by: Lucio Franco <[email protected]>
  • Loading branch information
3 people authored Jan 11, 2021
1 parent fe4d5b9 commit fdda5ae
Show file tree
Hide file tree
Showing 42 changed files with 185 additions and 185 deletions.
21 changes: 11 additions & 10 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -148,28 +148,29 @@ path = "src/hyper_warp_multiplex/server.rs"

[dependencies]
tonic = { path = "../tonic", features = ["tls"] }
prost = "0.6"
tokio = { version = "0.2", features = ["rt-threaded", "time", "stream", "fs", "macros", "uds"] }
prost = "0.7"
tokio = { version = "1.0", features = ["rt-multi-thread", "time", "fs", "macros", "net"] }
tokio-stream = { version = "0.1", features = ["net"] }
async-stream = "0.3"
futures = { version = "0.3", default-features = false, features = ["alloc"] }
async-stream = "0.2"
tower = "0.3"
tower = { version = "0.4" }
# Required for routeguide
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
rand = "0.7"
rand = "0.8"
# Tracing
tracing = "0.1.16"
tracing-subscriber = { version = "0.2", features = ["tracing-log"] }
tracing-attributes = "0.1"
tracing-futures = "0.2"
# Required for wellknown types
prost-types = "0.6"
prost-types = "0.7"
# Hyper example
hyper = "0.13"
warp = { version = "0.2", default-features = false }
hyper = "0.14"
warp = { git = "https://github.com/aknuds1/warp", branch = "chore/upgrade-tokio", default-features = false }
http = "0.2"
http-body = "0.3"
pin-project = "0.4.17"
http-body = "0.4"
pin-project = "1.0"
# Health example
tonic-health = { path = "../tonic-health" }
listenfd = "0.3"
Expand Down
6 changes: 3 additions & 3 deletions examples/routeguide-tutorial.md
Original file line number Diff line number Diff line change
Expand Up @@ -702,7 +702,7 @@ use futures_util::stream;
```rust
async fn run_record_route(client: &mut RouteGuideClient<Channel>) -> Result<(), Box<dyn Error>> {
let mut rng = rand::thread_rng();
let point_count: i32 = rng.gen_range(2, 100);
let point_count: i32 = rng.gen_range(2..100);

let mut points = vec![];
for _ in 0..=point_count {
Expand All @@ -723,8 +723,8 @@ async fn run_record_route(client: &mut RouteGuideClient<Channel>) -> Result<(),

```rust
fn random_point(rng: &mut ThreadRng) -> Point {
let latitude = (rng.gen_range(0, 180) - 90) * 10_000_000;
let longitude = (rng.gen_range(0, 360) - 180) * 10_000_000;
let latitude = (rng.gen_range(0..180) - 90) * 10_000_000;
let longitude = (rng.gen_range(0..360) - 180) * 10_000_000;
Point {
latitude,
longitude,
Expand Down
6 changes: 4 additions & 2 deletions examples/src/autoreload/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

match listenfd::ListenFd::from_env().take_tcp_listener(0)? {
Some(listener) => {
let mut listener = tokio::net::TcpListener::from_std(listener)?;
let listener = tokio_stream::wrappers::TcpListenerStream::new(
tokio::net::TcpListener::from_std(listener)?,
);

server.serve_with_incoming(listener.incoming()).await?;
server.serve_with_incoming(listener).await?;
}
None => {
server.serve(addr).await?;
Expand Down
6 changes: 1 addition & 5 deletions examples/src/blocking/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,7 @@ impl BlockingClient {
D: std::convert::TryInto<tonic::transport::Endpoint>,
D::Error: Into<StdError>,
{
let mut rt = Builder::new()
.basic_scheduler()
.enable_all()
.build()
.unwrap();
let rt = Builder::new_multi_thread().enable_all().build().unwrap();
let client = rt.block_on(GreeterClient::connect(dst))?;

Ok(Self { rt, client })
Expand Down
16 changes: 8 additions & 8 deletions examples/src/dynamic_load_balance/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,42 +18,42 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let e1 = Endpoint::from_static("http://[::1]:50051");
let e2 = Endpoint::from_static("http://[::1]:50052");

let (channel, mut rx) = Channel::balance_channel(10);
let (channel, rx) = Channel::balance_channel(10);
let mut client = EchoClient::new(channel);

let done = Arc::new(AtomicBool::new(false));
let demo_done = done.clone();
tokio::spawn(async move {
tokio::time::delay_for(tokio::time::Duration::from_secs(5)).await;
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
println!("Added first endpoint");
let change = Change::Insert("1", e1);
let res = rx.send(change).await;
println!("{:?}", res);
tokio::time::delay_for(tokio::time::Duration::from_secs(5)).await;
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
println!("Added second endpoint");
let change = Change::Insert("2", e2);
let res = rx.send(change).await;
println!("{:?}", res);
tokio::time::delay_for(tokio::time::Duration::from_secs(5)).await;
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
println!("Removed first endpoint");
let change = Change::Remove("1");
let res = rx.send(change).await;
println!("{:?}", res);

tokio::time::delay_for(tokio::time::Duration::from_secs(5)).await;
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
println!("Removed second endpoint");
let change = Change::Remove("2");
let res = rx.send(change).await;
println!("{:?}", res);

tokio::time::delay_for(tokio::time::Duration::from_secs(5)).await;
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
println!("Added third endpoint");
let e3 = Endpoint::from_static("http://[::1]:50051");
let change = Change::Insert("3", e3);
let res = rx.send(change).await;
println!("{:?}", res);

tokio::time::delay_for(tokio::time::Duration::from_secs(5)).await;
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
println!("Removed third endpoint");
let change = Change::Remove("3");
let res = rx.send(change).await;
Expand All @@ -62,7 +62,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
});

while !done.load(SeqCst) {
tokio::time::delay_for(tokio::time::Duration::from_millis(500)).await;
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
let request = tonic::Request::new(EchoRequest {
message: "hello".into(),
});
Expand Down
3 changes: 1 addition & 2 deletions examples/src/health/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use tonic::{transport::Server, Request, Response, Status};
use hello_world::greeter_server::{Greeter, GreeterServer};
use hello_world::{HelloReply, HelloRequest};
use std::time::Duration;
use tokio::time::delay_for;
use tonic_health::server::HealthReporter;

pub mod hello_world {
Expand Down Expand Up @@ -34,7 +33,7 @@ async fn twiddle_service_status(mut reporter: HealthReporter) {
let mut iter = 0u64;
loop {
iter += 1;
delay_for(Duration::from_secs(1)).await;
tokio::time::sleep(Duration::from_secs(1)).await;

if iter % 2 == 0 {
reporter.set_serving::<GreeterServer<MyGreeter>>().await;
Expand Down
6 changes: 3 additions & 3 deletions examples/src/routeguide/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ async fn print_features(client: &mut RouteGuideClient<Channel>) -> Result<(), Bo

async fn run_record_route(client: &mut RouteGuideClient<Channel>) -> Result<(), Box<dyn Error>> {
let mut rng = rand::thread_rng();
let point_count: i32 = rng.gen_range(2, 100);
let point_count: i32 = rng.gen_range(2..100);

let mut points = vec![];
for _ in 0..=point_count {
Expand Down Expand Up @@ -115,8 +115,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}

fn random_point(rng: &mut ThreadRng) -> Point {
let latitude = (rng.gen_range(0, 180) - 90) * 10_000_000;
let longitude = (rng.gen_range(0, 360) - 180) * 10_000_000;
let latitude = (rng.gen_range(0..180) - 90) * 10_000_000;
let longitude = (rng.gen_range(0..360) - 180) * 10_000_000;
Point {
latitude,
longitude,
Expand Down
9 changes: 6 additions & 3 deletions examples/src/routeguide/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,16 @@ impl RouteGuide for RouteGuideService {
Ok(Response::new(Feature::default()))
}

type ListFeaturesStream = mpsc::Receiver<Result<Feature, Status>>;
type ListFeaturesStream =
Pin<Box<dyn Stream<Item = Result<Feature, Status>> + Send + Sync + 'static>>;

async fn list_features(
&self,
request: Request<Rectangle>,
) -> Result<Response<Self::ListFeaturesStream>, Status> {
println!("ListFeatures = {:?}", request);

let (mut tx, rx) = mpsc::channel(4);
let (tx, rx) = mpsc::channel(4);
let features = self.features.clone();

tokio::spawn(async move {
Expand All @@ -58,7 +59,9 @@ impl RouteGuide for RouteGuideService {
println!(" /// done sending");
});

Ok(Response::new(rx))
Ok(Response::new(Box::pin(
tokio_stream::wrappers::ReceiverStream::new(rx),
)))
}

async fn record_route(
Expand Down
4 changes: 2 additions & 2 deletions examples/src/timeout/server.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::time::Duration;
use tokio::time::delay_for;
use tokio::time::sleep;
use tonic::{transport::Server, Request, Response, Status};

use hello_world::greeter_server::{Greeter, GreeterServer};
Expand All @@ -20,7 +20,7 @@ impl Greeter for MyGreeter {
) -> Result<Response<HelloReply>, Status> {
println!("Got a request from {:?}", request.remote_addr());

delay_for(Duration::from_millis(5000)).await;
sleep(Duration::from_millis(5000)).await;

let reply = hello_world::HelloReply {
message: format!("Hello {}!", request.into_inner().name),
Expand Down
22 changes: 15 additions & 7 deletions examples/src/uds/server.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#![cfg_attr(not(unix), allow(unused_imports))]

use futures::stream::TryStreamExt;
use futures::TryFutureExt;
use std::path::Path;
#[cfg(unix)]
use tokio::net::UnixListener;
Expand Down Expand Up @@ -40,13 +40,21 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

tokio::fs::create_dir_all(Path::new(path).parent().unwrap()).await?;

let mut uds = UnixListener::bind(path)?;

let greeter = MyGreeter::default();

let incoming = {
let uds = UnixListener::bind(path)?;

async_stream::stream! {
while let item = uds.accept().map_ok(|(st, _)| unix::UnixStream(st)).await {
yield item;
}
}
};

Server::builder()
.add_service(GreeterServer::new(greeter))
.serve_with_incoming(uds.incoming().map_ok(unix::UnixStream))
.serve_with_incoming(incoming)
.await?;

Ok(())
Expand All @@ -59,7 +67,7 @@ mod unix {
task::{Context, Poll},
};

use tokio::io::{AsyncRead, AsyncWrite};
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tonic::transport::server::Connected;

#[derive(Debug)]
Expand All @@ -71,8 +79,8 @@ mod unix {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<std::io::Result<usize>> {
buf: &mut ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
Pin::new(&mut self.0).poll_read(cx, buf)
}
}
Expand Down
23 changes: 12 additions & 11 deletions interop/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,24 @@ name = "server"
path = "src/bin/server.rs"

[dependencies]
tokio = { version = "0.2", features = ["rt-threaded", "time", "macros", "stream", "fs"] }
tokio = { version = "1.0", features = ["rt-multi-thread", "time", "macros", "fs"] }
tokio-stream = "0.1"
async-stream = "0.3"
tonic = { path = "../tonic", features = ["tls"] }
prost = "0.6"
prost-derive = "0.6"
bytes = "0.5"
prost = "0.7"
prost-derive = "0.7"
bytes = "1.0"
http = "0.2"
futures-core = "0.3"
futures-util = "0.3"
async-stream = "0.2"
tower = "0.3"
http-body = "0.3"
hyper = "0.13"
console = "0.9"
tower = { version = "0.4" }
http-body = "0.4"
hyper = "0.14"
console = "0.14"
structopt = "0.3"
tracing = "0.1"
tracing-subscriber = "0.2.0-alpha"
tracing-log = "0.1.0"
tracing-subscriber = "0.2"
tracing-log = "0.1"

[build-dependencies]
tonic-build = { path = "../tonic-build", features = ["prost"] }
6 changes: 5 additions & 1 deletion interop/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,11 @@ pub async fn ping_pong(client: &mut TestClient, assertions: &mut Vec<TestAsserti
let (tx, rx) = mpsc::unbounded_channel();
tx.send(make_ping_pong_request(0)).unwrap();

let result = client.full_duplex_call(Request::new(rx)).await;
let result = client
.full_duplex_call(Request::new(
tokio_stream::wrappers::UnboundedReceiverStream::new(rx),
))
.await;

assertions.push(test_assert!(
"call must be successful",
Expand Down
5 changes: 1 addition & 4 deletions interop/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,7 @@ pub enum TestAssertion {

impl TestAssertion {
pub fn is_failed(&self) -> bool {
match self {
TestAssertion::Failed { .. } => true,
_ => false,
}
matches!(self, TestAssertion::Failed { .. })
}
}

Expand Down
6 changes: 3 additions & 3 deletions interop/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ impl pb::test_service_server::TestService for TestService {

let stream = try_stream! {
for param in response_parameters {
tokio::time::delay_for(Duration::from_micros(param.interval_us as u64)).await;
tokio::time::sleep(Duration::from_micros(param.interval_us as u64)).await;

let payload = crate::server_payload(param.size as usize);
yield StreamingOutputCallResponse { payload: Some(payload) };
Expand All @@ -90,7 +90,7 @@ impl pb::test_service_server::TestService for TestService {
) -> Result<StreamingInputCallResponse> {
let mut stream = req.into_inner();

let mut aggregated_payload_size = 0 as i32;
let mut aggregated_payload_size = 0;
while let Some(msg) = stream.try_next().await? {
aggregated_payload_size += msg.payload.unwrap().body.len() as i32;
}
Expand Down Expand Up @@ -127,7 +127,7 @@ impl pb::test_service_server::TestService for TestService {
}

for param in msg.response_parameters {
tokio::time::delay_for(Duration::from_micros(param.interval_us as u64)).await;
tokio::time::sleep(Duration::from_micros(param.interval_us as u64)).await;

let payload = crate::server_payload(param.size as usize);
yield StreamingOutputCallResponse { payload: Some(payload) };
Expand Down
2 changes: 1 addition & 1 deletion tests/ambiguous_methods/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ license = "MIT"

[dependencies]
tonic = { path= "../../tonic" }
prost = "0.6"
prost = "0.7"

[build-dependencies]
tonic-build = { path= "../../tonic-build" }
4 changes: 2 additions & 2 deletions tests/extern_path/my_application/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ license = "MIT"

[dependencies]
tonic = { path= "../../../tonic" }
prost = "0.6"
prost-types = "0.6"
prost = "0.7"
prost-types = "0.7"
uuid = { package = "uuid1", path= "../uuid" }

[build-dependencies]
Expand Down
Loading

0 comments on commit fdda5ae

Please sign in to comment.