Skip to content

Commit

Permalink
use new pattern
Browse files Browse the repository at this point in the history
  • Loading branch information
robberphex committed Feb 8, 2023
1 parent 3e09d50 commit 6d49aff
Show file tree
Hide file tree
Showing 11 changed files with 103 additions and 220 deletions.
54 changes: 21 additions & 33 deletions dubbo-build/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,56 +61,44 @@ pub fn generate<T: Service>(
// will trigger if compression is disabled
clippy::let_unit_value,
)]
use dubbo::{codegen::*, cluster::directory::StaticDirectory};
use dubbo::codegen::*;

#service_doc
#(#struct_attributes)*
#[derive(Debug, Clone, Default)]
pub struct #service_ident<T> {
inner: TripleClient<T>,
pub struct #service_ident {
inner: TripleClient,
}

impl #service_ident<ClientBoxService> {
impl #service_ident {
pub fn connect(host: String) -> Self {
let mut cli = TripleClient::connect(host.clone());
cli = cli.with_directory(Box::new(StaticDirectory::new(&host)));
let cli = TripleClient::connect(host);
#service_ident {
inner: cli,
}
}

pub fn build(builder: ClientBuilder) -> Self {
Self {
inner: TripleClient::with_builder(builder),
}
}
}
// pub fn build(builder: ClientBuilder) -> Self {
// Self {
// inner: TripleClient::new(builder),
// }
// }

impl<T> #service_ident<T>
where
T: Service<http::Request<hyperBody>, Response = http::Response<BoxBody>>,
T::Error: Into<StdError>,
{
pub fn new(inner: T, builder: ClientBuilder) -> Self {
pub fn new(builder: ClientBuilder) -> Self {
Self {
inner: TripleClient::new(inner, builder),
inner: TripleClient::new(builder),
}
}

pub fn with_filter<F>(self, filter: F) -> #service_ident<FilterService<T, F>>
where
F: Filter,
{
let inner = self.inner.with_filter(filter);
#service_ident {
inner,
}
}

pub fn with_directory(mut self, directory: Box<dyn Directory>) -> Self {
self.inner = self.inner.with_directory(directory);
self
}
// pub fn with_filter<F>(self, filter: F) -> #service_ident<FilterService<T, F>>
// where
// F: Filter,
// {
// let inner = self.inner.with_filter(filter);
// #service_ident {
// inner,
// }
// }

#methods

Expand Down
4 changes: 4 additions & 0 deletions dubbo/src/cluster/directory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ impl StaticDirectory {
};
StaticDirectory { uri: uri }
}

pub fn from_uri(uri: &http::Uri) -> StaticDirectory {
StaticDirectory { uri: uri.clone() }
}
}

impl Directory for StaticDirectory {
Expand Down
18 changes: 8 additions & 10 deletions dubbo/src/protocol/triple/triple_invoker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,25 @@
* limitations under the License.
*/

use std::str::FromStr;

use tower_service::Service;

use crate::common::url::Url;
use crate::protocol::Invoker;
use crate::triple::client::builder::{ClientBoxService, ClientBuilder};
use crate::triple::client::builder::ClientBoxService;

pub struct TripleInvoker {
url: Url,
conn: ClientBoxService,
}

impl TripleInvoker {
pub fn new(url: Url) -> TripleInvoker {
let uri = http::Uri::from_str(&url.to_url()).unwrap();
Self {
url,
conn: ClientBuilder::from(uri).connect(),
}
}
// pub fn new(url: Url) -> TripleInvoker {
// let uri = http::Uri::from_str(&url.to_url()).unwrap();
// Self {
// url,
// conn: ClientBuilder::from_uri(&uri).build()connect(),
// }
// }
}

impl Invoker<http::Request<hyper::Body>> for TripleInvoker {
Expand Down
5 changes: 3 additions & 2 deletions dubbo/src/protocol/triple/triple_protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,9 @@ impl Protocol for TripleProtocol {
Box::new(TripleExporter::new())
}

async fn refer(self, url: Url) -> Self::Invoker {
TripleInvoker::new(url)
async fn refer(self, _url: Url) -> Self::Invoker {
todo!()
// TripleInvoker::new(url)
// Self::Invoker
}
}
4 changes: 2 additions & 2 deletions dubbo/src/registry/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use std::sync::{Arc, RwLock};

use super::memory_registry::MemoryRegistry;
use super::BoxRegistry;
use crate::codegen::TripleInvoker;
use crate::common::url::Url;
use crate::protocol::triple::triple_exporter::TripleExporter;
use crate::protocol::triple::triple_protocol::TripleProtocol;
Expand Down Expand Up @@ -104,6 +103,7 @@ impl Protocol for RegistryProtocol {
// get Registry from registry_url
// init directory based on registry_url and Registry
// init Cluster based on Directory generates Invoker
Box::new(TripleInvoker::new(url))
todo!()
//Box::new(TripleInvoker::new(url))
}
}
67 changes: 34 additions & 33 deletions dubbo/src/triple/client/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,35 +15,46 @@
* limitations under the License.
*/

use http::Uri;
use hyper::client::conn::Builder;
use tokio::time::Duration;
use tower::ServiceBuilder;

use crate::triple::transport::connection::Connection;
use crate::cluster::directory::StaticDirectory;
use crate::codegen::Directory;
use crate::triple::compression::CompressionEncoding;
use crate::utils::boxed::BoxService;

use super::TripleClient;

pub type ClientBoxService =
BoxService<http::Request<hyper::Body>, http::Response<crate::BoxBody>, crate::Error>;

#[derive(Clone, Debug, Default)]
pub struct ClientBuilder {
pub uri: Uri,
pub timeout: Option<u64>,
pub connector: &'static str,
directory: Option<Box<dyn Directory>>,
}

impl ClientBuilder {
pub fn new() -> ClientBuilder {
ClientBuilder {
uri: Uri::builder().build().unwrap(),
timeout: None,
connector: "",
directory: None,
}
}

pub fn from_static(host: &str) -> ClientBuilder {
Self {
timeout: None,
connector: "",
directory: Some(Box::new(StaticDirectory::new(&host))),
}
}

pub fn from_static(s: &'static str) -> ClientBuilder {
Self::from(Uri::from_static(s))
pub fn from_uri(uri: &http::Uri) -> ClientBuilder {
Self {
timeout: None,
connector: "",
directory: Some(Box::new(StaticDirectory::from_uri(&uri))),
}
}

pub fn with_timeout(self, timeout: u64) -> Self {
Expand All @@ -53,9 +64,17 @@ impl ClientBuilder {
}
}

/// host: http://0.0.0.0:8888
pub fn with_directory(self, directory: Box<dyn Directory>) -> Self {
Self {
directory: Some(directory),
..self
}
}

pub fn with_host(self, host: &'static str) -> Self {
Self {
uri: Uri::from_static(host),
directory: Some(Box::new(StaticDirectory::new(&host))),
..self
}
}
Expand All @@ -67,28 +86,10 @@ impl ClientBuilder {
}
}

pub fn connect(self) -> ClientBoxService {
let builder = ServiceBuilder::new();
let timeout = self.timeout.unwrap_or(5);
let builder = builder.timeout(Duration::from_secs(timeout));

let mut b = Builder::new();
let hyper_builder = b.http2_only(true);
let conn = Connection::new()
.with_host(self.uri.clone())
.with_connector(self.connector)
.with_builder(hyper_builder.to_owned());

BoxService::new(builder.service(conn))
}
}

impl From<Uri> for ClientBuilder {
fn from(u: Uri) -> Self {
Self {
uri: u,
timeout: None,
connector: "tcp",
pub fn build(self) -> TripleClient {
TripleClient {
send_compression_encoding: Some(CompressionEncoding::Gzip),
directory: self.directory,
}
}
}
77 changes: 10 additions & 67 deletions dubbo/src/triple/client/triple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,89 +24,32 @@ use rand::prelude::SliceRandom;
use tower_service::Service;

use super::super::transport::connection::Connection;
use super::builder::{ClientBoxService, ClientBuilder};
use super::builder::ClientBuilder;
use crate::codegen::{Directory, RpcInvocation};
use crate::filter::service::FilterService;
use crate::filter::Filter;
use crate::invocation::{IntoStreamingRequest, Metadata, Request, Response};

use crate::invocation::{IntoStreamingRequest, Metadata, Request, Response};
use crate::triple::codec::Codec;
use crate::triple::compression::CompressionEncoding;
use crate::triple::decode::Decoding;
use crate::triple::encode::encode;

#[derive(Debug, Clone, Default)]
pub struct TripleClient<T> {
builder: Option<ClientBuilder>,
inner: T,
send_compression_encoding: Option<CompressionEncoding>,
directory: Option<Box<dyn Directory>>,
pub struct TripleClient {
pub(crate) send_compression_encoding: Option<CompressionEncoding>,
pub(crate) directory: Option<Box<dyn Directory>>,
}

impl TripleClient<ClientBoxService> {
impl TripleClient {
pub fn connect(host: String) -> Self {
let uri = match http::Uri::from_str(&host) {
Ok(v) => v,
Err(err) => {
tracing::error!("http uri parse error: {}, host: {}", err, host);
panic!("http uri parse error: {}, host: {}", err, host)
}
};

let builder = ClientBuilder::from(uri);

TripleClient {
builder: Some(builder.clone()),
inner: builder.connect(),
send_compression_encoding: Some(CompressionEncoding::Gzip),
directory: None,
}
}
let builder = ClientBuilder::from_static(&host);

pub fn with_builder(builder: ClientBuilder) -> Self {
TripleClient {
builder: Some(builder.clone()),
inner: builder.connect(),
send_compression_encoding: Some(CompressionEncoding::Gzip),
directory: None,
}
builder.build()
}
}

impl<T> TripleClient<T> {
pub fn new(inner: T, builder: ClientBuilder) -> Self {
TripleClient {
builder: Some(builder),
inner,
send_compression_encoding: Some(CompressionEncoding::Gzip),
directory: None,
}
pub fn new(builder: ClientBuilder) -> Self {
builder.build()
}

pub fn with_filter<F>(self, filter: F) -> TripleClient<FilterService<T, F>>
where
F: Filter,
{
TripleClient::new(
FilterService::new(self.inner, filter),
self.builder.unwrap(),
)
}

/// host: http://0.0.0.0:8888
pub fn with_directory(self, directory: Box<dyn Directory>) -> Self {
TripleClient {
directory: Some(directory),
..self
}
}
}

impl<T> TripleClient<T>
where
T: Service<http::Request<hyper::Body>, Response = http::Response<crate::BoxBody>>,
T::Error: Into<crate::Error>,
{
fn map_request(
&self,
uri: http::Uri,
Expand Down
9 changes: 5 additions & 4 deletions examples/echo/src/echo/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,11 @@ impl Filter for FakeFilter {

#[tokio::main]
async fn main() {
let builder = ClientBuilder::new()
.with_connector("unix")
.with_host("unix://127.0.0.1:8888");
let mut cli = EchoClient::build(builder);
// let builder = ClientBuilder::new()
// .with_connector("unix")
// .with_host("unix://127.0.0.1:8888");
let builder = ClientBuilder::from_static(&"http://127.0.0.1:8888").with_timeout(1000000);
let mut cli = EchoClient::new(builder);
// let mut unary_cli = cli.clone().with_filter(FakeFilter {});
// let mut cli = EchoClient::build(ClientBuilder::from_static("http://127.0.0.1:8888"));
let resp = cli
Expand Down
Loading

0 comments on commit 6d49aff

Please sign in to comment.