From 3387ef900fc03f06b2a8310033e8611484ceffa6 Mon Sep 17 00:00:00 2001 From: Lucio Franco Date: Fri, 6 Dec 2019 00:09:46 -0500 Subject: [PATCH] Upgrade to `tokio 0.2` (#163) --- Cargo.toml | 5 + deny.toml | 2 +- tests/wellknown/Cargo.toml | 2 +- tonic-build/src/client.rs | 2 +- tonic-examples/Cargo.toml | 10 +- tonic-examples/src/load_balance/server.rs | 4 +- tonic-examples/src/routeguide/client.rs | 8 +- tonic-interop/Cargo.toml | 15 +- tonic-interop/src/client.rs | 10 +- tonic-interop/src/server.rs | 8 +- tonic/Cargo.toml | 41 ++-- tonic/benches/bench_main.rs | 2 + tonic/src/body.rs | 20 +- tonic/src/client/grpc.rs | 9 +- tonic/src/codec/decode.rs | 26 +- tonic/src/codec/encode.rs | 26 +- tonic/src/codec/mod.rs | 2 +- tonic/src/codec/prost.rs | 26 +- tonic/src/codec/tests.rs | 285 +++++++++++----------- tonic/src/metadata/encoding.rs | 9 +- tonic/src/metadata/key.rs | 2 +- tonic/src/metadata/map.rs | 30 +-- tonic/src/metadata/value.rs | 4 +- tonic/src/status.rs | 8 +- tonic/src/transport/channel.rs | 6 +- tonic/src/transport/endpoint.rs | 10 +- tonic/src/transport/server.rs | 49 ++-- tonic/src/transport/service/either.rs | 2 +- tonic/src/transport/service/layer.rs | 2 +- tonic/src/transport/service/router.rs | 2 +- 30 files changed, 321 insertions(+), 306 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index a34e834b6..b28245237 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,3 +9,8 @@ members = [ "tests/same_name", "tests/wellknown", ] + +[patch.'https://github.com/tower-rs/tower'] +tower-service = "0.3" +tower-make = "0.3" +tower-layer = "0.3" \ No newline at end of file diff --git a/deny.toml b/deny.toml index 54d9c2c00..a7713265a 100644 --- a/deny.toml +++ b/deny.toml @@ -15,8 +15,8 @@ deny = [ { name = "term" }, ] skip = [ - { name = "crossbeam-utils", version = "=0.6.6" }, { name = "crossbeam-queue", version = "=0.2.0" }, + { name = "bytes", version = "=0.4.12" }, ] skip-tree = [ { name = "rand", version = "=0.6.5" }, diff --git a/tests/wellknown/Cargo.toml b/tests/wellknown/Cargo.toml index 1cd06a11a..3fc98e3b0 100644 --- a/tests/wellknown/Cargo.toml +++ b/tests/wellknown/Cargo.toml @@ -10,7 +10,7 @@ license = "MIT" [dependencies] tonic = { path = "../../tonic" } -bytes = "0.4" +bytes = "0.5" prost = "0.5" prost-types = "0.5" diff --git a/tonic-build/src/client.rs b/tonic-build/src/client.rs index e0a688956..e926fe0c7 100644 --- a/tonic-build/src/client.rs +++ b/tonic-build/src/client.rs @@ -23,7 +23,7 @@ pub(crate) fn generate(service: &Service, proto: &str) -> TokenStream { T::ResponseBody: Body + HttpBody + Send + 'static, T::Error: Into, ::Error: Into + Send, - ::Data: Into + Send, { + { pub fn new(inner: T) -> Self { let inner = tonic::client::Grpc::new(inner); Self { inner } diff --git a/tonic-examples/Cargo.toml b/tonic-examples/Cargo.toml index 43955c1d7..43835e67b 100644 --- a/tonic-examples/Cargo.toml +++ b/tonic-examples/Cargo.toml @@ -71,11 +71,11 @@ tonic = { path = "../tonic", features = ["tls"] } bytes = "0.4" prost = "0.5" -tokio = "=0.2.0-alpha.6" -futures-preview = { version = "=0.3.0-alpha.19", default-features = false, features = ["alloc"]} -async-stream = "0.1.2" -http = "0.1" -tower = "=0.3.0-alpha.2" +tokio = { version = "0.2", features = ["rt-threaded", "time", "stream", "fs"] } +futures = { version = "0.3", default-features = false, features = ["alloc"]} +async-stream = "0.2" +http = "0.2" +tower = { git = "https://github.com/tower-rs/tower" } # Required for routeguide serde = { version = "1.0", features = ["derive"] } diff --git a/tonic-examples/src/load_balance/server.rs b/tonic-examples/src/load_balance/server.rs index 67e5d108f..aa95f9144 100644 --- a/tonic-examples/src/load_balance/server.rs +++ b/tonic-examples/src/load_balance/server.rs @@ -60,7 +60,7 @@ async fn main() -> Result<(), Box> { for addr in &addrs { let addr = addr.parse()?; - let mut tx = tx.clone(); + let tx = tx.clone(); let server = EchoServer { addr }; let serve = Server::builder() @@ -72,7 +72,7 @@ async fn main() -> Result<(), Box> { eprintln!("Error = {:?}", e); } - tx.try_send(()).unwrap(); + tx.send(()).unwrap(); }); } diff --git a/tonic-examples/src/routeguide/client.rs b/tonic-examples/src/routeguide/client.rs index 66d769d89..e42871281 100644 --- a/tonic-examples/src/routeguide/client.rs +++ b/tonic-examples/src/routeguide/client.rs @@ -3,8 +3,8 @@ use rand::rngs::ThreadRng; use rand::Rng; use route_guide::{Point, Rectangle, RouteNote}; use std::error::Error; -use std::time::{Duration, Instant}; -use tokio::timer::Interval; +use std::time::Duration; +use tokio::time::Instant; use tonic::transport::Channel; use tonic::Request; @@ -62,9 +62,9 @@ async fn run_route_chat(client: &mut RouteGuideClient) -> Result<(), Bo let start = Instant::now(); let outbound = async_stream::stream! { - let mut interval = Interval::new_interval(Duration::from_secs(1)); + let mut interval = tokio::time::interval(Duration::from_secs(1)); - while let Some(time) = interval.next().await { + while let time = interval.tick().await { let elapsed = time.duration_since(start); let note = RouteNote { location: Some(Point { diff --git a/tonic-interop/Cargo.toml b/tonic-interop/Cargo.toml index f487b949a..1c4af9b0a 100644 --- a/tonic-interop/Cargo.toml +++ b/tonic-interop/Cargo.toml @@ -15,17 +15,18 @@ name = "server" path = "src/bin/server.rs" [dependencies] -tokio = "=0.2.0-alpha.6" +tokio = { version = "0.2", features = ["rt-threaded", "time", "macros", "stream", "fs"] } tonic = { path = "../tonic", features = ["tls"] } prost = "0.5" prost-derive = "0.5" bytes = "0.4" -http = "0.1" -futures-core-preview = "=0.3.0-alpha.19" -futures-util-preview = "=0.3.0-alpha.19" -async-stream = "0.1.2" -tower = "=0.3.0-alpha.2" -http-body = "=0.2.0-alpha.3" +http = "0.2" +futures-core = "0.3" +futures-util = "0.3" +async-stream = "0.2" +# tower = "=0.3.0-alpha.2" +tower = { git = "https://github.com/tower-rs/tower" } +http-body = "0.3" console = "0.9" structopt = "0.2" diff --git a/tonic-interop/src/client.rs b/tonic-interop/src/client.rs index 5aefdf1bf..351c2c3fc 100644 --- a/tonic-interop/src/client.rs +++ b/tonic-interop/src/client.rs @@ -1,5 +1,5 @@ use crate::{pb::client::*, pb::*, test_assert, TestAssertion}; -use futures_util::{future, stream, SinkExt, StreamExt}; +use futures_util::{future, stream, StreamExt}; use tokio::sync::mpsc; use tonic::transport::Channel; use tonic::{metadata::MetadataValue, Code, Request, Response, Status}; @@ -148,8 +148,8 @@ pub async fn server_streaming(client: &mut TestClient, assertions: &mut Vec) { - let (mut tx, rx) = mpsc::unbounded_channel(); - tx.try_send(make_ping_pong_request(0)).unwrap(); + let (tx, rx) = mpsc::unbounded_channel(); + tx.send(make_ping_pong_request(0)).unwrap(); let result = client.full_duplex_call(Request::new(rx)).await; @@ -170,9 +170,7 @@ pub async fn ping_pong(client: &mut TestClient, assertions: &mut Vec { diff --git a/tonic-interop/src/server.rs b/tonic-interop/src/server.rs index 15e7145f4..aa6c425b5 100644 --- a/tonic-interop/src/server.rs +++ b/tonic-interop/src/server.rs @@ -2,7 +2,7 @@ use crate::pb::{self, *}; use async_stream::try_stream; use futures_util::{stream, StreamExt, TryStreamExt}; use std::pin::Pin; -use std::time::{Duration, Instant}; +use std::time::Duration; use tonic::{Code, Request, Response, Status}; pub use pb::server::{TestServiceServer, UnimplementedServiceServer}; @@ -65,8 +65,7 @@ impl pb::server::TestService for TestService { let stream = try_stream! { for param in response_parameters { - let deadline = Instant::now() + Duration::from_micros(param.interval_us as u64); - tokio::timer::delay(deadline).await; + tokio::time::delay_for(Duration::from_micros(param.interval_us as u64)).await; let payload = crate::server_payload(param.size as usize); yield StreamingOutputCallResponse { payload: Some(payload) }; @@ -121,8 +120,7 @@ impl pb::server::TestService for TestService { } for param in msg.response_parameters { - let deadline = Instant::now() + Duration::from_micros(param.interval_us as u64); - tokio::timer::delay(deadline).await; + tokio::time::delay_for(Duration::from_micros(param.interval_us as u64)).await; let payload = crate::server_payload(param.size as usize); yield StreamingOutputCallResponse { payload: Some(payload) }; diff --git a/tonic/Cargo.toml b/tonic/Cargo.toml index e65606ac1..3ead36806 100644 --- a/tonic/Cargo.toml +++ b/tonic/Cargo.toml @@ -35,24 +35,24 @@ transport = [ tls = ["tokio-rustls"] tls-roots = ["rustls-native-certs"] -[[bench]] -name = "bench_main" -harness = false +# [[bench]] +# name = "bench_main" +# harness = false [dependencies] -bytes = "0.4" -futures-core-preview = "=0.3.0-alpha.19" -futures-util-preview = { version = "=0.3.0-alpha.19", default-features = false } +bytes = "0.5" +futures-core = "0.3" +futures-util = { version = "0.3", default-features = false } tracing = "0.1" -http = "0.1.14" +http = "0.2" base64 = "0.10" percent-encoding = "1.0.1" -tower-service = "=0.3.0-alpha.2" -tokio-codec = "=0.2.0-alpha.6" -async-stream = "0.1.2" -http-body = "=0.2.0-alpha.3" -pin-project = "^0.4" +tower-service = "0.3" +tokio-util = { version = "0.2", features = ["codec"] } +async-stream = "0.2" +http-body = "0.3" +pin-project = "0.4" # prost prost = { version = "0.5", optional = true } @@ -62,18 +62,19 @@ prost-derive = { version = "0.5", optional = true } async-trait = { version = "0.1.13", optional = true } # transport -hyper = { version = "=0.13.0-alpha.4", features = ["unstable-stream"], optional = true } -tokio = { version = "=0.2.0-alpha.6", default-features = false, features = ["tcp"], optional = true } -tower = { version = "=0.3.0-alpha.2", optional = true} -tower-make = "=0.3.0-alpha.2a" -tower-balance = { version = "=0.3.0-alpha.2", optional = true } -tower-load = { version = "=0.3.0-alpha.2", optional = true } +hyper = { git = "https://github.com/hyperium/hyper", features = ["stream"], optional = true } +tokio = { version = "0.2", features = ["tcp"], optional = true } +tower = { git = "https://github.com/tower-rs/tower", optional = true} +tower-make = { version = "0.3", features = ["connect"] } +tower-balance = { git = "https://github.com/tower-rs/tower", optional = true } +tower-load = { git = "https://github.com/tower-rs/tower", optional = true } # rustls -tokio-rustls = { version = "=0.12.0-alpha.5", optional = true } +tokio-rustls = { version = "0.12", optional = true } rustls-native-certs = { version = "0.1", optional = true } [dev-dependencies] +tokio = { version = "0.2", features = ["rt-core", "macros"] } static_assertions = "1.0" rand = "0.7.2" criterion = "0.3" @@ -81,3 +82,5 @@ criterion = "0.3" [package.metadata.docs.rs] all-features = true rustdoc-args = ["--cfg", "docsrs"] + + diff --git a/tonic/benches/bench_main.rs b/tonic/benches/bench_main.rs index bcd89e4ce..fe8cd02bc 100755 --- a/tonic/benches/bench_main.rs +++ b/tonic/benches/bench_main.rs @@ -1,3 +1,5 @@ +#![cfg(feature = "broken")] + use criterion::*; mod benchmarks; diff --git a/tonic/src/body.rs b/tonic/src/body.rs index ee0a662ad..d94393563 100644 --- a/tonic/src/body.rs +++ b/tonic/src/body.rs @@ -4,7 +4,7 @@ //! of the types in this module are based around [`http_body::Body`]. use crate::{Error, Status}; -use bytes::{Buf, Bytes, IntoBuf}; +use bytes::{Buf, Bytes}; use http_body::Body as HttpBody; use std::{ fmt, @@ -12,8 +12,6 @@ use std::{ task::{Context, Poll}, }; -pub(crate) type BytesBuf = ::Buf; - /// A trait alias for [`http_body::Body`]. pub trait Body: sealed::Sealed + Send + Sync { /// The body data type. @@ -83,7 +81,7 @@ mod sealed { /// A type erased http body. pub struct BoxBody { - inner: Pin + Send + Sync + 'static>>, + inner: Pin + Send + Sync + 'static>>, } struct MapBody(B); @@ -92,7 +90,7 @@ impl BoxBody { /// Create a new `BoxBody` mapping item and error to the default types. pub fn new(inner: B) -> Self where - B: Body + Send + Sync + 'static, + B: Body + Send + Sync + 'static, { BoxBody { inner: Box::pin(inner), @@ -103,7 +101,7 @@ impl BoxBody { pub fn map_from(inner: B) -> Self where B: Body + Send + Sync + 'static, - B::Data: Into, + // B::Data: Into, B::Error: Into, { BoxBody { @@ -120,7 +118,7 @@ impl BoxBody { } impl HttpBody for BoxBody { - type Data = BytesBuf; + type Data = Bytes; type Error = Status; fn is_end_stream(&self) -> bool { @@ -145,10 +143,10 @@ impl HttpBody for BoxBody { impl HttpBody for MapBody where B: Body, - B::Data: Into, + // B::Data: Into, B::Error: Into, { - type Data = BytesBuf; + type Data = Bytes; type Error = Status; fn is_end_stream(&self) -> bool { @@ -164,7 +162,7 @@ where Pin::new_unchecked(&mut me.0).poll_data(cx) }; match futures_util::ready!(v) { - Some(Ok(i)) => Poll::Ready(Some(Ok(i.into().into_buf()))), + Some(Ok(mut i)) => Poll::Ready(Some(Ok(i.to_bytes()))), Some(Err(e)) => { let err = Status::map_error(e.into()); Poll::Ready(Some(Err(err))) @@ -199,7 +197,7 @@ struct EmptyBody { } impl HttpBody for EmptyBody { - type Data = BytesBuf; + type Data = Bytes; type Error = Status; fn is_end_stream(&self) -> bool { diff --git a/tonic/src/client/grpc.rs b/tonic/src/client/grpc.rs index bcc6cbd2b..45aee8d22 100644 --- a/tonic/src/client/grpc.rs +++ b/tonic/src/client/grpc.rs @@ -4,7 +4,6 @@ use crate::{ codec::{encode_client, Codec, Streaming}, Code, Request, Response, Status, }; -use bytes::Bytes; use futures_core::Stream; use futures_util::{future, stream, TryStreamExt}; use http::{ @@ -60,7 +59,7 @@ impl Grpc { T: GrpcService, T::ResponseBody: Body + HttpBody + Send + 'static, ::Error: Into, - ::Data: Into, + // ::Data: Into, C: Codec, M1: Send + Sync + 'static, M2: Send + Sync + 'static, @@ -80,7 +79,7 @@ impl Grpc { T: GrpcService, T::ResponseBody: Body + HttpBody + Send + 'static, ::Error: Into, - ::Data: Into, + // ::Data: Into, S: Stream + Send + Sync + 'static, C: Codec, M1: Send + Sync + 'static, @@ -113,7 +112,7 @@ impl Grpc { T: GrpcService, T::ResponseBody: Body + HttpBody + Send + 'static, ::Error: Into, - ::Data: Into, + // ::Data: Into, C: Codec, M1: Send + Sync + 'static, M2: Send + Sync + 'static, @@ -132,7 +131,7 @@ impl Grpc { where T: GrpcService, T::ResponseBody: Body + HttpBody + Send + 'static, - ::Data: Into, + // ::Data: Into, ::Error: Into, S: Stream + Send + Sync + 'static, C: Codec, diff --git a/tonic/src/codec/decode.rs b/tonic/src/codec/decode.rs index fba1ba281..ed64c414e 100644 --- a/tonic/src/codec/decode.rs +++ b/tonic/src/codec/decode.rs @@ -1,6 +1,6 @@ use super::Decoder; use crate::{body::BoxBody, metadata::MetadataMap, Code, Status}; -use bytes::{Buf, BufMut, Bytes, BytesMut, IntoBuf}; +use bytes::{Buf, BufMut, BytesMut}; use futures_core::Stream; use futures_util::{future, ready}; use http::StatusCode; @@ -46,7 +46,7 @@ impl Streaming { pub(crate) fn new_response(decoder: D, body: B, status_code: StatusCode) -> Self where B: Body + Send + Sync + 'static, - B::Data: Into, + // B::Data: Into, B::Error: Into, D: Decoder + Send + Sync + 'static, { @@ -56,7 +56,7 @@ impl Streaming { pub(crate) fn new_empty(decoder: D, body: B) -> Self where B: Body + Send + Sync + 'static, - B::Data: Into, + // B::Data: Into, B::Error: Into, D: Decoder + Send + Sync + 'static, { @@ -66,7 +66,7 @@ impl Streaming { pub(crate) fn new_request(decoder: D, body: B) -> Self where B: Body + Send + Sync + 'static, - B::Data: Into, + // B::Data: Into, B::Error: Into, D: Decoder + Send + Sync + 'static, { @@ -76,7 +76,7 @@ impl Streaming { fn new(decoder: D, body: B, direction: Direction) -> Self where B: Body + Send + Sync + 'static, - B::Data: Into, + // B::Data: Into, B::Error: Into, D: Decoder + Send + Sync + 'static, { @@ -154,14 +154,12 @@ impl Streaming { } fn decode_chunk(&mut self) -> Result, Status> { - let mut buf = (&self.buf[..]).into_buf(); - if let State::ReadHeader = self.state { - if buf.remaining() < 5 { + if self.buf.remaining() < 5 { return Ok(None); } - let is_compressed = match buf.get_u8() { + let is_compressed = match self.buf.get_u8() { 0 => false, 1 => { trace!("message compressed, compression not supported yet"); @@ -178,7 +176,7 @@ impl Streaming { )); } }; - let len = buf.get_u32_be() as usize; + let len = self.buf.get_u32() as usize; self.state = State::ReadBody { compression: is_compressed, @@ -189,13 +187,10 @@ impl Streaming { if let State::ReadBody { len, .. } = &self.state { // if we haven't read enough of the message then return and keep // reading - if buf.remaining() < *len || self.buf.len() < *len + 5 { + if self.buf.remaining() < *len || self.buf.len() < *len { return Ok(None); } - // advance past the header - self.buf.advance(5); - match self.decoder.decode(&mut self.buf) { Ok(Some(msg)) => { self.state = State::ReadHeader; @@ -251,8 +246,7 @@ impl Stream for Streaming { self.buf.put(data); } else { // FIXME: improve buf usage. - let buf1 = (&self.buf[..]).into_buf(); - if buf1.has_remaining() { + if self.buf.has_remaining() { trace!("unexpected EOF decoding stream"); Err(Status::new( Code::Internal, diff --git a/tonic/src/codec/encode.rs b/tonic/src/codec/encode.rs index 5a0f94b1d..f5d974da3 100644 --- a/tonic/src/codec/encode.rs +++ b/tonic/src/codec/encode.rs @@ -1,5 +1,5 @@ -use crate::{body::BytesBuf, Code, Status}; -use bytes::{BufMut, BytesMut, IntoBuf}; +use crate::{Code, Status}; +use bytes::{BufMut, Bytes, BytesMut}; use futures_core::{Stream, TryStream}; use futures_util::{ready, StreamExt, TryStreamExt}; use http::HeaderMap; @@ -7,14 +7,14 @@ use http_body::Body; use pin_project::pin_project; use std::pin::Pin; use std::task::{Context, Poll}; -use tokio_codec::Encoder; +use tokio_util::codec::Encoder; const BUFFER_SIZE: usize = 8 * 1024; pub(crate) fn encode_server( encoder: T, source: U, -) -> EncodeBody>> +) -> EncodeBody>> where T: Encoder + Send + Sync + 'static, T::Item: Send + Sync, @@ -27,7 +27,7 @@ where pub(crate) fn encode_client( encoder: T, source: U, -) -> EncodeBody>> +) -> EncodeBody>> where T: Encoder + Send + Sync + 'static, T::Item: Send + Sync, @@ -37,7 +37,7 @@ where EncodeBody::new_client(stream) } -fn encode(mut encoder: T, source: U) -> impl TryStream +fn encode(mut encoder: T, source: U) -> impl TryStream where T: Encoder, U: Stream>, @@ -59,12 +59,12 @@ where let len = buf.len() - 5; assert!(len <= std::u32::MAX as usize); { - let mut cursor = std::io::Cursor::new(&mut buf[..5]); - cursor.put_u8(0); // byte must be 0, reserve doesn't auto-zero - cursor.put_u32_be(len as u32); + let mut buf = &mut buf[..5]; + buf.put_u8(0); // byte must be 0, reserve doesn't auto-zero + buf.put_u32(len as u32); } - yield Ok(buf.split_to(len + 5).freeze().into_buf()); + yield Ok(buf.split_to(len + 5).freeze()); }, Some(Err(status)) => yield Err(status), None => break, @@ -90,7 +90,7 @@ pub(crate) struct EncodeBody { impl EncodeBody where - S: Stream> + Send + Sync + 'static, + S: Stream> + Send + Sync + 'static, { pub(crate) fn new_client(inner: S) -> Self { Self { @@ -111,9 +111,9 @@ where impl Body for EncodeBody where - S: Stream>, + S: Stream>, { - type Data = BytesBuf; + type Data = Bytes; type Error = Status; fn is_end_stream(&self) -> bool { diff --git a/tonic/src/codec/mod.rs b/tonic/src/codec/mod.rs index 17f20d31d..e13ca58e0 100644 --- a/tonic/src/codec/mod.rs +++ b/tonic/src/codec/mod.rs @@ -16,7 +16,7 @@ pub(crate) use self::encode::{encode_client, encode_server}; #[cfg(feature = "prost")] #[cfg_attr(docsrs, doc(cfg(feature = "prost")))] pub use self::prost::ProstCodec; -pub use tokio_codec::{Decoder, Encoder}; +pub use tokio_util::codec::{Decoder, Encoder}; use crate::Status; diff --git a/tonic/src/codec/prost.rs b/tonic/src/codec/prost.rs index 9779b3275..08f464af7 100644 --- a/tonic/src/codec/prost.rs +++ b/tonic/src/codec/prost.rs @@ -1,6 +1,6 @@ use super::{Codec, Decoder, Encoder}; use crate::{Code, Status}; -use bytes::{BufMut, BytesMut}; +use bytes::{Buf, BufMut, BytesMut}; use prost::Message; use std::marker::PhantomData; @@ -51,8 +51,14 @@ impl Encoder for ProstEncoder { buf.reserve(len); } - item.encode(buf) - .map_err(|_| unreachable!("Message only errors if not enough space")) + let mut v = Vec::with_capacity(len); + + item.encode(&mut v) + .expect("Message only errors if not enough space"); + + buf.extend(v); + + Ok(()) } } @@ -65,9 +71,19 @@ impl Decoder for ProstDecoder { type Error = Status; fn decode(&mut self, buf: &mut BytesMut) -> Result, Self::Error> { - Message::decode(buf.take()) + let mut cursor = std::io::Cursor::new(&buf[..]); + + let item = Message::decode(&mut cursor) .map(Option::Some) - .map_err(from_decode_error) + .map_err(from_decode_error)?; + + let amt = cursor.position() as usize; + + drop(cursor); + + buf.advance(amt); + + Ok(item) } } diff --git a/tonic/src/codec/tests.rs b/tonic/src/codec/tests.rs index ac2ca8c2e..a5292fee6 100644 --- a/tonic/src/codec/tests.rs +++ b/tonic/src/codec/tests.rs @@ -1,145 +1,140 @@ -use super::{ - encode_server, - prost::{ProstDecoder, ProstEncoder}, - Streaming, -}; -use crate::Status; -use bytes::{Buf, BufMut, Bytes, BytesMut, IntoBuf}; -use http_body::Body; -use prost::Message; -use std::{ - io::Cursor, - pin::Pin, - task::{Context, Poll}, -}; - -#[derive(Clone, PartialEq, prost::Message)] -struct Msg { - #[prost(bytes, tag = "1")] - data: Vec, -} - -#[tokio::test] -async fn decode() { - let decoder = ProstDecoder::::default(); - - let data = vec![0u8; 10000]; - let data_len = data.len(); - let msg = Msg { data }; - - let mut buf = BytesMut::new(); - let len = msg.encoded_len(); - - buf.reserve(len + 5); - buf.put_u8(0); - buf.put_u32_be(len as u32); - msg.encode(&mut buf).unwrap(); - - let body = MockBody { - data: buf.freeze(), - partial_len: 10005, - count: 0, - }; - - let mut stream = Streaming::new_request(decoder, body); - - let mut i = 0usize; - while let Some(msg) = stream.message().await.unwrap() { - assert_eq!(msg.data.len(), data_len); - i += 1; - } - assert_eq!(i, 1); -} - -#[tokio::test] -async fn encode() { - let encoder = ProstEncoder::::default(); - - let data = Vec::from(&[0u8; 1024][..]); - let msg = Msg { data }; - - let messages = std::iter::repeat(Ok::<_, Status>(msg)).take(10000); - let source = futures_util::stream::iter(messages); - - let body = encode_server(encoder, source); - - futures_util::pin_mut!(body); - - while let Some(r) = body.next().await { - r.unwrap(); - } -} - -#[derive(Debug)] -struct MockBody { - data: Bytes, - - // the size of the partial message to send - partial_len: usize, - - // the number of times we've sent - count: usize, -} - -impl Body for MockBody { - type Data = Data; - type Error = Status; - - fn poll_data( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll>> { - // every other call to poll_data returns data - let should_send = self.count % 2 == 0; - let data_len = self.data.len(); - let partial_len = self.partial_len; - let count = self.count; - if data_len > 0 { - let result = if should_send { - let response = self - .data - .split_to(if count == 0 { partial_len } else { data_len }) - .into_buf(); - Poll::Ready(Some(Ok(Data(response)))) - } else { - cx.waker().wake_by_ref(); - Poll::Pending - }; - // make some fake progress - self.count += 1; - result - } else { - Poll::Ready(None) - } - } - - fn poll_trailers( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll, Self::Error>> { - drop(cx); - Poll::Ready(Ok(None)) - } -} - -struct Data(Cursor); - -impl Into for Data { - fn into(self) -> Bytes { - self.0.into_inner() - } -} - -impl Buf for Data { - fn remaining(&self) -> usize { - self.0.remaining() - } - - fn bytes(&self) -> &[u8] { - self.0.bytes() - } - - fn advance(&mut self, cnt: usize) { - self.0.advance(cnt) - } -} +// use super::{ +// encode_server, +// prost::{ProstDecoder, ProstEncoder}, +// Streaming, +// }; +// use crate::Status; +// use bytes04 as bytes; +// use bytes04::{Buf, BufMut, Bytes, BytesMut}; +// use http_body::Body; +// use prost::Message; +// use std::{ +// io::Cursor, +// pin::Pin, +// task::{Context, Poll}, +// }; + +// #[derive(Clone, PartialEq, prost::Message)] +// struct Msg { +// #[prost(bytes, tag = "1")] +// data: Vec, +// } + +// #[tokio::test] +// async fn decode() { +// let decoder = ProstDecoder::::default(); + +// let data = vec![0u8; 10000]; +// let data_len = data.len(); +// let msg = Msg { data }; + +// let mut buf = BytesMut::new(); +// let len = msg.encoded_len(); + +// buf.reserve(len + 5); +// buf.put_u8(0); +// buf.put_u32_be(len as u32); + +// msg.encode(&mut buf).unwrap(); + +// let body = body::MockBody::new(&buf[..], 10005, 0); + +// let mut stream = Streaming::new_request(decoder, body); + +// let mut i = 0usize; +// while let Some(msg) = stream.message().await.unwrap() { +// assert_eq!(msg.data.len(), data_len); +// i += 1; +// } +// assert_eq!(i, 1); +// } + +// #[tokio::test] +// async fn encode() { +// let encoder = ProstEncoder::::default(); + +// let data = Vec::from(&[0u8; 1024][..]); +// let msg = Msg { data }; + +// let messages = std::iter::repeat(Ok::<_, Status>(msg)).take(10000); +// let source = futures_util::stream::iter(messages); + +// let body = encode_server(encoder, source); + +// futures_util::pin_mut!(body); + +// while let Some(r) = body.next().await { +// r.unwrap(); +// } +// } + +// mod body { +// use crate::Status; +// use bytes::Bytes; +// use http_body::Body; +// use std::{ +// pin::Pin, +// task::{Context, Poll}, +// }; + +// #[derive(Debug)] +// pub struct MockBody { +// data: Bytes, + +// // the size of the partial message to send +// partial_len: usize, + +// // the number of times we've sent +// count: usize, +// } + +// impl MockBody { +// pub fn new(b: &[u8], partial_len: usize, count) -> Self { +// MockBody { +// data: Bytes::copy_from_slice(&b[..]), +// partial_len, +// count +// } +// } +// } + +// impl Body for MockBody { +// type Data = Bytes; +// type Error = Status; + +// fn poll_data( +// mut self: Pin<&mut Self>, +// cx: &mut Context<'_>, +// ) -> Poll>> { +// // every other call to poll_data returns data +// let should_send = self.count % 2 == 0; +// let data_len = self.data.len(); +// let partial_len = self.partial_len; +// let count = self.count; +// if data_len > 0 { +// let result = if should_send { +// let response = +// self.data +// .split_to(if count == 0 { partial_len } else { data_len }); +// Poll::Ready(Some(Ok(response))) +// } else { +// cx.waker().wake_by_ref(); +// Poll::Pending +// }; +// // make some fake progress +// self.count += 1; +// result +// } else { +// Poll::Ready(None) +// } +// } + +// fn poll_trailers( +// self: Pin<&mut Self>, +// cx: &mut Context<'_>, +// ) -> Poll, Self::Error>> { +// drop(cx); +// Poll::Ready(Ok(None)) +// } +// } +// } diff --git a/tonic/src/metadata/encoding.rs b/tonic/src/metadata/encoding.rs index 69cfbd0da..38c2db6de 100644 --- a/tonic/src/metadata/encoding.rs +++ b/tonic/src/metadata/encoding.rs @@ -70,7 +70,7 @@ impl self::value_encoding::Sealed for Ascii { } fn from_shared(value: Bytes) -> Result { - HeaderValue::from_shared(value).map_err(|_| InvalidMetadataValueBytes::new()) + HeaderValue::from_maybe_shared(value).map_err(|_| InvalidMetadataValueBytes::new()) } fn from_static(value: &'static str) -> HeaderValue { @@ -78,7 +78,7 @@ impl self::value_encoding::Sealed for Ascii { } fn decode(value: &[u8]) -> Result { - Ok(Bytes::from(value)) + Ok(Bytes::copy_from_slice(value)) } fn equals(a: &HeaderValue, b: &[u8]) -> bool { @@ -112,7 +112,8 @@ impl self::value_encoding::Sealed for Binary { fn from_bytes(value: &[u8]) -> Result { let encoded_value: String = base64::encode_config(value, base64::STANDARD_NO_PAD); - HeaderValue::from_shared(encoded_value.into()).map_err(|_| InvalidMetadataValueBytes::new()) + HeaderValue::from_maybe_shared(Bytes::from(encoded_value)) + .map_err(|_| InvalidMetadataValueBytes::new()) } fn from_shared(value: Bytes) -> Result { @@ -126,7 +127,7 @@ impl self::value_encoding::Sealed for Binary { unsafe { // Because this is valid base64 this must be a valid HTTP header value, // no need to check again by calling from_shared. - HeaderValue::from_shared_unchecked(Bytes::from_static(value.as_ref())) + HeaderValue::from_maybe_shared_unchecked(Bytes::from_static(value.as_ref())) } } diff --git a/tonic/src/metadata/key.rs b/tonic/src/metadata/key.rs index 5a919d3ae..d287a6f96 100644 --- a/tonic/src/metadata/key.rs +++ b/tonic/src/metadata/key.rs @@ -193,7 +193,7 @@ impl<'a, VE: ValueEncoding> From<&'a MetadataKey> for MetadataKey { impl From> for Bytes { #[inline] fn from(name: MetadataKey) -> Bytes { - name.inner.into() + Bytes::copy_from_slice(name.inner.as_ref()) } } diff --git a/tonic/src/metadata/map.rs b/tonic/src/metadata/map.rs index 9dab15b9d..177bb3c96 100644 --- a/tonic/src/metadata/map.rs +++ b/tonic/src/metadata/map.rs @@ -2176,9 +2176,7 @@ mod as_metadata_key { self, map: &mut MetadataMap, ) -> Result, InvalidMetadataKey> { - map.headers - .entry(self.inner) - .map_err(|_| InvalidMetadataKey::new()) + Ok(map.headers.entry(self.inner)) } #[doc(hidden)] @@ -2221,9 +2219,7 @@ mod as_metadata_key { self, map: &mut MetadataMap, ) -> Result, InvalidMetadataKey> { - map.headers - .entry(&self.inner) - .map_err(|_| InvalidMetadataKey::new()) + Ok(map.headers.entry(&self.inner)) } #[doc(hidden)] @@ -2278,9 +2274,11 @@ mod as_metadata_key { if !VE::is_valid_key(self) { return Err(InvalidMetadataKey::new()); } - map.headers - .entry(self) - .map_err(|_| InvalidMetadataKey::new()) + + let key = http::header::HeaderName::from_bytes(self.as_bytes()) + .map_err(|_| InvalidMetadataKey::new())?; + let entry = map.headers.entry(key); + Ok(entry) } #[doc(hidden)] @@ -2338,9 +2336,10 @@ mod as_metadata_key { if !VE::is_valid_key(self.as_str()) { return Err(InvalidMetadataKey::new()); } - map.headers - .entry(self.as_str()) - .map_err(|_| InvalidMetadataKey::new()) + + let key = http::header::HeaderName::from_bytes(self.as_bytes()) + .map_err(|_| InvalidMetadataKey::new())?; + Ok(map.headers.entry(key)) } #[doc(hidden)] @@ -2398,9 +2397,10 @@ mod as_metadata_key { if !VE::is_valid_key(self) { return Err(InvalidMetadataKey::new()); } - map.headers - .entry(self.as_str()) - .map_err(|_| InvalidMetadataKey::new()) + + let key = http::header::HeaderName::from_bytes(self.as_bytes()) + .map_err(|_| InvalidMetadataKey::new())?; + Ok(map.headers.entry(key)) } #[doc(hidden)] diff --git a/tonic/src/metadata/value.rs b/tonic/src/metadata/value.rs index 44cedc5b7..63ecc240f 100644 --- a/tonic/src/metadata/value.rs +++ b/tonic/src/metadata/value.rs @@ -139,7 +139,7 @@ impl MetadataValue { #[inline] pub unsafe fn from_shared_unchecked(src: Bytes) -> Self { MetadataValue { - inner: HeaderValue::from_shared_unchecked(src), + inner: HeaderValue::from_maybe_shared_unchecked(src), phantom: PhantomData, } } @@ -510,7 +510,7 @@ impl FromStr for MetadataValue { impl From> for Bytes { #[inline] fn from(value: MetadataValue) -> Bytes { - Bytes::from(value.inner) + Bytes::copy_from_slice(value.inner.as_bytes()) } } diff --git a/tonic/src/status.rs b/tonic/src/status.rs index 17ad3bf5f..7708719a5 100644 --- a/tonic/src/status.rs +++ b/tonic/src/status.rs @@ -325,7 +325,7 @@ impl Status { .unwrap_or_else(|| Ok(String::new())); let details = header_map .get(GRPC_STATUS_DETAILS_HEADER) - .map(|h| Bytes::from(h.as_bytes())) + .map(|h| Bytes::copy_from_slice(h.as_bytes())) .unwrap_or_else(Bytes::new); match error_message { Ok(message) => Status { @@ -380,19 +380,19 @@ impl Status { .to_string() .into() } else { - Bytes::from(self.message().as_bytes()) + Bytes::copy_from_slice(self.message().as_bytes()) }; header_map.insert( GRPC_STATUS_MESSAGE_HEADER, - HeaderValue::from_shared(to_write).map_err(invalid_header_value_byte)?, + HeaderValue::from_maybe_shared(to_write).map_err(invalid_header_value_byte)?, ); } if !self.details.is_empty() { header_map.insert( GRPC_STATUS_DETAILS_HEADER, - HeaderValue::from_shared(self.details.clone()) + HeaderValue::from_maybe_shared(self.details.clone()) .map_err(invalid_header_value_byte)?, ); } diff --git a/tonic/src/transport/channel.rs b/tonic/src/transport/channel.rs index 2d8eba0dd..0017addd5 100644 --- a/tonic/src/transport/channel.rs +++ b/tonic/src/transport/channel.rs @@ -7,7 +7,7 @@ use super::{ use crate::{body::BoxBody, client::GrpcService}; use bytes::Bytes; use http::{ - uri::{InvalidUriBytes, Uri}, + uri::{InvalidUri, Uri}, Request, Response, }; use std::{ @@ -88,8 +88,8 @@ impl Channel { /// # use tonic::transport::Channel; /// Channel::from_shared("https://example.com"); /// ``` - pub fn from_shared(s: impl Into) -> Result { - let uri = Uri::from_shared(s.into())?; + pub fn from_shared(s: impl Into) -> Result { + let uri = Uri::from_maybe_shared(s.into())?; Ok(Self::builder(uri)) } diff --git a/tonic/src/transport/endpoint.rs b/tonic/src/transport/endpoint.rs index 0a4f32dcc..6db484976 100644 --- a/tonic/src/transport/endpoint.rs +++ b/tonic/src/transport/endpoint.rs @@ -5,7 +5,7 @@ use super::{ tls::{Certificate, Identity}, }; use bytes::Bytes; -use http::uri::{InvalidUriBytes, Uri}; +use http::uri::{InvalidUri, Uri}; use std::{ convert::{TryFrom, TryInto}, fmt, @@ -63,8 +63,8 @@ impl Endpoint { /// # use tonic::transport::Endpoint; /// Endpoint::from_shared("https://example.com".to_string()); /// ``` - pub fn from_shared(s: impl Into) -> Result { - let uri = Uri::from_shared(s.into())?; + pub fn from_shared(s: impl Into) -> Result { + let uri = Uri::from_maybe_shared(s.into())?; Ok(Self::from(uri)) } @@ -179,7 +179,7 @@ impl From for Endpoint { } impl TryFrom for Endpoint { - type Error = InvalidUriBytes; + type Error = InvalidUri; fn try_from(t: Bytes) -> Result { Self::from_shared(t) @@ -187,7 +187,7 @@ impl TryFrom for Endpoint { } impl TryFrom for Endpoint { - type Error = InvalidUriBytes; + type Error = InvalidUri; fn try_from(t: String) -> Result { Self::from_shared(t.into_bytes()) diff --git a/tonic/src/transport/server.rs b/tonic/src/transport/server.rs index 9c87c14bf..f6794b0a9 100644 --- a/tonic/src/transport/server.rs +++ b/tonic/src/transport/server.rs @@ -5,7 +5,10 @@ use super::service::{layer_fn, BoxedIo, Or, Routes, ServiceBuilderExt}; use super::{service::TlsAcceptor, tls::Identity, Certificate}; use crate::body::BoxBody; use futures_core::Stream; -use futures_util::{future, ready, try_future::MapErr, TryFutureExt, TryStreamExt}; +use futures_util::{ + future::{self, MapErr}, + ready, TryFutureExt, TryStreamExt, +}; use http::{Request, Response}; use hyper::{ server::{accept::Accept, conn}, @@ -21,7 +24,7 @@ use std::{ // time::Duration, }; use tower::{ - layer::{util::Stack, Layer}, + layer::{Layer, Stack}, limit::concurrency::ConcurrencyLimitLayer, // timeout::TimeoutLayer, Service, @@ -203,28 +206,30 @@ impl Server { let max_concurrent_streams = self.max_concurrent_streams; // let timeout = self.timeout.clone(); - let incoming = hyper::server::accept::from_stream(async_stream::try_stream! { - let mut tcp = TcpIncoming::bind(addr)?; - - while let Some(stream) = tcp.try_next().await? { - #[cfg(feature = "tls")] - { - if let Some(tls) = &self.tls { - let io = match tls.connect(stream.into_inner()).await { - Ok(io) => io, - Err(error) => { - error!(message = "Unable to accept incoming connection.", %error); - continue - }, - }; - yield BoxedIo::new(io); - continue; + let incoming = hyper::server::accept::from_stream::<_, _, crate::Error>( + async_stream::try_stream! { + let mut tcp = TcpIncoming::bind(addr)?; + + while let Some(stream) = tcp.try_next().await? { + #[cfg(feature = "tls")] + { + if let Some(tls) = &self.tls { + let io = match tls.connect(stream.into_inner()).await { + Ok(io) => io, + Err(error) => { + error!(message = "Unable to accept incoming connection.", %error); + continue + }, + }; + yield BoxedIo::new(io); + continue; + } } - } - yield BoxedIo::new(stream); - } - }); + yield BoxedIo::new(stream); + } + }, + ); let svc = MakeSvc { inner: svc, diff --git a/tonic/src/transport/service/either.rs b/tonic/src/transport/service/either.rs index b4eab6522..a4cb7bdae 100644 --- a/tonic/src/transport/service/either.rs +++ b/tonic/src/transport/service/either.rs @@ -1,4 +1,4 @@ -use futures_util::try_future::{MapErr, TryFutureExt}; +use futures_util::future::{MapErr, TryFutureExt}; use std::{ future::Future, pin::Pin, diff --git a/tonic/src/transport/service/layer.rs b/tonic/src/transport/service/layer.rs index 4d326881e..7d3d17d89 100644 --- a/tonic/src/transport/service/layer.rs +++ b/tonic/src/transport/service/layer.rs @@ -1,6 +1,6 @@ use super::either::Either; use tower::{ - layer::{util::Stack, Layer}, + layer::{Layer, Stack}, ServiceBuilder, }; pub(crate) trait ServiceBuilderExt { diff --git a/tonic/src/transport/service/router.rs b/tonic/src/transport/service/router.rs index 484a9a670..490c588a5 100644 --- a/tonic/src/transport/service/router.rs +++ b/tonic/src/transport/service/router.rs @@ -1,6 +1,6 @@ use futures_util::{ future::Either, - try_future::{MapErr, TryFutureExt}, + future::{MapErr, TryFutureExt}, }; use std::{ fmt,