Skip to content

Commit

Permalink
switch to use common quic listen code
Browse files Browse the repository at this point in the history
  • Loading branch information
zh-jq-b committed Nov 20, 2023
1 parent 692a8ba commit bacbccf
Show file tree
Hide file tree
Showing 22 changed files with 276 additions and 583 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 12 additions & 3 deletions g3proxy/src/serve/dummy_close/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ use tokio::sync::broadcast;
use tokio_openssl::SslStream;
use tokio_rustls::server::TlsStream;

use g3_daemon::listen::{AcceptTcpServer, ArcAcceptTcpServer, ListenStats};
use g3_daemon::listen::{
AcceptQuicServer, AcceptTcpServer, ArcAcceptQuicServer, ArcAcceptTcpServer, ListenStats,
};
use g3_daemon::server::{BaseServer, ClientConnectionInfo, ServerReloadCommand};
use g3_types::metrics::MetricsName;

Expand Down Expand Up @@ -151,6 +153,15 @@ impl AcceptTcpServer for DummyCloseServer {
}
}

#[async_trait]
impl AcceptQuicServer for DummyCloseServer {
async fn run_quic_task(&self, _connection: Connection, _cc_info: ClientConnectionInfo) {}

fn get_reloaded(&self) -> ArcAcceptQuicServer {
crate::serve::get_or_insert_default(self.config.name())
}
}

#[async_trait]
impl Server for DummyCloseServer {
fn escaper(&self) -> &MetricsName {
Expand Down Expand Up @@ -187,6 +198,4 @@ impl Server for DummyCloseServer {
_cc_info: ClientConnectionInfo,
) {
}

async fn run_quic_task(&self, _connection: Connection, _cc_info: ClientConnectionInfo) {}
}
62 changes: 36 additions & 26 deletions g3proxy/src/serve/http_proxy/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ use tokio::sync::{broadcast, mpsc};
use tokio_openssl::SslStream;
use tokio_rustls::{server::TlsStream, TlsAcceptor};

use g3_daemon::listen::{AcceptTcpServer, ArcAcceptTcpServer, ListenStats, ListenTcpRuntime};
use g3_daemon::listen::{
AcceptQuicServer, AcceptTcpServer, ArcAcceptQuicServer, ArcAcceptTcpServer, ListenStats,
ListenTcpRuntime,
};
use g3_daemon::server::{BaseServer, ClientConnectionInfo, ServerReloadCommand};
use g3_types::acl::{AclAction, AclNetworkRule};
use g3_types::acl_set::AclDstHostRuleSet;
Expand Down Expand Up @@ -385,6 +388,38 @@ impl AcceptTcpServer for HttpProxyServer {
}
}

#[async_trait]
impl AcceptQuicServer for HttpProxyServer {
async fn run_quic_task(&self, connection: Connection, cc_info: ClientConnectionInfo) {
let client_addr = cc_info.client_addr();
self.server_stats.add_conn(client_addr);
if self.drop_early(client_addr) {
return;
}

loop {
// TODO update ctx and quit gracefully
match connection.accept_bi().await {
Ok((send_stream, recv_stream)) => {
self.spawn_quic_stream_task(send_stream, recv_stream, cc_info.clone())
}
Err(e) => {
debug!(
"{} - {} quic connection error: {e:?}",
cc_info.sock_local_addr(),
cc_info.sock_peer_addr()
);
break;
}
}
}
}

fn get_reloaded(&self) -> ArcAcceptQuicServer {
crate::serve::get_or_insert_default(self.config.name())
}
}

#[async_trait]
impl Server for HttpProxyServer {
fn escaper(&self) -> &MetricsName {
Expand Down Expand Up @@ -435,29 +470,4 @@ impl Server for HttpProxyServer {

self.spawn_stream_task(stream, cc_info).await;
}

async fn run_quic_task(&self, connection: Connection, cc_info: ClientConnectionInfo) {
let client_addr = cc_info.client_addr();
self.server_stats.add_conn(client_addr);
if self.drop_early(client_addr) {
return;
}

loop {
// TODO update ctx and quit gracefully
match connection.accept_bi().await {
Ok((send_stream, recv_stream)) => {
self.spawn_quic_stream_task(send_stream, recv_stream, cc_info.clone())
}
Err(e) => {
debug!(
"{} - {} quic connection error: {e:?}",
cc_info.sock_local_addr(),
cc_info.sock_peer_addr()
);
break;
}
}
}
}
}
16 changes: 13 additions & 3 deletions g3proxy/src/serve/http_rproxy/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ use tokio_openssl::SslStream;
use tokio_rustls::server::TlsStream;
use tokio_rustls::LazyConfigAcceptor;

use g3_daemon::listen::{AcceptTcpServer, ArcAcceptTcpServer, ListenStats, ListenTcpRuntime};
use g3_daemon::listen::{
AcceptQuicServer, AcceptTcpServer, ArcAcceptQuicServer, ArcAcceptTcpServer, ListenStats,
ListenTcpRuntime,
};
use g3_daemon::server::{BaseServer, ClientConnectionInfo, ServerReloadCommand};
use g3_types::acl::{AclAction, AclNetworkRule};
use g3_types::metrics::MetricsName;
Expand Down Expand Up @@ -402,6 +405,15 @@ impl AcceptTcpServer for HttpRProxyServer {
}
}

#[async_trait]
impl AcceptQuicServer for HttpRProxyServer {
async fn run_quic_task(&self, _connection: Connection, _cc_info: ClientConnectionInfo) {}

fn get_reloaded(&self) -> ArcAcceptQuicServer {
crate::serve::get_or_insert_default(self.config.name())
}
}

#[async_trait]
impl Server for HttpRProxyServer {
fn escaper(&self) -> &MetricsName {
Expand Down Expand Up @@ -452,6 +464,4 @@ impl Server for HttpRProxyServer {

self.spawn_stream_task(stream, cc_info).await;
}

async fn run_quic_task(&self, _connection: Connection, _cc_info: ClientConnectionInfo) {}
}
16 changes: 13 additions & 3 deletions g3proxy/src/serve/intelli_proxy/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ use tokio::sync::broadcast;
use tokio_openssl::SslStream;
use tokio_rustls::server::TlsStream;

use g3_daemon::listen::{AcceptTcpServer, ArcAcceptTcpServer, ListenStats, ListenTcpRuntime};
use g3_daemon::listen::{
AcceptQuicServer, AcceptTcpServer, ArcAcceptQuicServer, ArcAcceptTcpServer, ListenStats,
ListenTcpRuntime,
};
use g3_daemon::server::{BaseServer, ClientConnectionInfo, ServerReloadCommand};
use g3_io_ext::haproxy::{ProxyProtocolV1Reader, ProxyProtocolV2Reader};
use g3_types::acl::{AclAction, AclNetworkRule};
Expand Down Expand Up @@ -266,6 +269,15 @@ impl AcceptTcpServer for IntelliProxy {
}
}

#[async_trait]
impl AcceptQuicServer for IntelliProxy {
async fn run_quic_task(&self, _connection: Connection, _cc_info: ClientConnectionInfo) {}

fn get_reloaded(&self) -> ArcAcceptQuicServer {
crate::serve::get_or_insert_default(self.config.name())
}
}

#[async_trait]
impl Server for IntelliProxy {
fn escaper(&self) -> &MetricsName {
Expand Down Expand Up @@ -302,6 +314,4 @@ impl Server for IntelliProxy {
_cc_info: ClientConnectionInfo,
) {
}

async fn run_quic_task(&self, _connection: Connection, _cc_info: ClientConnectionInfo) {}
}
12 changes: 4 additions & 8 deletions g3proxy/src/serve/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,12 @@
use std::sync::Arc;

use async_trait::async_trait;
use quinn::Connection;
use tokio::net::TcpStream;
use tokio::sync::broadcast;
use tokio_openssl::SslStream;
use tokio_rustls::server::TlsStream;

use g3_daemon::listen::{AcceptTcpServer, ListenStats};
use g3_daemon::listen::{AcceptQuicServer, AcceptTcpServer, ListenStats};
use g3_daemon::server::{BaseServer, ClientConnectionInfo, ServerQuitPolicy, ServerReloadCommand};
use g3_types::metrics::MetricsName;

Expand All @@ -35,9 +34,6 @@ pub(crate) use registry::{foreach_online as foreach_server, get_names, get_or_in
mod idle_check;
pub(crate) use idle_check::ServerIdleChecker;

mod runtime;
use runtime::ListenQuicRuntime;

mod dummy_close;
mod intelli_proxy;
mod native_tls_port;
Expand Down Expand Up @@ -90,7 +86,9 @@ pub(crate) trait ServerInternal {
}

#[async_trait]
pub(crate) trait Server: ServerInternal + BaseServer + AcceptTcpServer {
pub(crate) trait Server:
ServerInternal + BaseServer + AcceptTcpServer + AcceptQuicServer
{
fn escaper(&self) -> &MetricsName;
fn user_group(&self) -> &MetricsName;
fn auditor(&self) -> &MetricsName;
Expand All @@ -106,8 +104,6 @@ pub(crate) trait Server: ServerInternal + BaseServer + AcceptTcpServer {
async fn run_rustls_task(&self, stream: TlsStream<TcpStream>, cc_info: ClientConnectionInfo);

async fn run_openssl_task(&self, stream: SslStream<TcpStream>, cc_info: ClientConnectionInfo);

async fn run_quic_task(&self, connection: Connection, cc_info: ClientConnectionInfo);
}

pub(crate) type ArcServer = Arc<dyn Server + Send + Sync>;
Expand Down
16 changes: 13 additions & 3 deletions g3proxy/src/serve/native_tls_port/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ use tokio::sync::broadcast;
use tokio_openssl::SslStream;
use tokio_rustls::server::TlsStream;

use g3_daemon::listen::{AcceptTcpServer, ArcAcceptTcpServer, ListenStats, ListenTcpRuntime};
use g3_daemon::listen::{
AcceptQuicServer, AcceptTcpServer, ArcAcceptQuicServer, ArcAcceptTcpServer, ListenStats,
ListenTcpRuntime,
};
use g3_daemon::server::{BaseServer, ClientConnectionInfo, ServerReloadCommand};
use g3_io_ext::haproxy::{ProxyProtocolV1Reader, ProxyProtocolV2Reader};
use g3_types::acl::{AclAction, AclNetworkRule};
Expand Down Expand Up @@ -282,6 +285,15 @@ impl AcceptTcpServer for NativeTlsPort {
}
}

#[async_trait]
impl AcceptQuicServer for NativeTlsPort {
async fn run_quic_task(&self, _connection: Connection, _cc_info: ClientConnectionInfo) {}

fn get_reloaded(&self) -> ArcAcceptQuicServer {
crate::serve::get_or_insert_default(self.config.name())
}
}

#[async_trait]
impl Server for NativeTlsPort {
fn escaper(&self) -> &MetricsName {
Expand Down Expand Up @@ -318,6 +330,4 @@ impl Server for NativeTlsPort {
_cc_info: ClientConnectionInfo,
) {
}

async fn run_quic_task(&self, _connection: Connection, _cc_info: ClientConnectionInfo) {}
}
30 changes: 22 additions & 8 deletions g3proxy/src/serve/plain_quic_port/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,18 @@ use tokio::sync::{broadcast, watch};
use tokio_openssl::SslStream;
use tokio_rustls::server::TlsStream;

use g3_daemon::listen::{AcceptTcpServer, ArcAcceptTcpServer, ListenStats, ListenQuicConf};
use g3_daemon::listen::{
AcceptQuicServer, AcceptTcpServer, ArcAcceptQuicServer, ArcAcceptTcpServer, ListenQuicConf,
ListenQuicRuntime, ListenStats,
};
use g3_daemon::server::{BaseServer, ClientConnectionInfo, ServerReloadCommand};
use g3_types::acl::AclNetworkRule;
use g3_types::metrics::MetricsName;
use g3_types::net::UdpListenConfig;

use crate::config::server::plain_quic_port::{PlainQuicPortConfig, PlainQuicPortUpdateFlags};
use crate::config::server::{AnyServerConfig, ServerConfig};
use crate::serve::{ArcServer, ListenQuicRuntime, Server, ServerInternal, ServerQuitPolicy};
use crate::serve::{ArcServer, Server, ServerInternal, ServerQuitPolicy};

#[derive(Clone)]
struct PlainQuicPortAuxConfig {
Expand Down Expand Up @@ -229,7 +232,11 @@ impl ServerInternal for PlainQuicPort {

fn _start_runtime(&self, server: &ArcServer) -> anyhow::Result<()> {
let config = self.config.load();
let runtime = ListenQuicRuntime::new(server, config.as_ref(), config.listen.clone());
let runtime = ListenQuicRuntime::new(
server.clone(),
server.get_listen_stats(),
config.listen.clone(),
);
runtime.run_all_instances(
config.listen_in_worker,
&self.quinn_config,
Expand Down Expand Up @@ -269,6 +276,18 @@ impl AcceptTcpServer for PlainQuicPort {
}
}

#[async_trait]
impl AcceptQuicServer for PlainQuicPort {
async fn run_quic_task(&self, connection: Connection, cc_info: ClientConnectionInfo) {
let next_server = self.next_server.load().as_ref().clone();
next_server.run_quic_task(connection, cc_info).await
}

fn get_reloaded(&self) -> ArcAcceptQuicServer {
crate::serve::get_or_insert_default(&self.name)
}
}

#[async_trait]
impl Server for PlainQuicPort {
fn escaper(&self) -> &MetricsName {
Expand Down Expand Up @@ -305,9 +324,4 @@ impl Server for PlainQuicPort {
_cc_info: ClientConnectionInfo,
) {
}

async fn run_quic_task(&self, connection: Connection, cc_info: ClientConnectionInfo) {
let next_server = self.next_server.load().as_ref().clone();
next_server.run_quic_task(connection, cc_info).await;
}
}
16 changes: 13 additions & 3 deletions g3proxy/src/serve/plain_tcp_port/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ use tokio::sync::broadcast;
use tokio_openssl::SslStream;
use tokio_rustls::server::TlsStream;

use g3_daemon::listen::{AcceptTcpServer, ArcAcceptTcpServer, ListenStats, ListenTcpRuntime};
use g3_daemon::listen::{
AcceptQuicServer, AcceptTcpServer, ArcAcceptQuicServer, ArcAcceptTcpServer, ListenStats,
ListenTcpRuntime,
};
use g3_daemon::server::{BaseServer, ClientConnectionInfo, ServerReloadCommand};
use g3_io_ext::haproxy::{ProxyProtocolV1Reader, ProxyProtocolV2Reader};
use g3_types::acl::{AclAction, AclNetworkRule};
Expand Down Expand Up @@ -232,6 +235,15 @@ impl AcceptTcpServer for PlainTcpPort {
}
}

#[async_trait]
impl AcceptQuicServer for PlainTcpPort {
async fn run_quic_task(&self, _connection: Connection, _cc_info: ClientConnectionInfo) {}

fn get_reloaded(&self) -> ArcAcceptQuicServer {
crate::serve::get_or_insert_default(self.config.name())
}
}

#[async_trait]
impl Server for PlainTcpPort {
fn escaper(&self) -> &MetricsName {
Expand Down Expand Up @@ -268,6 +280,4 @@ impl Server for PlainTcpPort {
_cc_info: ClientConnectionInfo,
) {
}

async fn run_quic_task(&self, _connection: Connection, _cc_info: ClientConnectionInfo) {}
}
Loading

0 comments on commit bacbccf

Please sign in to comment.