From 36757ba9311e56da4f4ffcd95824fd8bec663900 Mon Sep 17 00:00:00 2001 From: Akshay Narayan Date: Thu, 21 Nov 2019 20:45:10 +0000 Subject: [PATCH 1/5] feat: make `Endpoint` connector swappable Change the `connect()` API to lift up the use of `connector()` to `Endpoint::connect()`, allowing users to provide their own implementations (for example, Unix-domain sockets). Any type which impls `tower_make::MakeConnection` is suitable. To avoid breaking the default case of HTTP(S), introduce `connect_with_connector()` and retain `connect()`, which creates the default connector according to the activated feature gate and passes it to `connect_with_connector()`. Fixes: #136 --- tonic/Cargo.toml | 1 + tonic/src/transport/channel.rs | 45 +++++++++++++++++++-- tonic/src/transport/endpoint.rs | 49 ++++++++++++++++++++++- tonic/src/transport/service/connection.rs | 24 +++++------ tonic/src/transport/service/connector.rs | 1 + tonic/src/transport/service/discover.rs | 21 +++++++--- 6 files changed, 118 insertions(+), 23 deletions(-) diff --git a/tonic/Cargo.toml b/tonic/Cargo.toml index e65606ac1..411b53ff3 100644 --- a/tonic/Cargo.toml +++ b/tonic/Cargo.toml @@ -74,6 +74,7 @@ tokio-rustls = { version = "=0.12.0-alpha.5", optional = true } rustls-native-certs = { version = "0.1", optional = true } [dev-dependencies] +hyper-unix-connector = "0.1.1" static_assertions = "1.0" rand = "0.7.2" criterion = "0.3" diff --git a/tonic/src/transport/channel.rs b/tonic/src/transport/channel.rs index 2d8eba0dd..28d8f2874 100644 --- a/tonic/src/transport/channel.rs +++ b/tonic/src/transport/channel.rs @@ -97,7 +97,16 @@ impl Channel { /// /// This creates a [`Channel`] that will load balance accross all the /// provided endpoints. - pub fn balance_list(list: impl Iterator) -> Self { + pub fn balance_list_with_connector( + list: impl Iterator, + connector: C, + ) -> Self + where + C: tower_make::MakeConnection + Send + Clone + Unpin + 'static, + C::Connection: Unpin + Send + 'static, + C::Future: Send + 'static, + C::Error: Into> + Send, + { let list = list.collect::>(); let buffer_size = list @@ -111,16 +120,44 @@ impl Channel { .next() .and_then(|e| e.interceptor_headers.clone()); - let discover = ServiceList::new(list); + let discover = ServiceList::new(list, connector); Self::balance(discover, buffer_size, interceptor_headers) } - pub(crate) async fn connect(endpoint: Endpoint) -> Result { + /// Balance a list of [`Endpoint`]'s. + /// + /// This creates a [`Channel`] that will load balance accross all the + /// provided endpoints. + pub fn balance_list(list: impl Iterator) -> Self { + // Backwards API compatibility. + // Uses TCP if the TLS feature is not enabled, and TLS otherwise. + + let list = list.collect::>(); + + #[cfg(feature = "tls")] + let connector = { + let tls_connector = list.iter().next().and_then(|e| e.tls.clone()); + super::service::connector(tls_connector) + }; + + #[cfg(not(feature = "tls"))] + let connector = super::service::connector(); + + Channel::balance_list_with_connector(list.into_iter(), connector) + } + + pub(crate) async fn connect(endpoint: Endpoint, connector: C) -> Result + where + C: tower_make::MakeConnection + Send + 'static, + C::Connection: Unpin + Send + 'static, + C::Future: Send + 'static, + C::Error: Into> + Send, + { let buffer_size = endpoint.buffer_size.clone().unwrap_or(DEFAULT_BUFFER_SIZE); let interceptor_headers = endpoint.interceptor_headers.clone(); - let svc = Connection::new(endpoint) + let svc = Connection::new(endpoint, connector) .await .map_err(|e| super::Error::from_source(super::ErrorKind::Client, e))?; diff --git a/tonic/src/transport/endpoint.rs b/tonic/src/transport/endpoint.rs index 0a4f32dcc..4a08ab5dc 100644 --- a/tonic/src/transport/endpoint.rs +++ b/tonic/src/transport/endpoint.rs @@ -157,7 +157,54 @@ impl Endpoint { /// Create a channel from this config. pub async fn connect(&self) -> Result { - Channel::connect(self.clone()).await + // Backwards API compatibility. + // Uses TCP if the TLS feature is not enabled, and TLS otherwise. + + #[cfg(feature = "tls")] + let connector = super::service::connector(self.tls.clone()); + + #[cfg(not(feature = "tls"))] + let connector = super::service::connector(); + + self.connect_with_connector(connector).await + } + + /// Create a channel using a custom connector. + /// + /// The [`tower_make::MakeConnection`] requirement is an alias for `tower::Service` - for example, a TCP stream as in [`Endpoint::connect`] above. + /// + /// # Example + /// ```rust + /// use hyper::client::connect::HttpConnector; + /// use tonic::transport::Endpoint; + /// + /// // note: This connector is the same as the default provided in `connect()`. + /// let mut connector = HttpConnector::new(); + /// connector.enforce_http(false); + /// connector.set_nodelay(true); + /// + /// let endpoint = Endpoint::from_static("http://example.com"); + /// endpoint.connect_with_connector(connector); //.await + /// ``` + /// + /// # Example with non-default Connector + /// ```rust + /// // Use for unix-domain sockets + /// use hyper_unix_connector::UnixClient; + /// use tonic::transport::Endpoint; + /// + /// let endpoint = Endpoint::from_static("http://example.com"); + /// endpoint.connect_with_connector(UnixClient); //.await + /// ``` + pub async fn connect_with_connector(&self, connector: C) -> Result + where + C: tower_make::MakeConnection + Send + 'static, + C::Connection: Unpin + Send + 'static, + C::Future: Send + 'static, + C::Error: Into> + Send, + { + Channel::connect(self.clone(), connector).await } } diff --git a/tonic/src/transport/service/connection.rs b/tonic/src/transport/service/connection.rs index 47693b35e..663148ed0 100644 --- a/tonic/src/transport/service/connection.rs +++ b/tonic/src/transport/service/connection.rs @@ -1,4 +1,4 @@ -use super::{connector, layer::ServiceBuilderExt, reconnect::Reconnect, AddOrigin}; +use super::{layer::ServiceBuilderExt, reconnect::Reconnect, AddOrigin}; use crate::{body::BoxBody, transport::Endpoint}; use hyper::client::conn::Builder; use hyper::client::service::Connect as HyperConnect; @@ -26,19 +26,23 @@ pub(crate) struct Connection { } impl Connection { - pub(crate) async fn new(endpoint: Endpoint) -> Result { - #[cfg(feature = "tls")] - let connector = connector(endpoint.tls.clone()); - - #[cfg(not(feature = "tls"))] - let connector = connector(); - + pub(crate) async fn new(endpoint: Endpoint, connector: C) -> Result + where + C: tower_make::MakeConnection + Send + 'static, + C::Connection: Unpin + Send + 'static, + C::Future: Send + 'static, + C::Error: Into> + Send, + { let settings = Builder::new() .http2_initial_stream_window_size(endpoint.init_stream_window_size) .http2_initial_connection_window_size(endpoint.init_connection_window_size) .http2_only(true) .clone(); + let mut connector = HyperConnect::new(connector, settings); + let initial_conn = connector.call(endpoint.uri.clone()).await?; + let conn = Reconnect::new(initial_conn, connector, endpoint.uri.clone()); + let stack = ServiceBuilder::new() .layer_fn(|s| AddOrigin::new(s, endpoint.uri.clone())) .optional_layer(endpoint.timeout.map(TimeoutLayer::new)) @@ -46,10 +50,6 @@ impl Connection { .optional_layer(endpoint.rate_limit.map(|(l, d)| RateLimitLayer::new(l, d))) .into_inner(); - let mut connector = HyperConnect::new(connector, settings); - let initial_conn = connector.call(endpoint.uri.clone()).await?; - let conn = Reconnect::new(initial_conn, connector, endpoint.uri.clone()); - let inner = stack.layer(conn); Ok(Self { diff --git a/tonic/src/transport/service/connector.rs b/tonic/src/transport/service/connector.rs index c02d17367..f8a63b88a 100644 --- a/tonic/src/transport/service/connector.rs +++ b/tonic/src/transport/service/connector.rs @@ -22,6 +22,7 @@ pub(crate) fn connector(tls: Option) -> Connector { Connector::new(tls) } +#[derive(Clone)] pub(crate) struct Connector { http: HttpConnector, #[cfg(feature = "tls")] diff --git a/tonic/src/transport/service/discover.rs b/tonic/src/transport/service/discover.rs index 4635d8617..38808324b 100644 --- a/tonic/src/transport/service/discover.rs +++ b/tonic/src/transport/service/discover.rs @@ -9,24 +9,32 @@ use std::{ }; use tower::discover::{Change, Discover}; -pub(crate) struct ServiceList { +pub(crate) struct ServiceList { list: VecDeque, + connector: C, connecting: Option> + Send + 'static>>>, i: usize, } -impl ServiceList { - pub(crate) fn new(list: Vec) -> Self { +impl ServiceList { + pub(crate) fn new(list: Vec, connector: C) -> Self { Self { list: list.into(), + connector, connecting: None, i: 0, } } } -impl Discover for ServiceList { +impl Discover for ServiceList +where + C: tower_make::MakeConnection + Send + Clone + Unpin + 'static, + C::Connection: Unpin + Send + 'static, + C::Future: Send + 'static, + C::Error: Into> + Send, +{ type Key = usize; type Service = Connection; type Error = crate::Error; @@ -49,7 +57,8 @@ impl Discover for ServiceList { } if let Some(endpoint) = self.list.pop_front() { - let fut = Connection::new(endpoint); + let c = &self.connector; + let fut = Connection::new(endpoint, c.clone()); self.connecting = Some(Box::pin(fut)); } else { return Poll::Pending; @@ -58,7 +67,7 @@ impl Discover for ServiceList { } } -impl fmt::Debug for ServiceList { +impl fmt::Debug for ServiceList { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("ServiceList") .field("list", &self.list) From 6cd585545ef77c71ca11c71080bc7c4190533271 Mon Sep 17 00:00:00 2001 From: Akshay Narayan Date: Thu, 21 Nov 2019 22:52:05 +0000 Subject: [PATCH 2/5] build: add `connect_with_connector()` to `Client` --- tonic-build/src/client.rs | 15 +++++++++++++++ tonic/src/transport/mod.rs | 1 + 2 files changed, 16 insertions(+) diff --git a/tonic-build/src/client.rs b/tonic-build/src/client.rs index e0a688956..cd1eed8e8 100644 --- a/tonic-build/src/client.rs +++ b/tonic-build/src/client.rs @@ -62,6 +62,21 @@ fn generate_connect(service_ident: &syn::Ident) -> TokenStream { let conn = tonic::transport::Endpoint::new(dst)?.connect().await?; Ok(Self::new(conn)) } + + /// Attempt to create a new client by connecting to a given endpoint using a custom + /// connector. + pub async fn connect_with_connector(dst: D, connector: C) -> Result + where + C: tonic::transport::MakeConnection + Send + 'static, + C::Connection: Unpin + Send + 'static, + C::Future: Send + 'static, + C::Error: Into> + Send, + D: std::convert::TryInto, + D::Error: Into, + { + let conn = tonic::transport::Endpoint::new(dst)?.connect_with_connector(connector).await?; + Ok(Self::new(conn)) + } } } } diff --git a/tonic/src/transport/mod.rs b/tonic/src/transport/mod.rs index 64fa72909..4874e1d36 100644 --- a/tonic/src/transport/mod.rs +++ b/tonic/src/transport/mod.rs @@ -107,6 +107,7 @@ pub use self::error::Error; pub use self::server::{Server, ServiceName}; pub use self::tls::{Certificate, Identity}; pub use hyper::Body; +pub use tower_make::MakeConnection; #[cfg(feature = "tls")] pub use self::endpoint::ClientTlsConfig; From a2db60d0bfac3db3f38c4d9473dff3158167a5da Mon Sep 17 00:00:00 2001 From: Akshay Narayan Date: Fri, 22 Nov 2019 16:42:47 +0000 Subject: [PATCH 3/5] fix hyper-unix-connector version for carg-deny --- tonic/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tonic/Cargo.toml b/tonic/Cargo.toml index 411b53ff3..c002bb49f 100644 --- a/tonic/Cargo.toml +++ b/tonic/Cargo.toml @@ -74,7 +74,7 @@ tokio-rustls = { version = "=0.12.0-alpha.5", optional = true } rustls-native-certs = { version = "0.1", optional = true } [dev-dependencies] -hyper-unix-connector = "0.1.1" +hyper-unix-connector = "0.1.2" static_assertions = "1.0" rand = "0.7.2" criterion = "0.3" From 8efaba66b773630b4193cb41dc227073f2a25501 Mon Sep 17 00:00:00 2001 From: Akshay Narayan Date: Fri, 22 Nov 2019 16:46:46 +0000 Subject: [PATCH 4/5] move MakeConnection re-export to codegen to clean up imports in generated code --- tonic-build/src/client.rs | 2 +- tonic/src/codegen.rs | 2 ++ tonic/src/transport/mod.rs | 1 - 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/tonic-build/src/client.rs b/tonic-build/src/client.rs index cd1eed8e8..fbf5fccdd 100644 --- a/tonic-build/src/client.rs +++ b/tonic-build/src/client.rs @@ -67,7 +67,7 @@ fn generate_connect(service_ident: &syn::Ident) -> TokenStream { /// connector. pub async fn connect_with_connector(dst: D, connector: C) -> Result where - C: tonic::transport::MakeConnection + Send + 'static, + C: MakeConnection + Send + 'static, C::Connection: Unpin + Send + 'static, C::Future: Send + 'static, C::Error: Into> + Send, diff --git a/tonic/src/codegen.rs b/tonic/src/codegen.rs index 52116d7f0..c072e0a5a 100644 --- a/tonic/src/codegen.rs +++ b/tonic/src/codegen.rs @@ -9,6 +9,8 @@ pub use std::future::Future; pub use std::pin::Pin; pub use std::sync::Arc; pub use std::task::{Context, Poll}; +#[cfg(feature = "transport")] +pub use tower_make::MakeConnection; pub use tower_service::Service; pub type StdError = Box; pub use crate::body::Body; diff --git a/tonic/src/transport/mod.rs b/tonic/src/transport/mod.rs index 4874e1d36..64fa72909 100644 --- a/tonic/src/transport/mod.rs +++ b/tonic/src/transport/mod.rs @@ -107,7 +107,6 @@ pub use self::error::Error; pub use self::server::{Server, ServiceName}; pub use self::tls::{Certificate, Identity}; pub use hyper::Body; -pub use tower_make::MakeConnection; #[cfg(feature = "tls")] pub use self::endpoint::ClientTlsConfig; From fd7470996c5d5ceb3da6e48e73590a7d9f806d57 Mon Sep 17 00:00:00 2001 From: Akshay Narayan Date: Fri, 22 Nov 2019 23:02:38 +0000 Subject: [PATCH 5/5] endpoint: Make endpoint contain a connector Instead of providing a connector in the call to `connect()`, make `Endpoint` contain a connector, by default an `HTTPConnector`, which the caller can swap. --- tonic-build/src/client.rs | 4 +- tonic-interop/src/bin/client.rs | 24 ++-- tonic/Cargo.toml | 2 +- tonic/src/transport/channel.rs | 33 +---- tonic/src/transport/endpoint.rs | 149 ++++++++++++---------- tonic/src/transport/service/connection.rs | 29 +++-- tonic/src/transport/service/connector.rs | 12 +- tonic/src/transport/service/discover.rs | 9 +- tonic/src/transport/service/mod.rs | 2 + 9 files changed, 134 insertions(+), 130 deletions(-) diff --git a/tonic-build/src/client.rs b/tonic-build/src/client.rs index fbf5fccdd..687cc07d0 100644 --- a/tonic-build/src/client.rs +++ b/tonic-build/src/client.rs @@ -67,14 +67,14 @@ fn generate_connect(service_ident: &syn::Ident) -> TokenStream { /// connector. pub async fn connect_with_connector(dst: D, connector: C) -> Result where - C: MakeConnection + Send + 'static, + C: MakeConnection + Send + Clone + 'static, C::Connection: Unpin + Send + 'static, C::Future: Send + 'static, C::Error: Into> + Send, D: std::convert::TryInto, D::Error: Into, { - let conn = tonic::transport::Endpoint::new(dst)?.connect_with_connector(connector).await?; + let conn = tonic::transport::Endpoint::new(dst)?.connector(connector).connect().await?; Ok(Self::new(conn)) } } diff --git a/tonic-interop/src/bin/client.rs b/tonic-interop/src/bin/client.rs index 2142310c9..2fdf2f51e 100644 --- a/tonic-interop/src/bin/client.rs +++ b/tonic-interop/src/bin/client.rs @@ -26,22 +26,24 @@ async fn main() -> Result<(), Box> { let test_cases = matches.test_case; - #[allow(unused_mut)] - let mut endpoint = Endpoint::from_static("http://localhost:10000") + let endpoint = Endpoint::from_static("http://localhost:10000") .timeout(Duration::from_secs(5)) .concurrency_limit(30); - if matches.use_tls { + let channel = if matches.use_tls { let pem = tokio::fs::read("tonic-interop/data/ca.pem").await?; let ca = Certificate::from_pem(pem); - endpoint = endpoint.tls_config( - ClientTlsConfig::with_rustls() - .ca_certificate(ca) - .domain_name("foo.test.google.fr"), - ); - } - - let channel = endpoint.connect().await?; + endpoint + .tls_config( + ClientTlsConfig::with_rustls() + .ca_certificate(ca) + .domain_name("foo.test.google.fr"), + ) + .connect() + .await? + } else { + endpoint.connect().await? + }; let mut client = client::TestClient::new(channel.clone()); let mut unimplemented_client = client::UnimplementedClient::new(channel); diff --git a/tonic/Cargo.toml b/tonic/Cargo.toml index c002bb49f..4f7dd66bd 100644 --- a/tonic/Cargo.toml +++ b/tonic/Cargo.toml @@ -74,7 +74,7 @@ tokio-rustls = { version = "=0.12.0-alpha.5", optional = true } rustls-native-certs = { version = "0.1", optional = true } [dev-dependencies] -hyper-unix-connector = "0.1.2" +hyper-unix-connector = "0.1.3" static_assertions = "1.0" rand = "0.7.2" criterion = "0.3" diff --git a/tonic/src/transport/channel.rs b/tonic/src/transport/channel.rs index 28d8f2874..4ebdbf5cc 100644 --- a/tonic/src/transport/channel.rs +++ b/tonic/src/transport/channel.rs @@ -97,10 +97,7 @@ impl Channel { /// /// This creates a [`Channel`] that will load balance accross all the /// provided endpoints. - pub fn balance_list_with_connector( - list: impl Iterator, - connector: C, - ) -> Self + pub fn balance_list(list: impl Iterator>) -> Self where C: tower_make::MakeConnection + Send + Clone + Unpin + 'static, C::Connection: Unpin + Send + 'static, @@ -120,34 +117,12 @@ impl Channel { .next() .and_then(|e| e.interceptor_headers.clone()); - let discover = ServiceList::new(list, connector); + let discover = ServiceList::new(list); Self::balance(discover, buffer_size, interceptor_headers) } - /// Balance a list of [`Endpoint`]'s. - /// - /// This creates a [`Channel`] that will load balance accross all the - /// provided endpoints. - pub fn balance_list(list: impl Iterator) -> Self { - // Backwards API compatibility. - // Uses TCP if the TLS feature is not enabled, and TLS otherwise. - - let list = list.collect::>(); - - #[cfg(feature = "tls")] - let connector = { - let tls_connector = list.iter().next().and_then(|e| e.tls.clone()); - super::service::connector(tls_connector) - }; - - #[cfg(not(feature = "tls"))] - let connector = super::service::connector(); - - Channel::balance_list_with_connector(list.into_iter(), connector) - } - - pub(crate) async fn connect(endpoint: Endpoint, connector: C) -> Result + pub(crate) async fn connect(endpoint: Endpoint) -> Result where C: tower_make::MakeConnection + Send + 'static, C::Connection: Unpin + Send + 'static, @@ -157,7 +132,7 @@ impl Channel { let buffer_size = endpoint.buffer_size.clone().unwrap_or(DEFAULT_BUFFER_SIZE); let interceptor_headers = endpoint.interceptor_headers.clone(); - let svc = Connection::new(endpoint, connector) + let svc = Connection::new(endpoint) .await .map_err(|e| super::Error::from_source(super::ErrorKind::Client, e))?; diff --git a/tonic/src/transport/endpoint.rs b/tonic/src/transport/endpoint.rs index 4a08ab5dc..7587a47d2 100644 --- a/tonic/src/transport/endpoint.rs +++ b/tonic/src/transport/endpoint.rs @@ -6,6 +6,7 @@ use super::{ }; use bytes::Bytes; use http::uri::{InvalidUriBytes, Uri}; +use hyper::client::connect::HttpConnector; use std::{ convert::{TryFrom, TryInto}, fmt, @@ -17,13 +18,12 @@ use std::{ /// /// This struct is used to build and configure HTTP/2 channels. #[derive(Clone)] -pub struct Endpoint { +pub struct Endpoint { pub(super) uri: Uri, + pub(super) connector: C, pub(super) timeout: Option, pub(super) concurrency_limit: Option, pub(super) rate_limit: Option<(u64, Duration)>, - #[cfg(feature = "tls")] - pub(super) tls: Option, pub(super) buffer_size: Option, pub(super) interceptor_headers: Option>, @@ -31,7 +31,7 @@ pub struct Endpoint { pub(super) init_connection_window_size: Option, } -impl Endpoint { +impl Endpoint { // FIXME: determine if we want to expose this or not. This is really // just used in codegen for a shortcut. #[doc(hidden)] @@ -46,28 +46,6 @@ impl Endpoint { Ok(me) } - /// Convert an `Endpoint` from a static string. - /// - /// ``` - /// # use tonic::transport::Endpoint; - /// Endpoint::from_static("https://example.com"); - /// ``` - pub fn from_static(s: &'static str) -> Self { - let uri = Uri::from_static(s); - Self::from(uri) - } - - /// Convert an `Endpoint` from shared bytes. - /// - /// ``` - /// # use tonic::transport::Endpoint; - /// Endpoint::from_shared("https://example.com".to_string()); - /// ``` - pub fn from_shared(s: impl Into) -> Result { - let uri = Uri::from_shared(s.into())?; - Ok(Self::from(uri)) - } - /// Apply a timeout to each request. /// /// ``` @@ -147,32 +125,32 @@ impl Endpoint { } /// Configures TLS for the endpoint. + /// + /// Shortcut for configuring a TLS connector and calling [`Endpoint::connector`]. #[cfg(feature = "tls")] - pub fn tls_config(self, tls_config: ClientTlsConfig) -> Self { - Endpoint { - tls: Some(tls_config.tls_connector(self.uri.clone()).unwrap()), - ..self - } - } - - /// Create a channel from this config. - pub async fn connect(&self) -> Result { - // Backwards API compatibility. - // Uses TCP if the TLS feature is not enabled, and TLS otherwise. - - #[cfg(feature = "tls")] - let connector = super::service::connector(self.tls.clone()); - - #[cfg(not(feature = "tls"))] - let connector = super::service::connector(); - - self.connect_with_connector(connector).await + pub fn tls_config( + self, + tls_config: ClientTlsConfig, + ) -> Endpoint< + impl tower_make::MakeConnection< + hyper::Uri, + Connection = impl Unpin + Send + 'static, + Future = impl Send + 'static, + Error = impl Into> + Send, + > + Clone, + > { + let tls_connector = tls_config.tls_connector(self.uri.clone()).unwrap(); + let connector = super::service::tls_connector(Some(tls_connector)); + self.connector(connector) } +} - /// Create a channel using a custom connector. +impl Endpoint { + /// Use a custom connector for the underlying channel. /// - /// The [`tower_make::MakeConnection`] requirement is an alias for `tower::Service` - for example, a TCP stream as in [`Endpoint::connect`] above. + /// Calling [`Endpoint::connect`] requires the connector implement + /// the [`tower_make::MakeConnection`] requirement, which is an alias for `tower::Service` - for example, a TCP stream for the default [`HttpConnector`]. /// /// # Example /// ```rust @@ -185,7 +163,7 @@ impl Endpoint { /// connector.set_nodelay(true); /// /// let endpoint = Endpoint::from_static("http://example.com"); - /// endpoint.connect_with_connector(connector); //.await + /// endpoint.connector(connector).connect(); //.await /// ``` /// /// # Example with non-default Connector @@ -195,28 +173,69 @@ impl Endpoint { /// use tonic::transport::Endpoint; /// /// let endpoint = Endpoint::from_static("http://example.com"); - /// endpoint.connect_with_connector(UnixClient); //.await + /// endpoint.connector(UnixClient).connect(); //.await /// ``` - pub async fn connect_with_connector(&self, connector: C) -> Result - where - C: tower_make::MakeConnection + Send + 'static, - C::Connection: Unpin + Send + 'static, - C::Future: Send + 'static, - C::Error: Into> + Send, - { - Channel::connect(self.clone(), connector).await + pub fn connector(self, connector: D) -> Endpoint { + Endpoint { + uri: self.uri, + connector, + concurrency_limit: self.concurrency_limit, + rate_limit: self.rate_limit, + timeout: self.timeout, + buffer_size: self.buffer_size, + interceptor_headers: self.interceptor_headers, + init_stream_window_size: self.init_stream_window_size, + init_connection_window_size: self.init_connection_window_size, + } + } +} + +impl Endpoint +where + C: tower_make::MakeConnection + Send + Clone + 'static, + C::Connection: Unpin + Send + 'static, + C::Future: Send + 'static, + C::Error: Into> + Send, +{ + /// Create the channel. + pub async fn connect(&self) -> Result { + let e: Self = self.clone(); + Channel::connect(e).await + } +} + +impl Endpoint { + /// Convert an `Endpoint` from a static string. + /// + /// ``` + /// # use tonic::transport::Endpoint; + /// Endpoint::from_static("https://example.com"); + /// ``` + pub fn from_static(s: &'static str) -> Self { + let uri = Uri::from_static(s); + Self::from(uri) + } + + /// Convert an `Endpoint` from shared bytes. + /// + /// ``` + /// # use tonic::transport::Endpoint; + /// Endpoint::from_shared("https://example.com".to_string()); + /// ``` + pub fn from_shared(s: impl Into) -> Result { + let uri = Uri::from_shared(s.into())?; + Ok(Self::from(uri)) } } -impl From for Endpoint { +impl From for Endpoint { fn from(uri: Uri) -> Self { Self { uri, + connector: super::service::connector(), concurrency_limit: None, rate_limit: None, timeout: None, - #[cfg(feature = "tls")] - tls: None, buffer_size: None, interceptor_headers: None, init_stream_window_size: None, @@ -225,7 +244,7 @@ impl From for Endpoint { } } -impl TryFrom for Endpoint { +impl TryFrom for Endpoint { type Error = InvalidUriBytes; fn try_from(t: Bytes) -> Result { @@ -233,7 +252,7 @@ impl TryFrom for Endpoint { } } -impl TryFrom for Endpoint { +impl TryFrom for Endpoint { type Error = InvalidUriBytes; fn try_from(t: String) -> Result { @@ -241,7 +260,7 @@ impl TryFrom for Endpoint { } } -impl TryFrom<&'static str> for Endpoint { +impl TryFrom<&'static str> for Endpoint { type Error = Never; fn try_from(t: &'static str) -> Result { @@ -260,7 +279,7 @@ impl std::fmt::Display for Never { impl std::error::Error for Never {} -impl fmt::Debug for Endpoint { +impl fmt::Debug for Endpoint { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Endpoint").finish() } diff --git a/tonic/src/transport/service/connection.rs b/tonic/src/transport/service/connection.rs index 663148ed0..fd9738532 100644 --- a/tonic/src/transport/service/connection.rs +++ b/tonic/src/transport/service/connection.rs @@ -26,28 +26,39 @@ pub(crate) struct Connection { } impl Connection { - pub(crate) async fn new(endpoint: Endpoint, connector: C) -> Result + pub(crate) async fn new(endpoint: Endpoint) -> Result where C: tower_make::MakeConnection + Send + 'static, C::Connection: Unpin + Send + 'static, C::Future: Send + 'static, C::Error: Into> + Send, { + let Endpoint { + uri, + init_stream_window_size, + init_connection_window_size, + timeout, + concurrency_limit, + rate_limit, + connector, + .. + } = endpoint; + let settings = Builder::new() - .http2_initial_stream_window_size(endpoint.init_stream_window_size) - .http2_initial_connection_window_size(endpoint.init_connection_window_size) + .http2_initial_stream_window_size(init_stream_window_size) + .http2_initial_connection_window_size(init_connection_window_size) .http2_only(true) .clone(); let mut connector = HyperConnect::new(connector, settings); - let initial_conn = connector.call(endpoint.uri.clone()).await?; - let conn = Reconnect::new(initial_conn, connector, endpoint.uri.clone()); + let initial_conn = connector.call(uri.clone()).await?; + let conn = Reconnect::new(initial_conn, connector, uri.clone()); let stack = ServiceBuilder::new() - .layer_fn(|s| AddOrigin::new(s, endpoint.uri.clone())) - .optional_layer(endpoint.timeout.map(TimeoutLayer::new)) - .optional_layer(endpoint.concurrency_limit.map(ConcurrencyLimitLayer::new)) - .optional_layer(endpoint.rate_limit.map(|(l, d)| RateLimitLayer::new(l, d))) + .layer_fn(|s| AddOrigin::new(s, uri.clone())) + .optional_layer(timeout.map(TimeoutLayer::new)) + .optional_layer(concurrency_limit.map(ConcurrencyLimitLayer::new)) + .optional_layer(rate_limit.map(|(l, d)| RateLimitLayer::new(l, d))) .into_inner(); let inner = stack.layer(conn); diff --git a/tonic/src/transport/service/connector.rs b/tonic/src/transport/service/connector.rs index f8a63b88a..6a6237595 100644 --- a/tonic/src/transport/service/connector.rs +++ b/tonic/src/transport/service/connector.rs @@ -9,7 +9,6 @@ use std::task::{Context, Poll}; use tower_make::MakeConnection; use tower_service::Service; -#[cfg(not(feature = "tls"))] pub(crate) fn connector() -> HttpConnector { let mut http = HttpConnector::new(); http.enforce_http(false); @@ -18,7 +17,7 @@ pub(crate) fn connector() -> HttpConnector { } #[cfg(feature = "tls")] -pub(crate) fn connector(tls: Option) -> Connector { +pub(crate) fn tls_connector(tls: Option) -> Connector { Connector::new(tls) } @@ -32,11 +31,10 @@ pub(crate) struct Connector { impl Connector { #[cfg(feature = "tls")] pub(crate) fn new(tls: Option) -> Self { - let mut http = HttpConnector::new(); - http.enforce_http(false); - http.set_nodelay(true); - - Self { http, tls } + Self { + http: connector(), + tls, + } } } diff --git a/tonic/src/transport/service/discover.rs b/tonic/src/transport/service/discover.rs index 38808324b..c1dc9adfb 100644 --- a/tonic/src/transport/service/discover.rs +++ b/tonic/src/transport/service/discover.rs @@ -10,18 +10,16 @@ use std::{ use tower::discover::{Change, Discover}; pub(crate) struct ServiceList { - list: VecDeque, - connector: C, + list: VecDeque>, connecting: Option> + Send + 'static>>>, i: usize, } impl ServiceList { - pub(crate) fn new(list: Vec, connector: C) -> Self { + pub(crate) fn new(list: Vec>) -> Self { Self { list: list.into(), - connector, connecting: None, i: 0, } @@ -57,8 +55,7 @@ where } if let Some(endpoint) = self.list.pop_front() { - let c = &self.connector; - let fut = Connection::new(endpoint, c.clone()); + let fut = Connection::new(endpoint); self.connecting = Some(Box::pin(fut)); } else { return Poll::Pending; diff --git a/tonic/src/transport/service/mod.rs b/tonic/src/transport/service/mod.rs index 0819f02da..11d85dfa1 100644 --- a/tonic/src/transport/service/mod.rs +++ b/tonic/src/transport/service/mod.rs @@ -13,6 +13,8 @@ mod tls; pub(crate) use self::add_origin::AddOrigin; pub(crate) use self::connection::Connection; pub(crate) use self::connector::connector; +#[cfg(feature = "tls")] +pub(crate) use self::connector::tls_connector; pub(crate) use self::discover::ServiceList; pub(crate) use self::io::BoxedIo; pub(crate) use self::layer::{layer_fn, ServiceBuilderExt};