Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ftr: exchange layer abstraction design #126

Merged
merged 29 commits into from
Mar 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
02ad8af
Add: add remoting/net member
mozhou-tech Feb 26, 2023
b3cb54a
Add: add remoting/net member
mozhou-tech Feb 26, 2023
b49eb41
Add: remoting/net/incoming
mozhou-tech Feb 26, 2023
fbe02d6
Add: remoting/net/incoming
mozhou-tech Feb 27, 2023
d48ae45
Add: License
mozhou-tech Feb 27, 2023
8c4cbb4
Add: License
mozhou-tech Feb 27, 2023
27925ea
Merge branch 'main' into main
mozhou-tech Feb 28, 2023
76c2001
Merge branch 'apache:main' into main
mozhou-tech Feb 28, 2023
8bd727d
Merge remote-tracking branch 'origin/main' into main
mozhou-tech Mar 1, 2023
9ddb8d9
Rft: using subpackage to manage registry implementations for avoiding…
mozhou-tech Mar 1, 2023
3e594fa
Ftr: add common/utils subpackage
mozhou-tech Mar 1, 2023
b45d1ba
Ftr: add common/utils subpackage
mozhou-tech Mar 1, 2023
05095fa
Ftr: add common/utils subpackage
mozhou-tech Mar 1, 2023
11139c6
Ftr: add common/utils subpackage
mozhou-tech Mar 1, 2023
d3a048a
Ftr: add common/utils subpackage
mozhou-tech Mar 1, 2023
e777e6b
Ftr: add common/utils subpackage
mozhou-tech Mar 1, 2023
319868c
Ftr: add common/utils subpackage
mozhou-tech Mar 1, 2023
e696071
Ftr: add common/utils subpackage
mozhou-tech Mar 1, 2023
8040708
Merge branch 'apache:main' into main
mozhou-tech Mar 2, 2023
5edc821
Merge remote-tracking branch 'origin/main' into main
mozhou-tech Mar 3, 2023
07ed112
Merge remote-tracking branch 'origin/main' into main
mozhou-tech Mar 4, 2023
ce849be
Ftr: remoting/exchange abstraction design
mozhou-tech Mar 4, 2023
519b3b1
Ftr: remoting/exchange abstraction design
mozhou-tech Mar 4, 2023
0c26956
Ftr: remoting/exchange abstraction design
mozhou-tech Mar 4, 2023
8723f2e
Ftr: remoting/exchange abstraction design
mozhou-tech Mar 4, 2023
044c987
Ftr: remoting/exchange abstraction design
mozhou-tech Mar 6, 2023
52490d0
Ftr: remoting/exchange abstraction design
mozhou-tech Mar 6, 2023
9c84d3b
Ftr: remoting/exchange abstraction design
mozhou-tech Mar 6, 2023
96f1907
Ftr: remoting/exchange abstraction design
mozhou-tech Mar 6, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,8 @@ members = [
"dubbo-build",
"remoting/net",
"remoting/http",
"remoting/h2",
"remoting/zookeeper",
"remoting/exchange",
"remoting/base",
"remoting/xds",
"protocol/dubbo2",
"protocol/base",
Expand All @@ -28,6 +27,8 @@ members = [
pin-project = "1"
tokio = "1.0"
tower = "0.4"
tower-service = "0.3.1"
tower-layer = "0.3"
tokio-stream = "0.1"
tokio-util = "0.7"
socket2 = "0.4"
Expand All @@ -42,16 +43,20 @@ logger = {path="./common/logger"}
utils = {path="./common/utils"}
base = {path="./common/base"}
remoting-net = {path="./remoting/net"}
protocol = {path= "protocol/base" }
remoting-base = {path="./remoting/base"}
protocol-base = {path= "protocol/base" }
protocol-dubbo2 = {path="./protocol/dubbo2"}
protocol-triple = {path="./protocol/triple"}
registry-zookeeper = {path="./registry/zookeeper"}
registry-nacos = {path="./registry/nacos"}
anyhow = "1.0.66"
thiserror = "1.0.30"
dubbo = { path = "./dubbo/" }
bb8 = "0.8.0" # A connecton pool based on tokio
serde_yaml = "0.9.4" # yaml file parser
once_cell = "1.16.0"
itertools = "0.10.1"
bytes = "1.0"



4 changes: 4 additions & 0 deletions common/base/src/url.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,10 @@ impl Url {
pub fn protocol(&self) -> String {
self.scheme.clone()
}

pub fn get_ip_port(&self) -> String {
format!("{}:{}", self.ip, self.port)
}
}

impl Display for Url {
Expand Down
17 changes: 8 additions & 9 deletions dubbo/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,31 +12,30 @@ repository = "https://github.com/apache/dubbo-rust.git"
[dependencies]
hyper = { version = "0.14.19", features = ["full"] }
http = "0.2"
tower-service = "0.3.1"
tower-service.workspace = true
http-body = "0.4.4"
tower = { version = "0.4.12", features = ["timeout"] }
tower = { workspace = true, features = ["timeout"] }
futures-util = "0.3.23"
futures-core = "0.3.23"
tokio = { version = "1.0", features = ["rt-multi-thread", "time", "fs", "macros", "net", "signal"] }
tokio = { workspace = true, features = ["rt-multi-thread", "time", "fs", "macros", "net", "signal"] }
prost = "0.10.4"
async-trait = "0.1.56"
tower-layer = "0.3"
bytes = "1.0"
tower-layer.workspace = true
bytes.workspace = true
pin-project.workspace = true
rand = "0.8.5"
serde_json.workspace = true
serde = { workspace = true, features = ["derive"] }
futures = "0.3"
tracing = "0.1"
tracing-subscriber = "0.3.15"
futures.workspace = true
axum = "0.5.9"
async-stream = "0.3"
flate2 = "1.0"
aws-smithy-http = "0.54.1"
itertools.workspace = true
urlencoding.workspace = true
lazy_static.workspace = true
base.workspace=true
base.workspace = true
logger.workspace = true

dubbo-config = { path = "../config", version = "0.3.0" }

Expand Down
1 change: 1 addition & 0 deletions dubbo/src/cluster/directory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use crate::{
registry::{memory_registry::MemoryNotifyListener, BoxRegistry, RegistryWrapper},
};
use base::Url;
use logger::tracing;

/// Directory.
///
Expand Down
1 change: 1 addition & 0 deletions dubbo/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use std::{
thread,
};

use logger::tracing;
use serde_json::Value;
use state::Container;

Expand Down
1 change: 1 addition & 0 deletions dubbo/src/filter/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

use std::time::{SystemTime, UNIX_EPOCH};

use logger::tracing;
use serde_json::Value;

use crate::{
Expand Down
1 change: 1 addition & 0 deletions dubbo/src/filter/timeout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use logger::tracing;
use std::time::{SystemTime, UNIX_EPOCH};

use crate::{
Expand Down
19 changes: 9 additions & 10 deletions dubbo/src/framework.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,6 @@ use std::{
sync::{Arc, Mutex},
};

use base::Url;
use futures::{future, Future};
use tracing::{debug, info};

use crate::{
protocol::{BoxExporter, Protocol},
registry::{
Expand All @@ -34,7 +30,10 @@ use crate::{
BoxRegistry, Registry,
},
};
use base::Url;
use dubbo_config::{get_global_config, protocol::ProtocolRetrieve, RootConfig};
use futures::{future, Future};
use logger::tracing;

// Invoker是否可以基于hyper写一个通用的

Expand Down Expand Up @@ -78,10 +77,10 @@ impl Dubbo {
}

let root_config = self.config.as_ref().unwrap();
debug!("global conf: {:?}", root_config);
tracing::debug!("global conf: {:?}", root_config);
// env::set_var("ZOOKEEPER_SERVERS",root_config);
for (_, service_config) in root_config.provider.services.iter() {
info!("init service name: {}", service_config.interface);
tracing::info!("init service name: {}", service_config.interface);
let url = if root_config
.protocols
.contains_key(service_config.protocol.as_str())
Expand All @@ -91,12 +90,12 @@ impl Dubbo {
.get_protocol_or_default(service_config.protocol.as_str());
let protocol_url =
format!("{}/{}", protocol.to_url(), service_config.interface.clone(),);
info!("protocol_url: {:?}", protocol_url);
tracing::info!("protocol_url: {:?}", protocol_url);
Url::from_url(&protocol_url)
} else {
return Err(format!("base {:?} not exists", service_config.protocol).into());
};
info!("url: {:?}", url);
tracing::info!("url: {:?}", url);
if url.is_none() {
continue;
}
Expand All @@ -116,7 +115,7 @@ impl Dubbo {

pub async fn start(&mut self) {
self.init().unwrap();
info!("starting...");
tracing::info!("starting...");
// TODO: server registry
let mem_reg = Box::new(
RegistryProtocol::new()
Expand All @@ -126,7 +125,7 @@ impl Dubbo {
let mut async_vec: Vec<Pin<Box<dyn Future<Output = BoxExporter> + Send>>> = Vec::new();
for (name, items) in self.protocols.iter() {
for url in items.iter() {
info!("base: {:?}, service url: {:?}", name, url);
tracing::info!("base: {:?}, service url: {:?}", name, url);
let exporter = mem_reg.clone().export(url.to_owned());
async_vec.push(exporter);
//TODO multiple registry
Expand Down
2 changes: 1 addition & 1 deletion dubbo/src/registry/memory_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@

#![allow(unused_variables, dead_code, missing_docs)]

use logger::tracing::debug;
use std::{
collections::HashMap,
sync::{Arc, RwLock},
};
use tracing::debug;

use base::Url;

Expand Down
1 change: 1 addition & 0 deletions dubbo/src/registry/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/

use base::Url;
use logger::tracing;
use std::{
collections::HashMap,
fmt::{Debug, Formatter},
Expand Down
2 changes: 1 addition & 1 deletion dubbo/src/registry/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::{

use base::Url;
use itertools::Itertools;
use tracing::info;
use logger::tracing::info;

use crate::{
registry::{BoxRegistry, Registry},
Expand Down
1 change: 1 addition & 0 deletions dubbo/src/triple/decode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::{pin::Pin, task::Poll};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use futures_util::{future, ready, Stream};
use http_body::Body;
use logger::tracing;

use super::compression::{decompress, CompressionEncoding};
use crate::{
Expand Down
1 change: 1 addition & 0 deletions dubbo/src/triple/server/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use std::{
use base::Url;
use http::{Request, Response, Uri};
use hyper::body::Body;
use logger::tracing;
use tower_service::Service;

use crate::{triple::transport::DubboServer, BoxBody};
Expand Down
2 changes: 1 addition & 1 deletion dubbo/src/triple/transport/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
use std::task::Poll;

use hyper::client::{conn::Builder, service::Connect};
use logger::tracing::debug;
use tower_service::Service;
use tracing::debug;

use crate::{boxed, triple::transport::connector::get_connector};

Expand Down
1 change: 1 addition & 0 deletions dubbo/src/triple/transport/connector/http_connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use std::{

use http::Uri;
use hyper::client::connect::dns::Name;
use logger::tracing;
use tokio::net::TcpStream;
use tower_service::Service;

Expand Down
1 change: 1 addition & 0 deletions dubbo/src/triple/transport/connector/unix_connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use std::{

use http::Uri;
use hyper::client::connect::dns::Name;
use logger::tracing;
use tokio::net::UnixStream;
use tower_service::Service;

Expand Down
1 change: 1 addition & 0 deletions dubbo/src/triple/transport/listener/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub mod unix_listener;
use std::net::SocketAddr;

use async_trait::async_trait;
use logger::tracing;
use tokio::io::{AsyncRead, AsyncWrite};

use super::io::BoxIO;
Expand Down
1 change: 1 addition & 0 deletions dubbo/src/triple/transport/listener/tcp_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use super::Listener;
use async_trait::async_trait;
use futures_core::Stream;
use hyper::server::accept::Accept;
use logger::tracing;
use tokio::net::{TcpListener as tokioTcpListener, TcpStream};

pub struct TcpListener {
Expand Down
1 change: 1 addition & 0 deletions dubbo/src/triple/transport/listener/unix_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use super::Listener;
use async_trait::async_trait;
use futures_core::Stream;
use hyper::server::accept::Accept;
use logger::tracing;
use tokio::net::{UnixListener as tokioUnixListener, UnixStream};

pub struct UnixListener {
Expand Down
1 change: 1 addition & 0 deletions dubbo/src/triple/transport/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::net::SocketAddr;
use futures_core::Future;
use http::{Request, Response};
use hyper::body::Body;
use logger::tracing;
use tokio::time::Duration;
use tower_service::Service;

Expand Down
5 changes: 3 additions & 2 deletions protocol/base/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
[package]
name = "protocol"
name = "protocol-base"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
dashmap.workspace = true
base.workspace = true
base.workspace = true
thiserror.workspace = true
24 changes: 5 additions & 19 deletions protocol/base/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use thiserror::Error;

use std::{
error::Error,
fmt::{Debug, Display, Formatter},
};

pub struct InvokerError(String);

impl Debug for InvokerError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.write_str(self.0.as_str())
}
#[derive(Error, Debug)]
pub enum InvokerError {
#[error("unknown invoker error.")]
Unknown,
}

impl Display for InvokerError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.write_str(self.0.as_str())
}
}

impl Error for InvokerError {}
2 changes: 2 additions & 0 deletions protocol/base/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,5 @@ pub mod error;
pub mod invocation;
pub mod invoker;
pub mod output;

pub type ProtocolName = &'static str;
4 changes: 2 additions & 2 deletions protocol/dubbo2/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
remoting-net.workspace = true
protocol.workspace = true
remoting-base.workspace = true
protocol-base.workspace = true
2 changes: 1 addition & 1 deletion protocol/triple/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ edition = "2021"

[dependencies]
remoting-net.workspace = true
protocol.workspace = true
protocol-base.workspace = true
base.workspace = true
3 changes: 2 additions & 1 deletion protocol/triple/src/triple_invoker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
* limitations under the License.
*/
use base::{Node, Url};
use protocol::{

use protocol_base::{
invocation::BoxInvocation,
invoker::{BaseInvoker, Invoker},
};
Expand Down
14 changes: 14 additions & 0 deletions remoting/base/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
[package]
name = "remoting-base"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
bytes.workspace = true
base.workspace = true
thiserror.workspace = true
dashmap.workspace = true
protocol-base.workspace = true
anyhow.workspace = true
File renamed without changes.
Loading