From 2e964c78c666ecd6e6cfc37689d30300cad81f4c Mon Sep 17 00:00:00 2001 From: Luca Palmieri Date: Wed, 18 Nov 2020 19:26:49 +0000 Subject: [PATCH] feat(transport): Connect lazily in the load balanced channel (#493) --- tonic/src/transport/service/discover.rs | 60 +++++++++---------------- 1 file changed, 21 insertions(+), 39 deletions(-) diff --git a/tonic/src/transport/service/discover.rs b/tonic/src/transport/service/discover.rs index 925525c30..a5feccb50 100644 --- a/tonic/src/transport/service/discover.rs +++ b/tonic/src/transport/service/discover.rs @@ -1,9 +1,8 @@ -use super::super::{service, BoxFuture}; +use super::super::service; use super::connection::Connection; use crate::transport::Endpoint; use std::{ - future::Future, hash::Hash, pin::Pin, task::{Context, Poll}, @@ -16,15 +15,11 @@ type DiscoverResult = Result, E>; pub(crate) struct DynamicServiceStream { changes: Receiver>, - connecting: Option<(K, BoxFuture)>, } impl DynamicServiceStream { pub(crate) fn new(changes: Receiver>) -> Self { - Self { - changes, - connecting: None, - } + Self { changes } } } @@ -37,39 +32,26 @@ impl Discover for DynamicServiceStream { mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { - loop { - if let Some((key, connecting)) = &mut self.connecting { - let svc = futures_core::ready!(Pin::new(connecting).poll(cx))?; - let key = key.to_owned(); - self.connecting = None; - let change = Ok(Change::Insert(key, svc)); - return Poll::Ready(change); - }; - - let c = &mut self.changes; - match Pin::new(&mut *c).poll_next(cx) { - Poll::Pending => return Poll::Pending, - Poll::Ready(None) => { - return Poll::Pending; + let c = &mut self.changes; + match Pin::new(&mut *c).poll_next(cx) { + Poll::Pending | Poll::Ready(None) => Poll::Pending, + Poll::Ready(Some(change)) => match change { + Change::Insert(k, endpoint) => { + let mut http = hyper::client::connect::HttpConnector::new(); + http.set_nodelay(endpoint.tcp_nodelay); + http.set_keepalive(endpoint.tcp_keepalive); + http.enforce_http(false); + #[cfg(feature = "tls")] + let connector = service::connector(http, endpoint.tls.clone()); + + #[cfg(not(feature = "tls"))] + let connector = service::connector(http); + let connection = Connection::lazy(connector, endpoint); + let change = Ok(Change::Insert(k, connection)); + Poll::Ready(change) } - Poll::Ready(Some(change)) => match change { - Change::Insert(k, endpoint) => { - let mut http = hyper::client::connect::HttpConnector::new(); - http.set_nodelay(endpoint.tcp_nodelay); - http.set_keepalive(endpoint.tcp_keepalive); - http.enforce_http(false); - #[cfg(feature = "tls")] - let connector = service::connector(http, endpoint.tls.clone()); - - #[cfg(not(feature = "tls"))] - let connector = service::connector(http); - let fut = Connection::connect(connector, endpoint); - self.connecting = Some((k, Box::pin(fut))); - continue; - } - Change::Remove(k) => return Poll::Ready(Ok(Change::Remove(k))), - }, - } + Change::Remove(k) => Poll::Ready(Ok(Change::Remove(k))), + }, } } }