Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ftr: exchange layer abstraction design #126

Merged
merged 29 commits into from
Mar 6, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
02ad8af
Add: add remoting/net member
mozhou-tech Feb 26, 2023
b3cb54a
Add: add remoting/net member
mozhou-tech Feb 26, 2023
b49eb41
Add: remoting/net/incoming
mozhou-tech Feb 26, 2023
fbe02d6
Add: remoting/net/incoming
mozhou-tech Feb 27, 2023
d48ae45
Add: License
mozhou-tech Feb 27, 2023
8c4cbb4
Add: License
mozhou-tech Feb 27, 2023
27925ea
Merge branch 'main' into main
mozhou-tech Feb 28, 2023
76c2001
Merge branch 'apache:main' into main
mozhou-tech Feb 28, 2023
8bd727d
Merge remote-tracking branch 'origin/main' into main
mozhou-tech Mar 1, 2023
9ddb8d9
Rft: using subpackage to manage registry implementations for avoiding…
mozhou-tech Mar 1, 2023
3e594fa
Ftr: add common/utils subpackage
mozhou-tech Mar 1, 2023
b45d1ba
Ftr: add common/utils subpackage
mozhou-tech Mar 1, 2023
05095fa
Ftr: add common/utils subpackage
mozhou-tech Mar 1, 2023
11139c6
Ftr: add common/utils subpackage
mozhou-tech Mar 1, 2023
d3a048a
Ftr: add common/utils subpackage
mozhou-tech Mar 1, 2023
e777e6b
Ftr: add common/utils subpackage
mozhou-tech Mar 1, 2023
319868c
Ftr: add common/utils subpackage
mozhou-tech Mar 1, 2023
e696071
Ftr: add common/utils subpackage
mozhou-tech Mar 1, 2023
8040708
Merge branch 'apache:main' into main
mozhou-tech Mar 2, 2023
5edc821
Merge remote-tracking branch 'origin/main' into main
mozhou-tech Mar 3, 2023
07ed112
Merge remote-tracking branch 'origin/main' into main
mozhou-tech Mar 4, 2023
ce849be
Ftr: remoting/exchange abstraction design
mozhou-tech Mar 4, 2023
519b3b1
Ftr: remoting/exchange abstraction design
mozhou-tech Mar 4, 2023
0c26956
Ftr: remoting/exchange abstraction design
mozhou-tech Mar 4, 2023
8723f2e
Ftr: remoting/exchange abstraction design
mozhou-tech Mar 4, 2023
044c987
Ftr: remoting/exchange abstraction design
mozhou-tech Mar 6, 2023
52490d0
Ftr: remoting/exchange abstraction design
mozhou-tech Mar 6, 2023
9c84d3b
Ftr: remoting/exchange abstraction design
mozhou-tech Mar 6, 2023
96f1907
Ftr: remoting/exchange abstraction design
mozhou-tech Mar 6, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Ftr: remoting/exchange abstraction design
  • Loading branch information
mozhou-tech committed Mar 6, 2023
commit 52490d0e0db6627e590c9f57a91f34541758e9b6
1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ members = [
"dubbo-build",
"remoting/net",
"remoting/http",
"remoting/h2",
"remoting/zookeeper",
"remoting/base",
"remoting/xds",
Expand Down
179 changes: 62 additions & 117 deletions examples/echo/src/generated/grpc.examples.echo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,68 +40,64 @@ pub mod echo_client {
&mut self,
request: Request<super::EchoRequest>,
) -> Result<Response<super::EchoResponse>, dubbo::status::Status> {
let codec = dubbo::codegen::ProstCodec::<
super::EchoRequest,
super::EchoResponse,
>::default();
let codec =
dubbo::codegen::ProstCodec::<super::EchoRequest, super::EchoResponse>::default();
let invocation = RpcInvocation::default()
.with_service_unique_name(String::from("grpc.examples.echo.Echo"))
.with_method_name(String::from("UnaryEcho"));
let path = http::uri::PathAndQuery::from_static(
"/grpc.examples.echo.Echo/UnaryEcho",
);
let path = http::uri::PathAndQuery::from_static("/grpc.examples.echo.Echo/UnaryEcho");
self.inner.unary(request, codec, path, invocation).await
}
/// ServerStreamingEcho is server side streaming.
pub async fn server_streaming_echo(
&mut self,
request: Request<super::EchoRequest>,
) -> Result<Response<Decoding<super::EchoResponse>>, dubbo::status::Status> {
let codec = dubbo::codegen::ProstCodec::<
super::EchoRequest,
super::EchoResponse,
>::default();
let codec =
dubbo::codegen::ProstCodec::<super::EchoRequest, super::EchoResponse>::default();
let invocation = RpcInvocation::default()
.with_service_unique_name(String::from("grpc.examples.echo.Echo"))
.with_method_name(String::from("ServerStreamingEcho"));
let path = http::uri::PathAndQuery::from_static(
"/grpc.examples.echo.Echo/ServerStreamingEcho",
);
self.inner.server_streaming(request, codec, path, invocation).await
self.inner
.server_streaming(request, codec, path, invocation)
.await
}
/// ClientStreamingEcho is client side streaming.
pub async fn client_streaming_echo(
&mut self,
request: impl IntoStreamingRequest<Message = super::EchoRequest>,
) -> Result<Response<super::EchoResponse>, dubbo::status::Status> {
let codec = dubbo::codegen::ProstCodec::<
super::EchoRequest,
super::EchoResponse,
>::default();
let codec =
dubbo::codegen::ProstCodec::<super::EchoRequest, super::EchoResponse>::default();
let invocation = RpcInvocation::default()
.with_service_unique_name(String::from("grpc.examples.echo.Echo"))
.with_method_name(String::from("ClientStreamingEcho"));
let path = http::uri::PathAndQuery::from_static(
"/grpc.examples.echo.Echo/ClientStreamingEcho",
);
self.inner.client_streaming(request, codec, path, invocation).await
self.inner
.client_streaming(request, codec, path, invocation)
.await
}
/// BidirectionalStreamingEcho is bidi streaming.
pub async fn bidirectional_streaming_echo(
&mut self,
request: impl IntoStreamingRequest<Message = super::EchoRequest>,
) -> Result<Response<Decoding<super::EchoResponse>>, dubbo::status::Status> {
let codec = dubbo::codegen::ProstCodec::<
super::EchoRequest,
super::EchoResponse,
>::default();
let codec =
dubbo::codegen::ProstCodec::<super::EchoRequest, super::EchoResponse>::default();
let invocation = RpcInvocation::default()
.with_service_unique_name(String::from("grpc.examples.echo.Echo"))
.with_method_name(String::from("BidirectionalStreamingEcho"));
let path = http::uri::PathAndQuery::from_static(
"/grpc.examples.echo.Echo/BidirectionalStreamingEcho",
);
self.inner.bidi_streaming(request, codec, path, invocation).await
self.inner
.bidi_streaming(request, codec, path, invocation)
.await
}
}
}
Expand All @@ -118,9 +114,7 @@ pub mod echo_server {
request: Request<super::EchoRequest>,
) -> Result<Response<super::EchoResponse>, dubbo::status::Status>;
///Server streaming response type for the ServerStreamingEcho method.
type ServerStreamingEchoStream: futures_util::Stream<
Item = Result<super::EchoResponse, dubbo::status::Status>,
>
type ServerStreamingEchoStream: futures_util::Stream<Item = Result<super::EchoResponse, dubbo::status::Status>>
+ Send
+ 'static;
/// ServerStreamingEcho is server side streaming.
Expand All @@ -134,19 +128,14 @@ pub mod echo_server {
request: Request<Decoding<super::EchoRequest>>,
) -> Result<Response<super::EchoResponse>, dubbo::status::Status>;
///Server streaming response type for the BidirectionalStreamingEcho method.
type BidirectionalStreamingEchoStream: futures_util::Stream<
Item = Result<super::EchoResponse, dubbo::status::Status>,
>
type BidirectionalStreamingEchoStream: futures_util::Stream<Item = Result<super::EchoResponse, dubbo::status::Status>>
+ Send
+ 'static;
/// BidirectionalStreamingEcho is bidi streaming.
async fn bidirectional_streaming_echo(
&self,
request: Request<Decoding<super::EchoRequest>>,
) -> Result<
Response<Self::BidirectionalStreamingEchoStream>,
dubbo::status::Status,
>;
) -> Result<Response<Self::BidirectionalStreamingEchoStream>, dubbo::status::Status>;
}
/// Echo is the echo service.
#[derive(Debug)]
Expand Down Expand Up @@ -176,10 +165,7 @@ pub mod echo_server {
type Response = http::Response<BoxBody>;
type Error = std::convert::Infallible;
type Future = BoxFuture<Self::Response, Self::Error>;
fn poll_ready(
&mut self,
_cx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: http::Request<B>) -> Self::Future {
Expand All @@ -192,26 +178,18 @@ pub mod echo_server {
}
impl<T: Echo> UnarySvc<super::EchoRequest> for UnaryEchoServer<T> {
type Response = super::EchoResponse;
type Future = BoxFuture<
Response<Self::Response>,
dubbo::status::Status,
>;
fn call(
&mut self,
request: Request<super::EchoRequest>,
) -> Self::Future {
type Future = BoxFuture<Response<Self::Response>, dubbo::status::Status>;
fn call(&mut self, request: Request<super::EchoRequest>) -> Self::Future {
let inner = self.inner.0.clone();
let fut = async move { inner.unary_echo(request).await };
Box::pin(fut)
}
}
let fut = async move {
let mut server = TripleServer::new(
dubbo::codegen::ProstCodec::<
super::EchoResponse,
super::EchoRequest,
>::default(),
);
let mut server = TripleServer::new(dubbo::codegen::ProstCodec::<
super::EchoResponse,
super::EchoRequest,
>::default());
let res = server.unary(UnaryEchoServer { inner }, req).await;
Ok(res)
};
Expand All @@ -222,32 +200,22 @@ pub mod echo_server {
struct ServerStreamingEchoServer<T: Echo> {
inner: _Inner<T>,
}
impl<T: Echo> ServerStreamingSvc<super::EchoRequest>
for ServerStreamingEchoServer<T> {
impl<T: Echo> ServerStreamingSvc<super::EchoRequest> for ServerStreamingEchoServer<T> {
type Response = super::EchoResponse;
type ResponseStream = T::ServerStreamingEchoStream;
type Future = BoxFuture<
Response<Self::ResponseStream>,
dubbo::status::Status,
>;
fn call(
&mut self,
request: Request<super::EchoRequest>,
) -> Self::Future {
type Future =
BoxFuture<Response<Self::ResponseStream>, dubbo::status::Status>;
fn call(&mut self, request: Request<super::EchoRequest>) -> Self::Future {
let inner = self.inner.0.clone();
let fut = async move {
inner.server_streaming_echo(request).await
};
let fut = async move { inner.server_streaming_echo(request).await };
Box::pin(fut)
}
}
let fut = async move {
let mut server = TripleServer::new(
dubbo::codegen::ProstCodec::<
super::EchoResponse,
super::EchoRequest,
>::default(),
);
let mut server = TripleServer::new(dubbo::codegen::ProstCodec::<
super::EchoResponse,
super::EchoRequest,
>::default());
let res = server
.server_streaming(ServerStreamingEchoServer { inner }, req)
.await;
Expand All @@ -260,31 +228,23 @@ pub mod echo_server {
struct ClientStreamingEchoServer<T: Echo> {
inner: _Inner<T>,
}
impl<T: Echo> ClientStreamingSvc<super::EchoRequest>
for ClientStreamingEchoServer<T> {
impl<T: Echo> ClientStreamingSvc<super::EchoRequest> for ClientStreamingEchoServer<T> {
type Response = super::EchoResponse;
type Future = BoxFuture<
Response<Self::Response>,
dubbo::status::Status,
>;
type Future = BoxFuture<Response<Self::Response>, dubbo::status::Status>;
fn call(
&mut self,
request: Request<Decoding<super::EchoRequest>>,
) -> Self::Future {
let inner = self.inner.0.clone();
let fut = async move {
inner.client_streaming_echo(request).await
};
let fut = async move { inner.client_streaming_echo(request).await };
Box::pin(fut)
}
}
let fut = async move {
let mut server = TripleServer::new(
dubbo::codegen::ProstCodec::<
super::EchoResponse,
super::EchoRequest,
>::default(),
);
let mut server = TripleServer::new(dubbo::codegen::ProstCodec::<
super::EchoResponse,
super::EchoRequest,
>::default());
let res = server
.client_streaming(ClientStreamingEchoServer { inner }, req)
.await;
Expand All @@ -297,56 +257,41 @@ pub mod echo_server {
struct BidirectionalStreamingEchoServer<T: Echo> {
inner: _Inner<T>,
}
impl<T: Echo> StreamingSvc<super::EchoRequest>
for BidirectionalStreamingEchoServer<T> {
impl<T: Echo> StreamingSvc<super::EchoRequest> for BidirectionalStreamingEchoServer<T> {
type Response = super::EchoResponse;
type ResponseStream = T::BidirectionalStreamingEchoStream;
type Future = BoxFuture<
Response<Self::ResponseStream>,
dubbo::status::Status,
>;
type Future =
BoxFuture<Response<Self::ResponseStream>, dubbo::status::Status>;
fn call(
&mut self,
request: Request<Decoding<super::EchoRequest>>,
) -> Self::Future {
let inner = self.inner.0.clone();
let fut = async move {
inner.bidirectional_streaming_echo(request).await
};
let fut =
async move { inner.bidirectional_streaming_echo(request).await };
Box::pin(fut)
}
}
let fut = async move {
let mut server = TripleServer::new(
dubbo::codegen::ProstCodec::<
super::EchoResponse,
super::EchoRequest,
>::default(),
);
let mut server = TripleServer::new(dubbo::codegen::ProstCodec::<
super::EchoResponse,
super::EchoRequest,
>::default());
let res = server
.bidi_streaming(
BidirectionalStreamingEchoServer {
inner,
},
req,
)
.bidi_streaming(BidirectionalStreamingEchoServer { inner }, req)
.await;
Ok(res)
};
Box::pin(fut)
}
_ => {
Box::pin(async move {
Ok(
http::Response::builder()
.status(200)
.header("grpc-status", "12")
.header("content-type", "application/grpc")
.body(empty_body())
.unwrap(),
)
})
}
_ => Box::pin(async move {
Ok(http::Response::builder()
.status(200)
.header("grpc-status", "12")
.header("content-type", "application/grpc")
.body(empty_body())
.unwrap())
}),
}
}
}
Expand Down
6 changes: 4 additions & 2 deletions protocol/triple/src/triple_invoker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@
*/
use base::{Node, Url};

use protocol_base::invocation::BoxInvocation;
use protocol_base::invoker::{BaseInvoker, Invoker};
use protocol_base::{
invocation::BoxInvocation,
invoker::{BaseInvoker, Invoker},
};
use std::sync::Arc;

pub struct TripleInvoker {
Expand Down
8 changes: 0 additions & 8 deletions remoting/h2/Cargo.toml

This file was deleted.

Loading