From aa21f1176b412512903fa2cdc82a9d866e2d29de Mon Sep 17 00:00:00 2001 From: Yang Yang <962032265@qq.com> Date: Mon, 26 Dec 2022 21:38:16 +0800 Subject: [PATCH] [Ftr] Add Builder for DubboClient (#89) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Rft(dubbo): add ClientBuilder for client * Rftï(dubbo-build): add build api for client * style(examples): cargo fmt * Rft: move connection from client to transport mod * Rft(dubbo): add default timeout for client --- dubbo-build/src/client.rs | 12 ++- dubbo/Cargo.toml | 2 +- dubbo/src/codegen.rs | 3 +- dubbo/src/protocol/triple/triple_invoker.rs | 2 +- dubbo/src/triple/client/builder.rs | 87 +++++++++++++++++++ dubbo/src/triple/client/mod.rs | 2 +- dubbo/src/triple/client/triple.rs | 35 +++++--- .../{client => transport}/connection.rs | 8 +- dubbo/src/triple/transport/connector/mod.rs | 4 +- dubbo/src/triple/transport/mod.rs | 1 + examples/echo/src/echo/client.rs | 5 +- examples/echo/src/protos/hello_echo.rs | 11 ++- 12 files changed, 143 insertions(+), 29 deletions(-) create mode 100644 dubbo/src/triple/client/builder.rs rename dubbo/src/triple/{client => transport}/connection.rs (94%) diff --git a/dubbo-build/src/client.rs b/dubbo-build/src/client.rs index 10fa4468..8d58ab16 100644 --- a/dubbo-build/src/client.rs +++ b/dubbo-build/src/client.rs @@ -70,13 +70,19 @@ pub fn generate( inner: TripleClient, } - impl #service_ident { + impl #service_ident { pub fn connect(host: String) -> Self { let cli = TripleClient::connect(host); #service_ident { inner: cli, } } + + pub fn build(builder: ClientBuilder) -> Self { + Self { + inner: TripleClient::with_builder(builder), + } + } } impl #service_ident @@ -84,9 +90,9 @@ pub fn generate( T: Service, Response = http::Response>, T::Error: Into, { - pub fn new(inner: T) -> Self { + pub fn new(inner: T, builder: ClientBuilder) -> Self { Self { - inner: TripleClient::new(inner, None), + inner: TripleClient::new(inner, builder), } } diff --git a/dubbo/Cargo.toml b/dubbo/Cargo.toml index bdde2ea3..145b4a0e 100644 --- a/dubbo/Cargo.toml +++ b/dubbo/Cargo.toml @@ -14,7 +14,7 @@ hyper = { version = "0.14.19", features = ["full"]} http = "0.2" tower-service = "0.3.1" http-body = "0.4.4" -tower = "0.4.12" +tower = { version = "0.4.12", features = ["timeout"]} futures-util = "0.3.23" futures-core ="0.3.23" tokio = { version = "1.0", features = [ "rt-multi-thread", "time", "fs", "macros", "net", "signal"] } diff --git a/dubbo/src/codegen.rs b/dubbo/src/codegen.rs index 9ffce899..16c8c1ee 100644 --- a/dubbo/src/codegen.rs +++ b/dubbo/src/codegen.rs @@ -38,4 +38,5 @@ pub use super::triple::server::TripleServer; pub use super::{empty_body, BoxBody, BoxFuture, StdError}; pub use crate::filter::service::FilterService; pub use crate::filter::Filter; -pub use crate::triple::client::connection::Connection; +pub use crate::triple::client::builder::{ClientBoxService, ClientBuilder}; +pub use crate::triple::transport::connection::Connection; diff --git a/dubbo/src/protocol/triple/triple_invoker.rs b/dubbo/src/protocol/triple/triple_invoker.rs index 2ec3f4a8..01938513 100644 --- a/dubbo/src/protocol/triple/triple_invoker.rs +++ b/dubbo/src/protocol/triple/triple_invoker.rs @@ -21,7 +21,7 @@ use tower_service::Service; use crate::common::url::Url; use crate::protocol::Invoker; -use crate::triple::client::connection::Connection; +use crate::triple::transport::connection::Connection; #[allow(dead_code)] #[derive(Clone, Default)] diff --git a/dubbo/src/triple/client/builder.rs b/dubbo/src/triple/client/builder.rs new file mode 100644 index 00000000..087f2a25 --- /dev/null +++ b/dubbo/src/triple/client/builder.rs @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use http::Uri; +use hyper::client::conn::Builder; +use tokio::time::Duration; +use tower::ServiceBuilder; + +use crate::triple::transport::connection::Connection; +use crate::utils::boxed::BoxService; + +pub type ClientBoxService = + BoxService, http::Response, crate::Error>; + +#[derive(Clone, Debug, Default)] +pub struct ClientBuilder { + pub uri: Uri, + pub timeout: Option, + pub connector: &'static str, +} + +impl ClientBuilder { + pub fn new() -> ClientBuilder { + ClientBuilder { + uri: Uri::builder().build().unwrap(), + timeout: None, + connector: "", + } + } + + pub fn from_static(s: &'static str) -> ClientBuilder { + Self::from(Uri::from_static(s)) + } + + pub fn with_timeout(self, timeout: u64) -> Self { + Self { + timeout: Some(timeout), + ..self + } + } + + pub fn with_host(self, host: &'static str) -> Self { + Self { + uri: Uri::from_static(host), + ..self + } + } + + pub fn connect(self) -> ClientBoxService { + let builder = ServiceBuilder::new(); + let timeout = self.timeout.unwrap_or(5); + let builder = builder.timeout(Duration::from_secs(timeout)); + + let mut b = Builder::new(); + let hyper_builder = b.http2_only(true); + let conn = Connection::new() + .with_host(self.uri.clone()) + .with_connector(self.connector) + .with_builder(hyper_builder.to_owned()); + + BoxService::new(builder.service(conn)) + } +} + +impl From for ClientBuilder { + fn from(u: Uri) -> Self { + Self { + uri: u, + timeout: None, + connector: "", + } + } +} diff --git a/dubbo/src/triple/client/mod.rs b/dubbo/src/triple/client/mod.rs index 333a60a4..7a8e0131 100644 --- a/dubbo/src/triple/client/mod.rs +++ b/dubbo/src/triple/client/mod.rs @@ -15,7 +15,7 @@ * limitations under the License. */ -pub mod connection; +pub mod builder; pub mod triple; pub use triple::TripleClient; diff --git a/dubbo/src/triple/client/triple.rs b/dubbo/src/triple/client/triple.rs index 36c9ce94..d4864efd 100644 --- a/dubbo/src/triple/client/triple.rs +++ b/dubbo/src/triple/client/triple.rs @@ -21,7 +21,7 @@ use futures_util::{future, stream, StreamExt, TryStreamExt}; use http::HeaderValue; use tower_service::Service; -use super::connection::Connection; +use super::builder::{ClientBoxService, ClientBuilder}; use crate::filter::service::FilterService; use crate::filter::Filter; use crate::invocation::{IntoStreamingRequest, Metadata, Request, Response}; @@ -32,33 +32,43 @@ use crate::triple::encode::encode; #[derive(Debug, Clone, Default)] pub struct TripleClient { - host: Option, + builder: Option, inner: T, send_compression_encoding: Option, } -impl TripleClient { +impl TripleClient { pub fn connect(host: String) -> Self { let uri = match http::Uri::from_str(&host) { - Ok(v) => Some(v), + Ok(v) => v, Err(err) => { tracing::error!("http uri parse error: {}, host: {}", err, host); panic!("http uri parse error: {}, host: {}", err, host) } }; + let builder = ClientBuilder::from(uri); + TripleClient { - host: uri.clone(), - inner: Connection::new().with_host(uri.unwrap()), + builder: Some(builder.clone()), + inner: builder.connect(), + send_compression_encoding: Some(CompressionEncoding::Gzip), + } + } + + pub fn with_builder(builder: ClientBuilder) -> Self { + TripleClient { + builder: Some(builder.clone()), + inner: builder.connect(), send_compression_encoding: Some(CompressionEncoding::Gzip), } } } impl TripleClient { - pub fn new(inner: T, host: Option) -> Self { + pub fn new(inner: T, builder: ClientBuilder) -> Self { TripleClient { - host, + builder: Some(builder), inner, send_compression_encoding: Some(CompressionEncoding::Gzip), } @@ -68,7 +78,10 @@ impl TripleClient { where F: Filter, { - TripleClient::new(FilterService::new(self.inner, filter), self.host) + TripleClient::new( + FilterService::new(self.inner, filter), + self.builder.unwrap(), + ) } } @@ -82,8 +95,8 @@ where path: http::uri::PathAndQuery, body: hyper::Body, ) -> http::Request { - let mut parts = match self.host.as_ref() { - Some(v) => v.to_owned().into_parts(), + let mut parts = match self.builder.as_ref() { + Some(v) => v.to_owned().uri.into_parts(), None => { tracing::error!("client host is empty"); return http::Request::new(hyper::Body::empty()); diff --git a/dubbo/src/triple/client/connection.rs b/dubbo/src/triple/transport/connection.rs similarity index 94% rename from dubbo/src/triple/client/connection.rs rename to dubbo/src/triple/transport/connection.rs index ce453096..d9bda6fb 100644 --- a/dubbo/src/triple/client/connection.rs +++ b/dubbo/src/triple/transport/connection.rs @@ -27,7 +27,7 @@ use crate::triple::transport::connector::get_connector; #[derive(Debug, Clone)] pub struct Connection { host: hyper::Uri, - connector: String, + connector: &'static str, builder: Builder, } @@ -41,12 +41,12 @@ impl Connection { pub fn new() -> Self { Connection { host: hyper::Uri::default(), - connector: "http".to_string(), + connector: "http", builder: Builder::new(), } } - pub fn with_connector(mut self, connector: String) -> Self { + pub fn with_connector(mut self, connector: &'static str) -> Self { self.connector = connector; self } @@ -83,7 +83,7 @@ where fn call(&mut self, req: http::Request) -> Self::Future { let builder = self.builder.clone().http2_only(true).to_owned(); - let mut connector = Connect::new(get_connector(self.connector.clone()), builder); + let mut connector = Connect::new(get_connector(self.connector), builder); let uri = self.host.clone(); let fut = async move { let mut con = connector.call(uri).await.unwrap(); diff --git a/dubbo/src/triple/transport/connector/mod.rs b/dubbo/src/triple/transport/connector/mod.rs index 82b9281c..6e0e0622 100644 --- a/dubbo/src/triple/transport/connector/mod.rs +++ b/dubbo/src/triple/transport/connector/mod.rs @@ -73,8 +73,8 @@ where } } -pub fn get_connector(connector: String) -> BoxCloneService { - match connector.as_str() { +pub fn get_connector(connector: &'static str) -> BoxCloneService { + match connector { "http" => { let c = http_connector::HttpConnector::new(); BoxCloneService::new(Connector::new(c)) diff --git a/dubbo/src/triple/transport/mod.rs b/dubbo/src/triple/transport/mod.rs index 228efff3..8d1efb55 100644 --- a/dubbo/src/triple/transport/mod.rs +++ b/dubbo/src/triple/transport/mod.rs @@ -15,6 +15,7 @@ * limitations under the License. */ +pub mod connection; pub mod connector; mod io; pub mod listener; diff --git a/examples/echo/src/echo/client.rs b/examples/echo/src/echo/client.rs index 8b3cc9bf..73bc02ef 100644 --- a/examples/echo/src/echo/client.rs +++ b/examples/echo/src/echo/client.rs @@ -32,8 +32,9 @@ impl Filter for FakeFilter { #[tokio::main] async fn main() { let mut cli = EchoClient::connect("http://127.0.0.1:8888".to_string()); - let mut unary_cli = cli.clone().with_filter(FakeFilter {}); - let resp = unary_cli + // let mut unary_cli = cli.clone().with_filter(FakeFilter {}); + // let mut cli = EchoClient::build(ClientBuilder::from_static("http://127.0.0.1:8888")); + let resp = cli .unary_echo(Request::new(EchoRequest { message: "message from client".to_string(), })) diff --git a/examples/echo/src/protos/hello_echo.rs b/examples/echo/src/protos/hello_echo.rs index 86daf6fb..08a1d299 100644 --- a/examples/echo/src/protos/hello_echo.rs +++ b/examples/echo/src/protos/hello_echo.rs @@ -36,20 +36,25 @@ pub mod echo_client { pub struct EchoClient { inner: TripleClient, } - impl EchoClient { + impl EchoClient { pub fn connect(host: String) -> Self { let cli = TripleClient::connect(host); EchoClient { inner: cli } } + pub fn build(builder: ClientBuilder) -> Self { + Self { + inner: TripleClient::with_builder(builder), + } + } } impl EchoClient where T: Service, Response = http::Response>, T::Error: Into, { - pub fn new(inner: T) -> Self { + pub fn new(inner: T, builder: ClientBuilder) -> Self { Self { - inner: TripleClient::new(inner, None), + inner: TripleClient::new(inner, builder), } } pub fn with_filter(self, filter: F) -> EchoClient>