From 5611c1bdd3739cd03ac23619e4efee2a92b32d8e Mon Sep 17 00:00:00 2001 From: luyanbo Date: Thu, 2 Feb 2023 15:58:55 +0800 Subject: [PATCH] use new pattern --- dubbo-build/src/client.rs | 52 ++-- dubbo/src/triple/client/builder.rs | 76 ++++-- dubbo/src/triple/client/triple.rs | 77 ++---- examples/echo/src/echo/client.rs | 3 +- examples/echo/src/echo/grpc.examples.echo.rs | 35 +-- examples/echo/src/protos/hello_echo.rs | 237 ++++++++++--------- examples/greeter/src/greeter/client.rs | 7 +- 7 files changed, 220 insertions(+), 267 deletions(-) diff --git a/dubbo-build/src/client.rs b/dubbo-build/src/client.rs index 0a3ff474..2c55bc94 100644 --- a/dubbo-build/src/client.rs +++ b/dubbo-build/src/client.rs @@ -66,51 +66,39 @@ pub fn generate( #service_doc #(#struct_attributes)* #[derive(Debug, Clone, Default)] - pub struct #service_ident { - inner: TripleClient, + pub struct #service_ident { + inner: TripleClient, } - impl #service_ident { + impl #service_ident { pub fn connect(host: String) -> Self { - let mut cli = TripleClient::connect(host.clone()); - cli = cli.with_directory(Box::new(StaticDirectory::new(&host))); + let cli = TripleClient::connect(host); #service_ident { inner: cli, } } - pub fn build(builder: ClientBuilder) -> Self { - Self { - inner: TripleClient::with_builder(builder), - } - } - } + // pub fn build(builder: ClientBuilder) -> Self { + // Self { + // inner: TripleClient::new(builder), + // } + // } - impl #service_ident - where - T: Service, Response = http::Response>, - T::Error: Into, - { - pub fn new(inner: T, builder: ClientBuilder) -> Self { + pub fn new(builder: ClientBuilder) -> Self { Self { - inner: TripleClient::new(inner, builder), + inner: TripleClient::new(builder), } } - pub fn with_filter(self, filter: F) -> #service_ident> - where - F: Filter, - { - let inner = self.inner.with_filter(filter); - #service_ident { - inner, - } - } - - pub fn with_directory(mut self, directory: Box) -> Self { - self.inner = self.inner.with_directory(directory); - self - } + // pub fn with_filter(self, filter: F) -> #service_ident> + // where + // F: Filter, + // { + // let inner = self.inner.with_filter(filter); + // #service_ident { + // inner, + // } + // } #methods diff --git a/dubbo/src/triple/client/builder.rs b/dubbo/src/triple/client/builder.rs index 087f2a25..59ceb02b 100644 --- a/dubbo/src/triple/client/builder.rs +++ b/dubbo/src/triple/client/builder.rs @@ -20,30 +20,39 @@ use hyper::client::conn::Builder; use tokio::time::Duration; use tower::ServiceBuilder; +use crate::cluster::directory::StaticDirectory; +use crate::codegen::{Directory, Filter}; +use crate::triple::compression::CompressionEncoding; use crate::triple::transport::connection::Connection; use crate::utils::boxed::BoxService; +use super::TripleClient; + pub type ClientBoxService = BoxService, http::Response, crate::Error>; #[derive(Clone, Debug, Default)] pub struct ClientBuilder { - pub uri: Uri, pub timeout: Option, pub connector: &'static str, + directory: Option>, } impl ClientBuilder { pub fn new() -> ClientBuilder { ClientBuilder { - uri: Uri::builder().build().unwrap(), timeout: None, connector: "", + directory: None, } } - pub fn from_static(s: &'static str) -> ClientBuilder { - Self::from(Uri::from_static(s)) + pub fn from_static(host: &str) -> ClientBuilder { + Self { + timeout: None, + connector: "", + directory: Some(Box::new(StaticDirectory::new(&host))), + } } pub fn with_timeout(self, timeout: u64) -> Self { @@ -53,35 +62,50 @@ impl ClientBuilder { } } + // pub fn with_filter(self, filter: F) -> Self + // where + // F: Filter, + // { + // Self { + // FilterService::new(self.inner, filter), + // ..self + // } + // } + + /// host: http://0.0.0.0:8888 + pub fn with_directory(self, directory: Box) -> Self { + Self { + directory: Some(directory), + ..self + } + } + pub fn with_host(self, host: &'static str) -> Self { Self { - uri: Uri::from_static(host), + directory: Some(Box::new(StaticDirectory::new(&host))), ..self } } - pub fn connect(self) -> ClientBoxService { - let builder = ServiceBuilder::new(); - let timeout = self.timeout.unwrap_or(5); - let builder = builder.timeout(Duration::from_secs(timeout)); + pub fn build(self) -> TripleClient { + TripleClient { + send_compression_encoding: Some(CompressionEncoding::Gzip), + directory: self.directory, + } + } - let mut b = Builder::new(); - let hyper_builder = b.http2_only(true); - let conn = Connection::new() - .with_host(self.uri.clone()) - .with_connector(self.connector) - .with_builder(hyper_builder.to_owned()); + // pub fn connect1(self) -> ClientBoxService { + // let builder = ServiceBuilder::new(); + // let timeout = self.timeout.unwrap_or(5); + // let builder = builder.timeout(Duration::from_secs(timeout)); - BoxService::new(builder.service(conn)) - } -} + // let mut b = Builder::new(); + // let hyper_builder = b.http2_only(true); + // let conn = Connection::new() + // .with_host(self.uri.clone()) + // .with_connector(self.connector) + // .with_builder(hyper_builder.to_owned()); -impl From for ClientBuilder { - fn from(u: Uri) -> Self { - Self { - uri: u, - timeout: None, - connector: "", - } - } + // BoxService::new(builder.service(conn)) + // } } diff --git a/dubbo/src/triple/client/triple.rs b/dubbo/src/triple/client/triple.rs index b6037f2a..36b98c49 100644 --- a/dubbo/src/triple/client/triple.rs +++ b/dubbo/src/triple/client/triple.rs @@ -36,77 +36,30 @@ use crate::triple::decode::Decoding; use crate::triple::encode::encode; #[derive(Debug, Clone, Default)] -pub struct TripleClient { - builder: Option, - inner: T, - send_compression_encoding: Option, - directory: Option>, +pub struct TripleClient { + pub(crate) send_compression_encoding: Option, + pub(crate) directory: Option>, } -impl TripleClient { +impl TripleClient { pub fn connect(host: String) -> Self { - let uri = match http::Uri::from_str(&host) { - Ok(v) => v, - Err(err) => { - tracing::error!("http uri parse error: {}, host: {}", err, host); - panic!("http uri parse error: {}, host: {}", err, host) - } - }; - - let builder = ClientBuilder::from(uri); - - TripleClient { - builder: Some(builder.clone()), - inner: builder.connect(), - send_compression_encoding: Some(CompressionEncoding::Gzip), - directory: None, - } - } - - pub fn with_builder(builder: ClientBuilder) -> Self { - TripleClient { - builder: Some(builder.clone()), - inner: builder.connect(), - send_compression_encoding: Some(CompressionEncoding::Gzip), - directory: None, - } - } -} + let builder = ClientBuilder::from_static(&host); -impl TripleClient { - pub fn new(inner: T, builder: ClientBuilder) -> Self { - TripleClient { - builder: Some(builder), - inner, - send_compression_encoding: Some(CompressionEncoding::Gzip), - directory: None, - } + builder.build() } - pub fn with_filter(self, filter: F) -> TripleClient> - where - F: Filter, - { - TripleClient::new( - FilterService::new(self.inner, filter), - self.builder.unwrap(), - ) + pub fn new(builder: ClientBuilder) -> Self { + builder.build() } - /// host: http://0.0.0.0:8888 - pub fn with_directory(self, directory: Box) -> Self { - TripleClient { - directory: Some(directory), - ..self - } - } -} + // /// host: http://0.0.0.0:8888 + // pub fn with_directory1(self, directory: Box) -> Self { + // TripleClient { + // directory: Some(directory), + // ..self + // } + // } -impl TripleClient -where - T: Service, Response = http::Response>, - T::Error: Into, -{ fn map_request( &self, uri: http::Uri, diff --git a/examples/echo/src/echo/client.rs b/examples/echo/src/echo/client.rs index 73bc02ef..c1d04036 100644 --- a/examples/echo/src/echo/client.rs +++ b/examples/echo/src/echo/client.rs @@ -31,7 +31,8 @@ impl Filter for FakeFilter { #[tokio::main] async fn main() { - let mut cli = EchoClient::connect("http://127.0.0.1:8888".to_string()); + let builder = ClientBuilder::from_static(&"http://127.0.0.1:8888").with_timeout(1000000); + let mut cli = EchoClient::new(builder); // let mut unary_cli = cli.clone().with_filter(FakeFilter {}); // let mut cli = EchoClient::build(ClientBuilder::from_static("http://127.0.0.1:8888")); let resp = cli diff --git a/examples/echo/src/echo/grpc.examples.echo.rs b/examples/echo/src/echo/grpc.examples.echo.rs index 40b7b486..700a51b6 100644 --- a/examples/echo/src/echo/grpc.examples.echo.rs +++ b/examples/echo/src/echo/grpc.examples.echo.rs @@ -16,42 +16,19 @@ pub mod echo_client { use dubbo::{codegen::*, cluster::directory::StaticDirectory}; /// Echo is the echo service. #[derive(Debug, Clone, Default)] - pub struct EchoClient { - inner: TripleClient, + pub struct EchoClient { + inner: TripleClient, } - impl EchoClient { + impl EchoClient { pub fn connect(host: String) -> Self { - let mut cli = TripleClient::connect(host.clone()); - cli = cli.with_directory(Box::new(StaticDirectory::new(&host))); + let cli = TripleClient::connect(host); EchoClient { inner: cli } } - pub fn build(builder: ClientBuilder) -> Self { + pub fn new(builder: ClientBuilder) -> Self { Self { - inner: TripleClient::with_builder(builder), + inner: TripleClient::new(builder), } } - } - impl EchoClient - where - T: Service, Response = http::Response>, - T::Error: Into, - { - pub fn new(inner: T, builder: ClientBuilder) -> Self { - Self { - inner: TripleClient::new(inner, builder), - } - } - pub fn with_filter(self, filter: F) -> EchoClient> - where - F: Filter, - { - let inner = self.inner.with_filter(filter); - EchoClient { inner } - } - pub fn with_directory(mut self, directory: Box) -> Self { - self.inner = self.inner.with_directory(directory); - self - } /// UnaryEcho is unary echo. pub async fn unary_echo( &mut self, diff --git a/examples/echo/src/protos/hello_echo.rs b/examples/echo/src/protos/hello_echo.rs index d29446a8..700a51b6 100644 --- a/examples/echo/src/protos/hello_echo.rs +++ b/examples/echo/src/protos/hello_echo.rs @@ -1,85 +1,49 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - /// EchoRequest is the request for echo. #[derive(Clone, PartialEq, ::prost::Message)] pub struct EchoRequest { - #[prost(string, tag = "1")] + #[prost(string, tag="1")] pub message: ::prost::alloc::string::String, } /// EchoResponse is the response for echo. #[derive(Clone, PartialEq, ::prost::Message)] pub struct EchoResponse { - #[prost(string, tag = "1")] + #[prost(string, tag="1")] pub message: ::prost::alloc::string::String, } /// Generated client implementations. pub mod echo_client { #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] - use dubbo::{cluster::directory::StaticDirectory, codegen::*}; + use dubbo::{codegen::*, cluster::directory::StaticDirectory}; /// Echo is the echo service. #[derive(Debug, Clone, Default)] - pub struct EchoClient { - inner: TripleClient, + pub struct EchoClient { + inner: TripleClient, } - impl EchoClient { + impl EchoClient { pub fn connect(host: String) -> Self { - let mut cli = TripleClient::connect(host.clone()); - cli = cli.with_directory(Box::new(StaticDirectory::new(&host))); + let cli = TripleClient::connect(host); EchoClient { inner: cli } } - pub fn build(builder: ClientBuilder) -> Self { + pub fn new(builder: ClientBuilder) -> Self { Self { - inner: TripleClient::with_builder(builder), + inner: TripleClient::new(builder), } } - } - impl EchoClient - where - T: Service, Response = http::Response>, - T::Error: Into, - { - pub fn new(inner: T, builder: ClientBuilder) -> Self { - Self { - inner: TripleClient::new(inner, builder), - } - } - pub fn with_filter(self, filter: F) -> EchoClient> - where - F: Filter, - { - let inner = self.inner.with_filter(filter); - EchoClient { inner } - } - pub fn with_directory(mut self, directory: Box) -> Self { - self.inner = self.inner.with_directory(directory); - self - } /// UnaryEcho is unary echo. pub async fn unary_echo( &mut self, request: Request, ) -> Result, dubbo::status::Status> { - let codec = - dubbo::codegen::ProstCodec::::default(); + let codec = dubbo::codegen::ProstCodec::< + super::EchoRequest, + super::EchoResponse, + >::default(); let invocation = RpcInvocation::default() .with_servie_unique_name(String::from("grpc.examples.echo.Echo")) .with_method_name(String::from("UnaryEcho")); - let path = http::uri::PathAndQuery::from_static("/grpc.examples.echo.Echo/UnaryEcho"); + let path = http::uri::PathAndQuery::from_static( + "/grpc.examples.echo.Echo/UnaryEcho", + ); self.inner.unary(request, codec, path, invocation).await } /// ServerStreamingEcho is server side streaming. @@ -87,51 +51,51 @@ pub mod echo_client { &mut self, request: Request, ) -> Result>, dubbo::status::Status> { - let codec = - dubbo::codegen::ProstCodec::::default(); + let codec = dubbo::codegen::ProstCodec::< + super::EchoRequest, + super::EchoResponse, + >::default(); let invocation = RpcInvocation::default() .with_servie_unique_name(String::from("grpc.examples.echo.Echo")) .with_method_name(String::from("ServerStreamingEcho")); let path = http::uri::PathAndQuery::from_static( "/grpc.examples.echo.Echo/ServerStreamingEcho", ); - self.inner - .server_streaming(request, codec, path, invocation) - .await + self.inner.server_streaming(request, codec, path, invocation).await } /// ClientStreamingEcho is client side streaming. pub async fn client_streaming_echo( &mut self, request: impl IntoStreamingRequest, ) -> Result, dubbo::status::Status> { - let codec = - dubbo::codegen::ProstCodec::::default(); + let codec = dubbo::codegen::ProstCodec::< + super::EchoRequest, + super::EchoResponse, + >::default(); let invocation = RpcInvocation::default() .with_servie_unique_name(String::from("grpc.examples.echo.Echo")) .with_method_name(String::from("ClientStreamingEcho")); let path = http::uri::PathAndQuery::from_static( "/grpc.examples.echo.Echo/ClientStreamingEcho", ); - self.inner - .client_streaming(request, codec, path, invocation) - .await + self.inner.client_streaming(request, codec, path, invocation).await } /// BidirectionalStreamingEcho is bidi streaming. pub async fn bidirectional_streaming_echo( &mut self, request: impl IntoStreamingRequest, ) -> Result>, dubbo::status::Status> { - let codec = - dubbo::codegen::ProstCodec::::default(); + let codec = dubbo::codegen::ProstCodec::< + super::EchoRequest, + super::EchoResponse, + >::default(); let invocation = RpcInvocation::default() .with_servie_unique_name(String::from("grpc.examples.echo.Echo")) .with_method_name(String::from("BidirectionalStreamingEcho")); let path = http::uri::PathAndQuery::from_static( "/grpc.examples.echo.Echo/BidirectionalStreamingEcho", ); - self.inner - .bidi_streaming(request, codec, path, invocation) - .await + self.inner.bidi_streaming(request, codec, path, invocation).await } } } @@ -148,7 +112,9 @@ pub mod echo_server { request: Request, ) -> Result, dubbo::status::Status>; ///Server streaming response type for the ServerStreamingEcho method. - type ServerStreamingEchoStream: futures_util::Stream> + type ServerStreamingEchoStream: futures_util::Stream< + Item = Result, + > + Send + 'static; /// ServerStreamingEcho is server side streaming. @@ -162,14 +128,19 @@ pub mod echo_server { request: Request>, ) -> Result, dubbo::status::Status>; ///Server streaming response type for the BidirectionalStreamingEcho method. - type BidirectionalStreamingEchoStream: futures_util::Stream> + type BidirectionalStreamingEchoStream: futures_util::Stream< + Item = Result, + > + Send + 'static; /// BidirectionalStreamingEcho is bidi streaming. async fn bidirectional_streaming_echo( &self, request: Request>, - ) -> Result, dubbo::status::Status>; + ) -> Result< + Response, + dubbo::status::Status, + >; } /// Echo is the echo service. #[derive(Debug)] @@ -199,7 +170,10 @@ pub mod echo_server { type Response = http::Response; type Error = std::convert::Infallible; type Future = BoxFuture; - fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + fn poll_ready( + &mut self, + _cx: &mut Context<'_>, + ) -> Poll> { Poll::Ready(Ok(())) } fn call(&mut self, req: http::Request) -> Self::Future { @@ -212,18 +186,26 @@ pub mod echo_server { } impl UnarySvc for UnaryEchoServer { type Response = super::EchoResponse; - type Future = BoxFuture, dubbo::status::Status>; - fn call(&mut self, request: Request) -> Self::Future { + type Future = BoxFuture< + Response, + dubbo::status::Status, + >; + fn call( + &mut self, + request: Request, + ) -> Self::Future { let inner = self.inner.0.clone(); let fut = async move { inner.unary_echo(request).await }; Box::pin(fut) } } let fut = async move { - let mut server = TripleServer::new(dubbo::codegen::ProstCodec::< - super::EchoResponse, - super::EchoRequest, - >::default()); + let mut server = TripleServer::new( + dubbo::codegen::ProstCodec::< + super::EchoResponse, + super::EchoRequest, + >::default(), + ); let res = server.unary(UnaryEchoServer { inner }, req).await; Ok(res) }; @@ -234,22 +216,32 @@ pub mod echo_server { struct ServerStreamingEchoServer { inner: _Inner, } - impl ServerStreamingSvc for ServerStreamingEchoServer { + impl ServerStreamingSvc + for ServerStreamingEchoServer { type Response = super::EchoResponse; type ResponseStream = T::ServerStreamingEchoStream; - type Future = - BoxFuture, dubbo::status::Status>; - fn call(&mut self, request: Request) -> Self::Future { + type Future = BoxFuture< + Response, + dubbo::status::Status, + >; + fn call( + &mut self, + request: Request, + ) -> Self::Future { let inner = self.inner.0.clone(); - let fut = async move { inner.server_streaming_echo(request).await }; + let fut = async move { + inner.server_streaming_echo(request).await + }; Box::pin(fut) } } let fut = async move { - let mut server = TripleServer::new(dubbo::codegen::ProstCodec::< - super::EchoResponse, - super::EchoRequest, - >::default()); + let mut server = TripleServer::new( + dubbo::codegen::ProstCodec::< + super::EchoResponse, + super::EchoRequest, + >::default(), + ); let res = server .server_streaming(ServerStreamingEchoServer { inner }, req) .await; @@ -262,23 +254,31 @@ pub mod echo_server { struct ClientStreamingEchoServer { inner: _Inner, } - impl ClientStreamingSvc for ClientStreamingEchoServer { + impl ClientStreamingSvc + for ClientStreamingEchoServer { type Response = super::EchoResponse; - type Future = BoxFuture, dubbo::status::Status>; + type Future = BoxFuture< + Response, + dubbo::status::Status, + >; fn call( &mut self, request: Request>, ) -> Self::Future { let inner = self.inner.0.clone(); - let fut = async move { inner.client_streaming_echo(request).await }; + let fut = async move { + inner.client_streaming_echo(request).await + }; Box::pin(fut) } } let fut = async move { - let mut server = TripleServer::new(dubbo::codegen::ProstCodec::< - super::EchoResponse, - super::EchoRequest, - >::default()); + let mut server = TripleServer::new( + dubbo::codegen::ProstCodec::< + super::EchoResponse, + super::EchoRequest, + >::default(), + ); let res = server .client_streaming(ClientStreamingEchoServer { inner }, req) .await; @@ -291,41 +291,56 @@ pub mod echo_server { struct BidirectionalStreamingEchoServer { inner: _Inner, } - impl StreamingSvc for BidirectionalStreamingEchoServer { + impl StreamingSvc + for BidirectionalStreamingEchoServer { type Response = super::EchoResponse; type ResponseStream = T::BidirectionalStreamingEchoStream; - type Future = - BoxFuture, dubbo::status::Status>; + type Future = BoxFuture< + Response, + dubbo::status::Status, + >; fn call( &mut self, request: Request>, ) -> Self::Future { let inner = self.inner.0.clone(); - let fut = - async move { inner.bidirectional_streaming_echo(request).await }; + let fut = async move { + inner.bidirectional_streaming_echo(request).await + }; Box::pin(fut) } } let fut = async move { - let mut server = TripleServer::new(dubbo::codegen::ProstCodec::< - super::EchoResponse, - super::EchoRequest, - >::default()); + let mut server = TripleServer::new( + dubbo::codegen::ProstCodec::< + super::EchoResponse, + super::EchoRequest, + >::default(), + ); let res = server - .bidi_streaming(BidirectionalStreamingEchoServer { inner }, req) + .bidi_streaming( + BidirectionalStreamingEchoServer { + inner, + }, + req, + ) .await; Ok(res) }; Box::pin(fut) } - _ => Box::pin(async move { - Ok(http::Response::builder() - .status(200) - .header("grpc-status", "12") - .header("content-type", "application/grpc") - .body(empty_body()) - .unwrap()) - }), + _ => { + Box::pin(async move { + Ok( + http::Response::builder() + .status(200) + .header("grpc-status", "12") + .header("content-type", "application/grpc") + .body(empty_body()) + .unwrap(), + ) + }) + } } } } diff --git a/examples/greeter/src/greeter/client.rs b/examples/greeter/src/greeter/client.rs index 0abd5201..de8ea3f8 100644 --- a/examples/greeter/src/greeter/client.rs +++ b/examples/greeter/src/greeter/client.rs @@ -40,12 +40,7 @@ async fn main() { tracing::subscriber::set_global_default(subscriber).expect("setting default subscriber failed"); - let http_uri = http::Uri::from_str(&"http://1.1.1.1:8888").unwrap(); - - let mut cli = GreeterClient::new(Connection::new().with_host(http_uri), ClientBuilder::new()); - let directory = StaticDirectory::new("http://127.0.0.1:8888"); - cli = cli.with_directory(Box::new(directory)); - //let mut cli = GreeterClient::connect("http://127.0.0.1:8888".to_string()); + let mut cli = GreeterClient::new(ClientBuilder::from_static(&"http://127.0.0.1:8888")); // Here is example for zk // let zk_connect_string = match env::var("ZOOKEEPER_SERVERS") {