diff --git a/examples/routeguide-tutorial.md b/examples/routeguide-tutorial.md index 428d1de45..7987e5ce9 100644 --- a/examples/routeguide-tutorial.md +++ b/examples/routeguide-tutorial.md @@ -185,6 +185,7 @@ prost = "0.7" futures-core = "0.3" futures-util = "0.3" tokio = { version = "1.0", features = ["rt-multi-thread", "macros", "sync", "time"] } +tokio-stream = "0.1" async-stream = "0.2" serde = { version = "1.0", features = ["derive"] } @@ -273,6 +274,7 @@ use std::pin::Pin; use std::sync::Arc; use tokio::sync::mpsc; use tonic::{Request, Response, Status}; +use tokio_stream::wrappers::ReceiverStream; ``` ```rust @@ -282,7 +284,7 @@ impl RouteGuide for RouteGuideService { unimplemented!() } - type ListFeaturesStream = mpsc::Receiver>; + type ListFeaturesStream = ReceiverStream>; async fn list_features( &self, @@ -402,7 +404,7 @@ Now let's look at one of our streaming RPCs. `list_features` is a server-side st need to send back multiple `Feature`s to our client. ```rust -type ListFeaturesStream = mpsc::Receiver>; +type ListFeaturesStream = ReceiverStream>; async fn list_features( &self, @@ -419,7 +421,7 @@ async fn list_features( } }); - Ok(Response::new(rx)) + Ok(Response::new(ReceiverStream::new(rx))) } ``` diff --git a/examples/src/routeguide/server.rs b/examples/src/routeguide/server.rs index fea85738b..98d15eaf2 100644 --- a/examples/src/routeguide/server.rs +++ b/examples/src/routeguide/server.rs @@ -5,6 +5,7 @@ use std::time::Instant; use futures::{Stream, StreamExt}; use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; use tonic::transport::Server; use tonic::{Request, Response, Status}; @@ -36,8 +37,7 @@ impl RouteGuide for RouteGuideService { Ok(Response::new(Feature::default())) } - type ListFeaturesStream = - Pin> + Send + Sync + 'static>>; + type ListFeaturesStream = ReceiverStream>; async fn list_features( &self, @@ -59,9 +59,7 @@ impl RouteGuide for RouteGuideService { println!(" /// done sending"); }); - Ok(Response::new(Box::pin( - tokio_stream::wrappers::ReceiverStream::new(rx), - ))) + Ok(Response::new(ReceiverStream::new(rx))) } async fn record_route(