Skip to content

Commit

Permalink
[Ftr] Add Builder for DubboClient (#89)
Browse files Browse the repository at this point in the history
* Rft(dubbo): add ClientBuilder for client

* Rftï(dubbo-build): add build api for client

* style(examples): cargo fmt

* Rft: move connection from client to transport mod

* Rft(dubbo): add default timeout for client
  • Loading branch information
yang20150702 authored Dec 26, 2022
1 parent a863be4 commit aa21f11
Show file tree
Hide file tree
Showing 12 changed files with 143 additions and 29 deletions.
12 changes: 9 additions & 3 deletions dubbo-build/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,23 +70,29 @@ pub fn generate<T: Service>(
inner: TripleClient<T>,
}

impl #service_ident<Connection> {
impl #service_ident<ClientBoxService> {
pub fn connect(host: String) -> Self {
let cli = TripleClient::connect(host);
#service_ident {
inner: cli,
}
}

pub fn build(builder: ClientBuilder) -> Self {
Self {
inner: TripleClient::with_builder(builder),
}
}
}

impl<T> #service_ident<T>
where
T: Service<http::Request<hyperBody>, Response = http::Response<BoxBody>>,
T::Error: Into<StdError>,
{
pub fn new(inner: T) -> Self {
pub fn new(inner: T, builder: ClientBuilder) -> Self {
Self {
inner: TripleClient::new(inner, None),
inner: TripleClient::new(inner, builder),
}
}

Expand Down
2 changes: 1 addition & 1 deletion dubbo/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ hyper = { version = "0.14.19", features = ["full"]}
http = "0.2"
tower-service = "0.3.1"
http-body = "0.4.4"
tower = "0.4.12"
tower = { version = "0.4.12", features = ["timeout"]}
futures-util = "0.3.23"
futures-core ="0.3.23"
tokio = { version = "1.0", features = [ "rt-multi-thread", "time", "fs", "macros", "net", "signal"] }
Expand Down
3 changes: 2 additions & 1 deletion dubbo/src/codegen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,5 @@ pub use super::triple::server::TripleServer;
pub use super::{empty_body, BoxBody, BoxFuture, StdError};
pub use crate::filter::service::FilterService;
pub use crate::filter::Filter;
pub use crate::triple::client::connection::Connection;
pub use crate::triple::client::builder::{ClientBoxService, ClientBuilder};
pub use crate::triple::transport::connection::Connection;
2 changes: 1 addition & 1 deletion dubbo/src/protocol/triple/triple_invoker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use tower_service::Service;

use crate::common::url::Url;
use crate::protocol::Invoker;
use crate::triple::client::connection::Connection;
use crate::triple::transport::connection::Connection;

#[allow(dead_code)]
#[derive(Clone, Default)]
Expand Down
87 changes: 87 additions & 0 deletions dubbo/src/triple/client/builder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

use http::Uri;
use hyper::client::conn::Builder;
use tokio::time::Duration;
use tower::ServiceBuilder;

use crate::triple::transport::connection::Connection;
use crate::utils::boxed::BoxService;

pub type ClientBoxService =
BoxService<http::Request<hyper::Body>, http::Response<crate::BoxBody>, crate::Error>;

#[derive(Clone, Debug, Default)]
pub struct ClientBuilder {
pub uri: Uri,
pub timeout: Option<u64>,
pub connector: &'static str,
}

impl ClientBuilder {
pub fn new() -> ClientBuilder {
ClientBuilder {
uri: Uri::builder().build().unwrap(),
timeout: None,
connector: "",
}
}

pub fn from_static(s: &'static str) -> ClientBuilder {
Self::from(Uri::from_static(s))
}

pub fn with_timeout(self, timeout: u64) -> Self {
Self {
timeout: Some(timeout),
..self
}
}

pub fn with_host(self, host: &'static str) -> Self {
Self {
uri: Uri::from_static(host),
..self
}
}

pub fn connect(self) -> ClientBoxService {
let builder = ServiceBuilder::new();
let timeout = self.timeout.unwrap_or(5);
let builder = builder.timeout(Duration::from_secs(timeout));

let mut b = Builder::new();
let hyper_builder = b.http2_only(true);
let conn = Connection::new()
.with_host(self.uri.clone())
.with_connector(self.connector)
.with_builder(hyper_builder.to_owned());

BoxService::new(builder.service(conn))
}
}

impl From<Uri> for ClientBuilder {
fn from(u: Uri) -> Self {
Self {
uri: u,
timeout: None,
connector: "",
}
}
}
2 changes: 1 addition & 1 deletion dubbo/src/triple/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

pub mod connection;
pub mod builder;
pub mod triple;

pub use triple::TripleClient;
35 changes: 24 additions & 11 deletions dubbo/src/triple/client/triple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use futures_util::{future, stream, StreamExt, TryStreamExt};
use http::HeaderValue;
use tower_service::Service;

use super::connection::Connection;
use super::builder::{ClientBoxService, ClientBuilder};
use crate::filter::service::FilterService;
use crate::filter::Filter;
use crate::invocation::{IntoStreamingRequest, Metadata, Request, Response};
Expand All @@ -32,33 +32,43 @@ use crate::triple::encode::encode;

#[derive(Debug, Clone, Default)]
pub struct TripleClient<T> {
host: Option<http::Uri>,
builder: Option<ClientBuilder>,
inner: T,
send_compression_encoding: Option<CompressionEncoding>,
}

impl TripleClient<Connection> {
impl TripleClient<ClientBoxService> {
pub fn connect(host: String) -> Self {
let uri = match http::Uri::from_str(&host) {
Ok(v) => Some(v),
Ok(v) => v,
Err(err) => {
tracing::error!("http uri parse error: {}, host: {}", err, host);
panic!("http uri parse error: {}, host: {}", err, host)
}
};

let builder = ClientBuilder::from(uri);

TripleClient {
host: uri.clone(),
inner: Connection::new().with_host(uri.unwrap()),
builder: Some(builder.clone()),
inner: builder.connect(),
send_compression_encoding: Some(CompressionEncoding::Gzip),
}
}

pub fn with_builder(builder: ClientBuilder) -> Self {
TripleClient {
builder: Some(builder.clone()),
inner: builder.connect(),
send_compression_encoding: Some(CompressionEncoding::Gzip),
}
}
}

impl<T> TripleClient<T> {
pub fn new(inner: T, host: Option<http::Uri>) -> Self {
pub fn new(inner: T, builder: ClientBuilder) -> Self {
TripleClient {
host,
builder: Some(builder),
inner,
send_compression_encoding: Some(CompressionEncoding::Gzip),
}
Expand All @@ -68,7 +78,10 @@ impl<T> TripleClient<T> {
where
F: Filter,
{
TripleClient::new(FilterService::new(self.inner, filter), self.host)
TripleClient::new(
FilterService::new(self.inner, filter),
self.builder.unwrap(),
)
}
}

Expand All @@ -82,8 +95,8 @@ where
path: http::uri::PathAndQuery,
body: hyper::Body,
) -> http::Request<hyper::Body> {
let mut parts = match self.host.as_ref() {
Some(v) => v.to_owned().into_parts(),
let mut parts = match self.builder.as_ref() {
Some(v) => v.to_owned().uri.into_parts(),
None => {
tracing::error!("client host is empty");
return http::Request::new(hyper::Body::empty());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::triple::transport::connector::get_connector;
#[derive(Debug, Clone)]
pub struct Connection {
host: hyper::Uri,
connector: String,
connector: &'static str,
builder: Builder,
}

Expand All @@ -41,12 +41,12 @@ impl Connection {
pub fn new() -> Self {
Connection {
host: hyper::Uri::default(),
connector: "http".to_string(),
connector: "http",
builder: Builder::new(),
}
}

pub fn with_connector<C>(mut self, connector: String) -> Self {
pub fn with_connector(mut self, connector: &'static str) -> Self {
self.connector = connector;
self
}
Expand Down Expand Up @@ -83,7 +83,7 @@ where

fn call(&mut self, req: http::Request<ReqBody>) -> Self::Future {
let builder = self.builder.clone().http2_only(true).to_owned();
let mut connector = Connect::new(get_connector(self.connector.clone()), builder);
let mut connector = Connect::new(get_connector(self.connector), builder);
let uri = self.host.clone();
let fut = async move {
let mut con = connector.call(uri).await.unwrap();
Expand Down
4 changes: 2 additions & 2 deletions dubbo/src/triple/transport/connector/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ where
}
}

pub fn get_connector(connector: String) -> BoxCloneService<Uri, BoxIO, crate::Error> {
match connector.as_str() {
pub fn get_connector(connector: &'static str) -> BoxCloneService<Uri, BoxIO, crate::Error> {
match connector {
"http" => {
let c = http_connector::HttpConnector::new();
BoxCloneService::new(Connector::new(c))
Expand Down
1 change: 1 addition & 0 deletions dubbo/src/triple/transport/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
* limitations under the License.
*/

pub mod connection;
pub mod connector;
mod io;
pub mod listener;
Expand Down
5 changes: 3 additions & 2 deletions examples/echo/src/echo/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ impl Filter for FakeFilter {
#[tokio::main]
async fn main() {
let mut cli = EchoClient::connect("http://127.0.0.1:8888".to_string());
let mut unary_cli = cli.clone().with_filter(FakeFilter {});
let resp = unary_cli
// let mut unary_cli = cli.clone().with_filter(FakeFilter {});
// let mut cli = EchoClient::build(ClientBuilder::from_static("http://127.0.0.1:8888"));
let resp = cli
.unary_echo(Request::new(EchoRequest {
message: "message from client".to_string(),
}))
Expand Down
11 changes: 8 additions & 3 deletions examples/echo/src/protos/hello_echo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,20 +36,25 @@ pub mod echo_client {
pub struct EchoClient<T> {
inner: TripleClient<T>,
}
impl EchoClient<Connection> {
impl EchoClient<ClientBoxService> {
pub fn connect(host: String) -> Self {
let cli = TripleClient::connect(host);
EchoClient { inner: cli }
}
pub fn build(builder: ClientBuilder) -> Self {
Self {
inner: TripleClient::with_builder(builder),
}
}
}
impl<T> EchoClient<T>
where
T: Service<http::Request<hyperBody>, Response = http::Response<BoxBody>>,
T::Error: Into<StdError>,
{
pub fn new(inner: T) -> Self {
pub fn new(inner: T, builder: ClientBuilder) -> Self {
Self {
inner: TripleClient::new(inner, None),
inner: TripleClient::new(inner, builder),
}
}
pub fn with_filter<F>(self, filter: F) -> EchoClient<FilterService<T, F>>
Expand Down

0 comments on commit aa21f11

Please sign in to comment.