diff --git a/Cargo.lock b/Cargo.lock index 070a2f11e..b693e35f6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1681,6 +1681,7 @@ dependencies = [ "log", "once_cell", "openssl-probe", + "quinn", "rand", "rustc_version", "rustls", diff --git a/g3proxy/src/serve/dummy_close/server.rs b/g3proxy/src/serve/dummy_close/server.rs index 2e51262c7..e00368e6b 100644 --- a/g3proxy/src/serve/dummy_close/server.rs +++ b/g3proxy/src/serve/dummy_close/server.rs @@ -24,7 +24,9 @@ use tokio::sync::broadcast; use tokio_openssl::SslStream; use tokio_rustls::server::TlsStream; -use g3_daemon::listen::{AcceptTcpServer, ArcAcceptTcpServer, ListenStats}; +use g3_daemon::listen::{ + AcceptQuicServer, AcceptTcpServer, ArcAcceptQuicServer, ArcAcceptTcpServer, ListenStats, +}; use g3_daemon::server::{BaseServer, ClientConnectionInfo, ServerReloadCommand}; use g3_types::metrics::MetricsName; @@ -151,6 +153,15 @@ impl AcceptTcpServer for DummyCloseServer { } } +#[async_trait] +impl AcceptQuicServer for DummyCloseServer { + async fn run_quic_task(&self, _connection: Connection, _cc_info: ClientConnectionInfo) {} + + fn get_reloaded(&self) -> ArcAcceptQuicServer { + crate::serve::get_or_insert_default(self.config.name()) + } +} + #[async_trait] impl Server for DummyCloseServer { fn escaper(&self) -> &MetricsName { @@ -187,6 +198,4 @@ impl Server for DummyCloseServer { _cc_info: ClientConnectionInfo, ) { } - - async fn run_quic_task(&self, _connection: Connection, _cc_info: ClientConnectionInfo) {} } diff --git a/g3proxy/src/serve/http_proxy/server.rs b/g3proxy/src/serve/http_proxy/server.rs index 9adf69cd9..7f43c436c 100644 --- a/g3proxy/src/serve/http_proxy/server.rs +++ b/g3proxy/src/serve/http_proxy/server.rs @@ -30,7 +30,10 @@ use tokio::sync::{broadcast, mpsc}; use tokio_openssl::SslStream; use tokio_rustls::{server::TlsStream, TlsAcceptor}; -use g3_daemon::listen::{AcceptTcpServer, ArcAcceptTcpServer, ListenStats, ListenTcpRuntime}; +use g3_daemon::listen::{ + AcceptQuicServer, AcceptTcpServer, ArcAcceptQuicServer, ArcAcceptTcpServer, ListenStats, + ListenTcpRuntime, +}; use g3_daemon::server::{BaseServer, ClientConnectionInfo, ServerReloadCommand}; use g3_types::acl::{AclAction, AclNetworkRule}; use g3_types::acl_set::AclDstHostRuleSet; @@ -385,6 +388,38 @@ impl AcceptTcpServer for HttpProxyServer { } } +#[async_trait] +impl AcceptQuicServer for HttpProxyServer { + async fn run_quic_task(&self, connection: Connection, cc_info: ClientConnectionInfo) { + let client_addr = cc_info.client_addr(); + self.server_stats.add_conn(client_addr); + if self.drop_early(client_addr) { + return; + } + + loop { + // TODO update ctx and quit gracefully + match connection.accept_bi().await { + Ok((send_stream, recv_stream)) => { + self.spawn_quic_stream_task(send_stream, recv_stream, cc_info.clone()) + } + Err(e) => { + debug!( + "{} - {} quic connection error: {e:?}", + cc_info.sock_local_addr(), + cc_info.sock_peer_addr() + ); + break; + } + } + } + } + + fn get_reloaded(&self) -> ArcAcceptQuicServer { + crate::serve::get_or_insert_default(self.config.name()) + } +} + #[async_trait] impl Server for HttpProxyServer { fn escaper(&self) -> &MetricsName { @@ -435,29 +470,4 @@ impl Server for HttpProxyServer { self.spawn_stream_task(stream, cc_info).await; } - - async fn run_quic_task(&self, connection: Connection, cc_info: ClientConnectionInfo) { - let client_addr = cc_info.client_addr(); - self.server_stats.add_conn(client_addr); - if self.drop_early(client_addr) { - return; - } - - loop { - // TODO update ctx and quit gracefully - match connection.accept_bi().await { - Ok((send_stream, recv_stream)) => { - self.spawn_quic_stream_task(send_stream, recv_stream, cc_info.clone()) - } - Err(e) => { - debug!( - "{} - {} quic connection error: {e:?}", - cc_info.sock_local_addr(), - cc_info.sock_peer_addr() - ); - break; - } - } - } - } } diff --git a/g3proxy/src/serve/http_rproxy/server.rs b/g3proxy/src/serve/http_rproxy/server.rs index 4e235d7c8..774c13266 100644 --- a/g3proxy/src/serve/http_rproxy/server.rs +++ b/g3proxy/src/serve/http_rproxy/server.rs @@ -31,7 +31,10 @@ use tokio_openssl::SslStream; use tokio_rustls::server::TlsStream; use tokio_rustls::LazyConfigAcceptor; -use g3_daemon::listen::{AcceptTcpServer, ArcAcceptTcpServer, ListenStats, ListenTcpRuntime}; +use g3_daemon::listen::{ + AcceptQuicServer, AcceptTcpServer, ArcAcceptQuicServer, ArcAcceptTcpServer, ListenStats, + ListenTcpRuntime, +}; use g3_daemon::server::{BaseServer, ClientConnectionInfo, ServerReloadCommand}; use g3_types::acl::{AclAction, AclNetworkRule}; use g3_types::metrics::MetricsName; @@ -402,6 +405,15 @@ impl AcceptTcpServer for HttpRProxyServer { } } +#[async_trait] +impl AcceptQuicServer for HttpRProxyServer { + async fn run_quic_task(&self, _connection: Connection, _cc_info: ClientConnectionInfo) {} + + fn get_reloaded(&self) -> ArcAcceptQuicServer { + crate::serve::get_or_insert_default(self.config.name()) + } +} + #[async_trait] impl Server for HttpRProxyServer { fn escaper(&self) -> &MetricsName { @@ -452,6 +464,4 @@ impl Server for HttpRProxyServer { self.spawn_stream_task(stream, cc_info).await; } - - async fn run_quic_task(&self, _connection: Connection, _cc_info: ClientConnectionInfo) {} } diff --git a/g3proxy/src/serve/intelli_proxy/server.rs b/g3proxy/src/serve/intelli_proxy/server.rs index 48fbe3d1d..d2f6f6b83 100644 --- a/g3proxy/src/serve/intelli_proxy/server.rs +++ b/g3proxy/src/serve/intelli_proxy/server.rs @@ -26,7 +26,10 @@ use tokio::sync::broadcast; use tokio_openssl::SslStream; use tokio_rustls::server::TlsStream; -use g3_daemon::listen::{AcceptTcpServer, ArcAcceptTcpServer, ListenStats, ListenTcpRuntime}; +use g3_daemon::listen::{ + AcceptQuicServer, AcceptTcpServer, ArcAcceptQuicServer, ArcAcceptTcpServer, ListenStats, + ListenTcpRuntime, +}; use g3_daemon::server::{BaseServer, ClientConnectionInfo, ServerReloadCommand}; use g3_io_ext::haproxy::{ProxyProtocolV1Reader, ProxyProtocolV2Reader}; use g3_types::acl::{AclAction, AclNetworkRule}; @@ -266,6 +269,15 @@ impl AcceptTcpServer for IntelliProxy { } } +#[async_trait] +impl AcceptQuicServer for IntelliProxy { + async fn run_quic_task(&self, _connection: Connection, _cc_info: ClientConnectionInfo) {} + + fn get_reloaded(&self) -> ArcAcceptQuicServer { + crate::serve::get_or_insert_default(self.config.name()) + } +} + #[async_trait] impl Server for IntelliProxy { fn escaper(&self) -> &MetricsName { @@ -302,6 +314,4 @@ impl Server for IntelliProxy { _cc_info: ClientConnectionInfo, ) { } - - async fn run_quic_task(&self, _connection: Connection, _cc_info: ClientConnectionInfo) {} } diff --git a/g3proxy/src/serve/mod.rs b/g3proxy/src/serve/mod.rs index 0e1013e2d..7ea827d2a 100644 --- a/g3proxy/src/serve/mod.rs +++ b/g3proxy/src/serve/mod.rs @@ -17,13 +17,12 @@ use std::sync::Arc; use async_trait::async_trait; -use quinn::Connection; use tokio::net::TcpStream; use tokio::sync::broadcast; use tokio_openssl::SslStream; use tokio_rustls::server::TlsStream; -use g3_daemon::listen::{AcceptTcpServer, ListenStats}; +use g3_daemon::listen::{AcceptQuicServer, AcceptTcpServer, ListenStats}; use g3_daemon::server::{BaseServer, ClientConnectionInfo, ServerQuitPolicy, ServerReloadCommand}; use g3_types::metrics::MetricsName; @@ -35,9 +34,6 @@ pub(crate) use registry::{foreach_online as foreach_server, get_names, get_or_in mod idle_check; pub(crate) use idle_check::ServerIdleChecker; -mod runtime; -use runtime::ListenQuicRuntime; - mod dummy_close; mod intelli_proxy; mod native_tls_port; @@ -90,7 +86,9 @@ pub(crate) trait ServerInternal { } #[async_trait] -pub(crate) trait Server: ServerInternal + BaseServer + AcceptTcpServer { +pub(crate) trait Server: + ServerInternal + BaseServer + AcceptTcpServer + AcceptQuicServer +{ fn escaper(&self) -> &MetricsName; fn user_group(&self) -> &MetricsName; fn auditor(&self) -> &MetricsName; @@ -106,8 +104,6 @@ pub(crate) trait Server: ServerInternal + BaseServer + AcceptTcpServer { async fn run_rustls_task(&self, stream: TlsStream, cc_info: ClientConnectionInfo); async fn run_openssl_task(&self, stream: SslStream, cc_info: ClientConnectionInfo); - - async fn run_quic_task(&self, connection: Connection, cc_info: ClientConnectionInfo); } pub(crate) type ArcServer = Arc; diff --git a/g3proxy/src/serve/native_tls_port/mod.rs b/g3proxy/src/serve/native_tls_port/mod.rs index 720d46256..7254d6553 100644 --- a/g3proxy/src/serve/native_tls_port/mod.rs +++ b/g3proxy/src/serve/native_tls_port/mod.rs @@ -29,7 +29,10 @@ use tokio::sync::broadcast; use tokio_openssl::SslStream; use tokio_rustls::server::TlsStream; -use g3_daemon::listen::{AcceptTcpServer, ArcAcceptTcpServer, ListenStats, ListenTcpRuntime}; +use g3_daemon::listen::{ + AcceptQuicServer, AcceptTcpServer, ArcAcceptQuicServer, ArcAcceptTcpServer, ListenStats, + ListenTcpRuntime, +}; use g3_daemon::server::{BaseServer, ClientConnectionInfo, ServerReloadCommand}; use g3_io_ext::haproxy::{ProxyProtocolV1Reader, ProxyProtocolV2Reader}; use g3_types::acl::{AclAction, AclNetworkRule}; @@ -282,6 +285,15 @@ impl AcceptTcpServer for NativeTlsPort { } } +#[async_trait] +impl AcceptQuicServer for NativeTlsPort { + async fn run_quic_task(&self, _connection: Connection, _cc_info: ClientConnectionInfo) {} + + fn get_reloaded(&self) -> ArcAcceptQuicServer { + crate::serve::get_or_insert_default(self.config.name()) + } +} + #[async_trait] impl Server for NativeTlsPort { fn escaper(&self) -> &MetricsName { @@ -318,6 +330,4 @@ impl Server for NativeTlsPort { _cc_info: ClientConnectionInfo, ) { } - - async fn run_quic_task(&self, _connection: Connection, _cc_info: ClientConnectionInfo) {} } diff --git a/g3proxy/src/serve/plain_quic_port/mod.rs b/g3proxy/src/serve/plain_quic_port/mod.rs index c167461de..2b50d69ec 100644 --- a/g3proxy/src/serve/plain_quic_port/mod.rs +++ b/g3proxy/src/serve/plain_quic_port/mod.rs @@ -26,7 +26,10 @@ use tokio::sync::{broadcast, watch}; use tokio_openssl::SslStream; use tokio_rustls::server::TlsStream; -use g3_daemon::listen::{AcceptTcpServer, ArcAcceptTcpServer, ListenStats, ListenQuicConf}; +use g3_daemon::listen::{ + AcceptQuicServer, AcceptTcpServer, ArcAcceptQuicServer, ArcAcceptTcpServer, ListenQuicConf, + ListenQuicRuntime, ListenStats, +}; use g3_daemon::server::{BaseServer, ClientConnectionInfo, ServerReloadCommand}; use g3_types::acl::AclNetworkRule; use g3_types::metrics::MetricsName; @@ -34,7 +37,7 @@ use g3_types::net::UdpListenConfig; use crate::config::server::plain_quic_port::{PlainQuicPortConfig, PlainQuicPortUpdateFlags}; use crate::config::server::{AnyServerConfig, ServerConfig}; -use crate::serve::{ArcServer, ListenQuicRuntime, Server, ServerInternal, ServerQuitPolicy}; +use crate::serve::{ArcServer, Server, ServerInternal, ServerQuitPolicy}; #[derive(Clone)] struct PlainQuicPortAuxConfig { @@ -229,7 +232,11 @@ impl ServerInternal for PlainQuicPort { fn _start_runtime(&self, server: &ArcServer) -> anyhow::Result<()> { let config = self.config.load(); - let runtime = ListenQuicRuntime::new(server, config.as_ref(), config.listen.clone()); + let runtime = ListenQuicRuntime::new( + server.clone(), + server.get_listen_stats(), + config.listen.clone(), + ); runtime.run_all_instances( config.listen_in_worker, &self.quinn_config, @@ -269,6 +276,18 @@ impl AcceptTcpServer for PlainQuicPort { } } +#[async_trait] +impl AcceptQuicServer for PlainQuicPort { + async fn run_quic_task(&self, connection: Connection, cc_info: ClientConnectionInfo) { + let next_server = self.next_server.load().as_ref().clone(); + next_server.run_quic_task(connection, cc_info).await + } + + fn get_reloaded(&self) -> ArcAcceptQuicServer { + crate::serve::get_or_insert_default(&self.name) + } +} + #[async_trait] impl Server for PlainQuicPort { fn escaper(&self) -> &MetricsName { @@ -305,9 +324,4 @@ impl Server for PlainQuicPort { _cc_info: ClientConnectionInfo, ) { } - - async fn run_quic_task(&self, connection: Connection, cc_info: ClientConnectionInfo) { - let next_server = self.next_server.load().as_ref().clone(); - next_server.run_quic_task(connection, cc_info).await; - } } diff --git a/g3proxy/src/serve/plain_tcp_port/mod.rs b/g3proxy/src/serve/plain_tcp_port/mod.rs index 53c89254f..f1189f5a5 100644 --- a/g3proxy/src/serve/plain_tcp_port/mod.rs +++ b/g3proxy/src/serve/plain_tcp_port/mod.rs @@ -26,7 +26,10 @@ use tokio::sync::broadcast; use tokio_openssl::SslStream; use tokio_rustls::server::TlsStream; -use g3_daemon::listen::{AcceptTcpServer, ArcAcceptTcpServer, ListenStats, ListenTcpRuntime}; +use g3_daemon::listen::{ + AcceptQuicServer, AcceptTcpServer, ArcAcceptQuicServer, ArcAcceptTcpServer, ListenStats, + ListenTcpRuntime, +}; use g3_daemon::server::{BaseServer, ClientConnectionInfo, ServerReloadCommand}; use g3_io_ext::haproxy::{ProxyProtocolV1Reader, ProxyProtocolV2Reader}; use g3_types::acl::{AclAction, AclNetworkRule}; @@ -232,6 +235,15 @@ impl AcceptTcpServer for PlainTcpPort { } } +#[async_trait] +impl AcceptQuicServer for PlainTcpPort { + async fn run_quic_task(&self, _connection: Connection, _cc_info: ClientConnectionInfo) {} + + fn get_reloaded(&self) -> ArcAcceptQuicServer { + crate::serve::get_or_insert_default(self.config.name()) + } +} + #[async_trait] impl Server for PlainTcpPort { fn escaper(&self) -> &MetricsName { @@ -268,6 +280,4 @@ impl Server for PlainTcpPort { _cc_info: ClientConnectionInfo, ) { } - - async fn run_quic_task(&self, _connection: Connection, _cc_info: ClientConnectionInfo) {} } diff --git a/g3proxy/src/serve/plain_tls_port/mod.rs b/g3proxy/src/serve/plain_tls_port/mod.rs index f5ca14be1..d6af042b8 100644 --- a/g3proxy/src/serve/plain_tls_port/mod.rs +++ b/g3proxy/src/serve/plain_tls_port/mod.rs @@ -28,7 +28,10 @@ use tokio::sync::broadcast; use tokio_openssl::SslStream; use tokio_rustls::{server::TlsStream, TlsAcceptor}; -use g3_daemon::listen::{AcceptTcpServer, ArcAcceptTcpServer, ListenStats, ListenTcpRuntime}; +use g3_daemon::listen::{ + AcceptQuicServer, AcceptTcpServer, ArcAcceptQuicServer, ArcAcceptTcpServer, ListenStats, + ListenTcpRuntime, +}; use g3_daemon::server::{BaseServer, ClientConnectionInfo, ServerReloadCommand}; use g3_io_ext::haproxy::{ProxyProtocolV1Reader, ProxyProtocolV2Reader}; use g3_types::acl::{AclAction, AclNetworkRule}; @@ -270,6 +273,15 @@ impl AcceptTcpServer for PlainTlsPort { } } +#[async_trait] +impl AcceptQuicServer for PlainTlsPort { + async fn run_quic_task(&self, _connection: Connection, _cc_info: ClientConnectionInfo) {} + + fn get_reloaded(&self) -> ArcAcceptQuicServer { + crate::serve::get_or_insert_default(self.config.name()) + } +} + #[async_trait] impl Server for PlainTlsPort { fn escaper(&self) -> &MetricsName { @@ -306,6 +318,4 @@ impl Server for PlainTlsPort { _cc_info: ClientConnectionInfo, ) { } - - async fn run_quic_task(&self, _connection: Connection, _cc_info: ClientConnectionInfo) {} } diff --git a/g3proxy/src/serve/runtime/listen_quic.rs b/g3proxy/src/serve/runtime/listen_quic.rs deleted file mode 100644 index a1f21ff74..000000000 --- a/g3proxy/src/serve/runtime/listen_quic.rs +++ /dev/null @@ -1,463 +0,0 @@ -/* - * Copyright 2023 ByteDance and/or its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -use std::io; -use std::net::{SocketAddr, UdpSocket}; -use std::os::fd::{AsRawFd, RawFd}; -use std::sync::Arc; -use std::time::Duration; - -use log::{info, warn}; -use quinn::{Connecting, Endpoint}; -use tokio::runtime::Handle; -use tokio::sync::{broadcast, watch}; - -use g3_daemon::listen::{ListenQuicConf, ListenStats}; -use g3_daemon::server::{ClientConnectionInfo, ServerReloadCommand}; -use g3_socket::util::native_socket_addr; -use g3_types::acl::AclAction; -use g3_types::net::UdpListenConfig; - -use crate::config::server::ServerConfig; -use crate::serve::ArcServer; - -#[derive(Clone)] -pub(crate) struct ListenQuicRuntime { - server: ArcServer, - server_type: &'static str, - server_version: usize, - worker_id: Option, - listen_config: UdpListenConfig, - listen_stats: Arc, - instance_id: usize, -} - -impl ListenQuicRuntime { - pub(crate) fn new( - server: &ArcServer, - server_config: &C, - listen_config: UdpListenConfig, - ) -> Self { - ListenQuicRuntime { - server: Arc::clone(server), - server_type: server_config.server_type(), - server_version: server.version(), - worker_id: None, - listen_config, - listen_stats: server.get_listen_stats(), - instance_id: 0, - } - } - - fn pre_start(&self) { - info!( - "started {} SRT[{}_v{}#{}]", - self.server_type, - self.server.name(), - self.server_version, - self.instance_id, - ); - self.listen_stats.add_running_runtime(); - } - - fn pre_stop(&self) { - info!( - "stopping {} SRT[{}_v{}#{}]", - self.server_type, - self.server.name(), - self.server_version, - self.instance_id, - ); - } - - fn post_stop(&self) { - info!( - "stopped {} SRT[{}_v{}#{}]", - self.server_type, - self.server.name(), - self.server_version, - self.instance_id, - ); - self.listen_stats.del_running_runtime(); - } - - async fn run( - mut self, - listener: Endpoint, - mut listen_addr: SocketAddr, - mut sock_raw_fd: RawFd, - mut server_reload_channel: broadcast::Receiver, - mut quic_cfg_receiver: watch::Receiver, - ) where - C: ListenQuicConf + Send + Clone + 'static, - { - use broadcast::error::RecvError; - - let mut aux_config = quic_cfg_receiver.borrow().clone(); - - loop { - tokio::select! { - biased; - - ev = server_reload_channel.recv() => { - match ev { - Ok(ServerReloadCommand::ReloadVersion(version)) => { - info!("SRT[{}_v{}#{}] received reload request from v{version}", - self.server.name(), self.server_version, self.instance_id); - match crate::serve::get_server(self.server.name()) { - Ok(server) => { - self.server_version = server.version(); - self.server = server; - continue; - } - Err(_) => { - info!("SRT[{}_v{}#{}] will quit as no server v{version}+ found", - self.server.name(), self.server_version, self.instance_id); - } - } - } - Ok(ServerReloadCommand::QuitRuntime) => {}, - Err(RecvError::Closed) => {}, - Err(RecvError::Lagged(dropped)) => { - warn!("SRT[{}_v{}#{}] server {} reload notify channel overflowed, {dropped} msg dropped", - self.server.name(), self.server_version, self.instance_id, self.server.name()); - continue; - }, - } - - info!("SRT[{}_v{}#{}] will go offline", - self.server.name(), self.server_version, self.instance_id); - self.pre_stop(); - self.goto_offline(listener, listen_addr, aux_config.offline_rebind_port()); - break; - } - ev = quic_cfg_receiver.changed() => { - if ev.is_err() { - warn!("SRT[{}_v{}#{}] quit as quic cfg channel closed", - self.server.name(), self.server_version, self.instance_id); - self.goto_close(listener); - break; - } - aux_config = quic_cfg_receiver.borrow().clone(); - if let Some(quinn_config) = aux_config.take_quinn_config() { - listener.set_server_config(Some(quinn_config)); - } - if let Some(listen_config) = aux_config.take_udp_listen_config() { - self.listen_config = listen_config; - if self.listen_config.address() != listen_addr { - if let Ok((fd, addr)) = self.rebind_socket(&listener) { - sock_raw_fd = fd; - listen_addr = addr; - } - } else { - self.update_socket_opts(sock_raw_fd); - } - } - } - result = listener.accept() => { - let Some(connecting) = result else { - continue; - }; - self.listen_stats.add_accepted(); - self.run_task(connecting, listen_addr, &aux_config); - } - } - } - self.post_stop(); - } - - fn run_task(&self, connecting: Connecting, listen_addr: SocketAddr, aux_config: &C) - where - C: ListenQuicConf + Send + Clone + 'static, - { - let peer_addr = connecting.remote_address(); - if let Some(filter) = aux_config.ingress_network_acl() { - let (_, action) = filter.check(peer_addr.ip()); - match action { - AclAction::Permit | AclAction::PermitAndLog => {} - AclAction::Forbid | AclAction::ForbidAndLog => { - self.listen_stats.add_dropped(); - return; - } - } - } - - let local_addr = connecting - .local_ip() - .map(|ip| SocketAddr::new(ip, listen_addr.port())) - .unwrap_or(listen_addr); - let mut cc_info = ClientConnectionInfo::new( - native_socket_addr(peer_addr), - native_socket_addr(local_addr), - ); - - let server = self.server.clone(); - let listen_stats = self.listen_stats.clone(); - let accept_timeout = aux_config.accept_timeout(); - if let Some(worker_id) = self.worker_id { - cc_info.set_worker_id(Some(worker_id)); - tokio::spawn(async move { - Self::accept_connection_and_run( - server, - connecting, - cc_info, - accept_timeout, - listen_stats, - ) - .await - }); - } else if let Some(rt) = g3_daemon::runtime::worker::select_handle() { - cc_info.set_worker_id(Some(rt.id)); - rt.handle.spawn(async move { - Self::accept_connection_and_run( - server, - connecting, - cc_info, - accept_timeout, - listen_stats, - ) - .await - }); - } else { - tokio::spawn(async move { - Self::accept_connection_and_run( - server, - connecting, - cc_info, - accept_timeout, - listen_stats, - ) - .await - }); - } - } - - async fn accept_connection_and_run( - server: ArcServer, - connecting: Connecting, - cc_info: ClientConnectionInfo, - timeout: Duration, - listen_stats: Arc, - ) { - match tokio::time::timeout(timeout, connecting).await { - Ok(Ok(c)) => { - listen_stats.add_accepted(); - server.run_quic_task(c, cc_info).await; - } - Ok(Err(_e)) => { - listen_stats.add_failed(); - // TODO may be attack - } - Err(_) => { - listen_stats.add_failed(); - // TODO may be attack - } - } - } - - fn update_socket_opts(&self, raw_fd: RawFd) { - if let Err(e) = g3_socket::udp::set_raw_opts(raw_fd, self.listen_config.socket_misc_opts()) - { - warn!( - "SRT[{}_v{}#{}] update socket misc opts failed: {e}", - self.server.name(), - self.server_version, - self.instance_id, - ); - } - if let Err(e) = g3_socket::udp::set_raw_buf_opts(raw_fd, self.listen_config.socket_buffer()) - { - warn!( - "SRT[{}_v{}#{}] update socket buf opts failed: {e}", - self.server.name(), - self.server_version, - self.instance_id, - ); - } - } - - fn rebind_socket(&self, listener: &Endpoint) -> io::Result<(RawFd, SocketAddr)> { - match g3_socket::udp::new_std_bind_listen(&self.listen_config) { - Ok(socket) => { - let raw_fd = socket.as_raw_fd(); - match listener.rebind(socket) { - Ok(_) => Ok((raw_fd, listener.local_addr().unwrap())), - Err(e) => { - warn!( - "SRT[{}_v{}#{}] reload rebind {} failed: {e}", - self.server.name(), - self.server_version, - self.instance_id, - self.listen_config.address() - ); - Err(e) - } - } - } - Err(e) => { - warn!( - "SRT[{}_v{}#{}] reload create new socket {} failed: {e}", - self.server.name(), - self.server_version, - self.instance_id, - self.listen_config.address() - ); - Err(e) - } - } - } - - fn goto_offline(&self, listener: Endpoint, listen_addr: SocketAddr, rebind_port: Option) { - if let Some(port) = rebind_port { - let rebind_addr = SocketAddr::new(listen_addr.ip(), port); - match g3_socket::udp::new_std_rebind_listen( - &self.listen_config, - SocketAddr::new(listen_addr.ip(), port), - ) { - Ok(socket) => match listener.rebind(socket) { - Ok(_) => { - info!( - "SRT[{}_v{}#{}] re-bound to: {rebind_addr}", - self.server.name(), - self.server_version, - self.instance_id - ); - listener.reject_new_connections(); - tokio::spawn(async move { listener.wait_idle().await }); - return; - } - Err(e) => { - warn!( - "SRT[{}_v{}#{}] rebind failed: {e}", - self.server.name(), - self.server_version, - self.instance_id - ); - } - }, - Err(e) => { - warn!( - "SRT[{}_v{}#{}] create rebind socket failed: {e}", - self.server.name(), - self.server_version, - self.instance_id - ); - } - } - } - self.goto_close(listener); - } - - fn goto_close(&self, listener: Endpoint) { - info!( - "SRT[{}_v{}#{}] will close all quic connections immediately", - self.server.name(), - self.server_version, - self.instance_id - ); - listener.close(quinn::VarInt::default(), b"close as server shutdown"); - } - - fn get_rt_handle(&mut self, listen_in_worker: bool) -> Handle { - if listen_in_worker { - if let Some(rt) = g3_daemon::runtime::worker::select_listen_handle() { - self.worker_id = Some(rt.id); - return rt.handle; - } - } - Handle::current() - } - - fn into_running( - mut self, - socket: UdpSocket, - listen_addr: SocketAddr, - config: quinn::ServerConfig, - listen_in_worker: bool, - server_reload_channel: broadcast::Receiver, - quic_cfg_receiver: watch::Receiver, - ) where - C: ListenQuicConf + Clone + Send + Sync + 'static, - { - let handle = self.get_rt_handle(listen_in_worker); - handle.spawn(async move { - let sock_raw_fd = socket.as_raw_fd(); - // make sure the listen socket associated with the correct reactor - match Endpoint::new( - Default::default(), - Some(config), - socket, - Arc::new(quinn::TokioRuntime), - ) { - Ok(endpoint) => { - self.pre_start(); - self.run( - endpoint, - listen_addr, - sock_raw_fd, - server_reload_channel, - quic_cfg_receiver, - ) - .await; - } - Err(e) => { - warn!( - "SRT[{}_v{}#{}] listen async: {e:?}", - self.server.name(), - self.server_version, - self.instance_id - ); - } - } - }); - } - - pub(crate) fn run_all_instances( - &self, - listen_in_worker: bool, - quic_config: &quinn::ServerConfig, - server_reload_sender: &broadcast::Sender, - quic_cfg_receiver: &watch::Sender, - ) -> anyhow::Result<()> - where - C: ListenQuicConf + Clone + Send + Sync + 'static, - { - let mut instance_count = self.listen_config.instance(); - if listen_in_worker { - let worker_count = g3_daemon::runtime::worker::worker_count(); - if worker_count > 0 { - instance_count = worker_count; - } - } - - for i in 0..instance_count { - let mut runtime = self.clone(); - runtime.instance_id = i; - - let socket = g3_socket::udp::new_std_bind_listen(&self.listen_config)?; - let listen_addr = socket.local_addr()?; - runtime.into_running( - socket, - listen_addr, - quic_config.clone(), - listen_in_worker, - server_reload_sender.subscribe(), - quic_cfg_receiver.subscribe(), - ); - } - Ok(()) - } -} diff --git a/g3proxy/src/serve/runtime/mod.rs b/g3proxy/src/serve/runtime/mod.rs deleted file mode 100644 index daf2fac24..000000000 --- a/g3proxy/src/serve/runtime/mod.rs +++ /dev/null @@ -1,18 +0,0 @@ -/* - * Copyright 2023 ByteDance and/or its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -mod listen_quic; -pub(crate) use listen_quic::ListenQuicRuntime; diff --git a/g3proxy/src/serve/sni_proxy/server.rs b/g3proxy/src/serve/sni_proxy/server.rs index fda18502d..896818c14 100644 --- a/g3proxy/src/serve/sni_proxy/server.rs +++ b/g3proxy/src/serve/sni_proxy/server.rs @@ -27,7 +27,10 @@ use tokio::sync::broadcast; use tokio_openssl::SslStream; use tokio_rustls::server::TlsStream; -use g3_daemon::listen::{AcceptTcpServer, ArcAcceptTcpServer, ListenStats, ListenTcpRuntime}; +use g3_daemon::listen::{ + AcceptQuicServer, AcceptTcpServer, ArcAcceptQuicServer, ArcAcceptTcpServer, ListenStats, + ListenTcpRuntime, +}; use g3_daemon::server::{BaseServer, ClientConnectionInfo, ServerReloadCommand}; use g3_dpi::ProtocolPortMap; use g3_types::acl::{AclAction, AclNetworkRule}; @@ -258,6 +261,15 @@ impl AcceptTcpServer for SniProxyServer { } } +#[async_trait] +impl AcceptQuicServer for SniProxyServer { + async fn run_quic_task(&self, _connection: Connection, _cc_info: ClientConnectionInfo) {} + + fn get_reloaded(&self) -> ArcAcceptQuicServer { + crate::serve::get_or_insert_default(self.config.name()) + } +} + #[async_trait] impl Server for SniProxyServer { fn escaper(&self) -> &MetricsName { @@ -298,6 +310,4 @@ impl Server for SniProxyServer { _cc_info: ClientConnectionInfo, ) { } - - async fn run_quic_task(&self, _connection: Connection, _cc_info: ClientConnectionInfo) {} } diff --git a/g3proxy/src/serve/socks_proxy/server.rs b/g3proxy/src/serve/socks_proxy/server.rs index fa69a7077..3a4765ba8 100644 --- a/g3proxy/src/serve/socks_proxy/server.rs +++ b/g3proxy/src/serve/socks_proxy/server.rs @@ -27,7 +27,10 @@ use tokio::sync::broadcast; use tokio_openssl::SslStream; use tokio_rustls::server::TlsStream; -use g3_daemon::listen::{AcceptTcpServer, ArcAcceptTcpServer, ListenStats, ListenTcpRuntime}; +use g3_daemon::listen::{ + AcceptQuicServer, AcceptTcpServer, ArcAcceptQuicServer, ArcAcceptTcpServer, ListenStats, + ListenTcpRuntime, +}; use g3_daemon::server::{BaseServer, ClientConnectionInfo, ServerReloadCommand}; use g3_types::acl::{AclAction, AclNetworkRule}; use g3_types::acl_set::AclDstHostRuleSet; @@ -267,6 +270,15 @@ impl AcceptTcpServer for SocksProxyServer { } } +#[async_trait] +impl AcceptQuicServer for SocksProxyServer { + async fn run_quic_task(&self, _connection: Connection, _cc_info: ClientConnectionInfo) {} + + fn get_reloaded(&self) -> ArcAcceptQuicServer { + crate::serve::get_or_insert_default(self.config.name()) + } +} + #[async_trait] impl Server for SocksProxyServer { fn escaper(&self) -> &MetricsName { @@ -307,6 +319,4 @@ impl Server for SocksProxyServer { self.server_stats.add_conn(cc_info.client_addr()); self.listen_stats.add_dropped(); } - - async fn run_quic_task(&self, _connection: Connection, _cc_info: ClientConnectionInfo) {} } diff --git a/g3proxy/src/serve/tcp_stream/server.rs b/g3proxy/src/serve/tcp_stream/server.rs index 531d45b08..5cf11cad5 100644 --- a/g3proxy/src/serve/tcp_stream/server.rs +++ b/g3proxy/src/serve/tcp_stream/server.rs @@ -29,7 +29,10 @@ use tokio::sync::broadcast; use tokio_openssl::SslStream; use tokio_rustls::server::TlsStream; -use g3_daemon::listen::{AcceptTcpServer, ArcAcceptTcpServer, ListenStats, ListenTcpRuntime}; +use g3_daemon::listen::{ + AcceptQuicServer, AcceptTcpServer, ArcAcceptQuicServer, ArcAcceptTcpServer, ListenStats, + ListenTcpRuntime, +}; use g3_daemon::server::{BaseServer, ClientConnectionInfo, ServerReloadCommand}; use g3_types::acl::{AclAction, AclNetworkRule}; use g3_types::collection::{SelectivePickPolicy, SelectiveVec, SelectiveVecBuilder}; @@ -332,6 +335,38 @@ impl AcceptTcpServer for TcpStreamServer { } } +#[async_trait] +impl AcceptQuicServer for TcpStreamServer { + async fn run_quic_task(&self, connection: Connection, cc_info: ClientConnectionInfo) { + let client_addr = cc_info.client_addr(); + self.server_stats.add_conn(client_addr); + if self.drop_early(client_addr) { + return; + } + + loop { + // TODO update ctx and quit gracefully + match connection.accept_bi().await { + Ok((send_stream, recv_stream)) => { + self.run_task_with_quic_stream(send_stream, recv_stream, cc_info.clone()) + } + Err(e) => { + debug!( + "{} - {} quic connection error: {e:?}", + cc_info.sock_local_addr(), + cc_info.sock_peer_addr() + ); + break; + } + } + } + } + + fn get_reloaded(&self) -> ArcAcceptQuicServer { + crate::serve::get_or_insert_default(self.config.name()) + } +} + #[async_trait] impl Server for TcpStreamServer { fn escaper(&self) -> &MetricsName { @@ -382,29 +417,4 @@ impl Server for TcpStreamServer { self.run_task_with_stream(stream, cc_info).await } - - async fn run_quic_task(&self, connection: Connection, cc_info: ClientConnectionInfo) { - let client_addr = cc_info.client_addr(); - self.server_stats.add_conn(client_addr); - if self.drop_early(client_addr) { - return; - } - - loop { - // TODO update ctx and quit gracefully - match connection.accept_bi().await { - Ok((send_stream, recv_stream)) => { - self.run_task_with_quic_stream(send_stream, recv_stream, cc_info.clone()) - } - Err(e) => { - debug!( - "{} - {} quic connection error: {e:?}", - cc_info.sock_local_addr(), - cc_info.sock_peer_addr() - ); - break; - } - } - } - } } diff --git a/g3proxy/src/serve/tls_stream/server.rs b/g3proxy/src/serve/tls_stream/server.rs index a12e96f17..caa6a2297 100644 --- a/g3proxy/src/serve/tls_stream/server.rs +++ b/g3proxy/src/serve/tls_stream/server.rs @@ -29,7 +29,10 @@ use tokio::sync::broadcast; use tokio_openssl::SslStream; use tokio_rustls::{server::TlsStream, TlsAcceptor}; -use g3_daemon::listen::{AcceptTcpServer, ArcAcceptTcpServer, ListenStats, ListenTcpRuntime}; +use g3_daemon::listen::{ + AcceptQuicServer, AcceptTcpServer, ArcAcceptQuicServer, ArcAcceptTcpServer, ListenStats, + ListenTcpRuntime, +}; use g3_daemon::server::{BaseServer, ClientConnectionInfo, ServerReloadCommand}; use g3_types::acl::{AclAction, AclNetworkRule}; use g3_types::collection::{SelectivePickPolicy, SelectiveVec, SelectiveVecBuilder}; @@ -330,6 +333,15 @@ impl AcceptTcpServer for TlsStreamServer { } } +#[async_trait] +impl AcceptQuicServer for TlsStreamServer { + async fn run_quic_task(&self, _connection: Connection, _cc_info: ClientConnectionInfo) {} + + fn get_reloaded(&self) -> ArcAcceptQuicServer { + crate::serve::get_or_insert_default(self.config.name()) + } +} + #[async_trait] impl Server for TlsStreamServer { fn escaper(&self) -> &MetricsName { @@ -370,6 +382,4 @@ impl Server for TlsStreamServer { _cc_info: ClientConnectionInfo, ) { } - - async fn run_quic_task(&self, _connection: Connection, _cc_info: ClientConnectionInfo) {} } diff --git a/g3tiles/Cargo.toml b/g3tiles/Cargo.toml index 5c8454ce4..936f602fa 100644 --- a/g3tiles/Cargo.toml +++ b/g3tiles/Cargo.toml @@ -32,12 +32,13 @@ openssl.workspace = true openssl-probe = { workspace = true, optional = true } tokio-openssl.workspace = true rustls.workspace = true +quinn.workspace = true tokio-rustls.workspace = true governor = { workspace = true, features = ["std", "jitter"] } chrono = { workspace = true, features = ["clock"] } uuid.workspace = true g3-compat.workspace = true -g3-daemon.workspace = true +g3-daemon = { workspace = true, features = ["quic"] } g3-signal.workspace = true g3-yaml = { workspace = true, features = ["acl-rule", "route", "openssl", "rustls"] } g3-types = { workspace = true, features = ["acl-rule", "route", "openssl", "rustls"] } diff --git a/g3tiles/src/serve/dummy_close/server.rs b/g3tiles/src/serve/dummy_close/server.rs index 2e2720eb4..3720c0d75 100644 --- a/g3tiles/src/serve/dummy_close/server.rs +++ b/g3tiles/src/serve/dummy_close/server.rs @@ -18,10 +18,13 @@ use std::sync::Arc; use anyhow::anyhow; use async_trait::async_trait; +use quinn::Connection; use tokio::net::TcpStream; use tokio::sync::broadcast; -use g3_daemon::listen::{AcceptTcpServer, ArcAcceptTcpServer, ListenStats}; +use g3_daemon::listen::{ + AcceptQuicServer, AcceptTcpServer, ArcAcceptQuicServer, ArcAcceptTcpServer, ListenStats, +}; use g3_daemon::server::{BaseServer, ClientConnectionInfo, ServerReloadCommand}; use g3_types::metrics::MetricsName; @@ -140,6 +143,15 @@ impl AcceptTcpServer for DummyCloseServer { } } +#[async_trait] +impl AcceptQuicServer for DummyCloseServer { + async fn run_quic_task(&self, _connection: Connection, _cc_info: ClientConnectionInfo) {} + + fn get_reloaded(&self) -> ArcAcceptQuicServer { + crate::serve::get_or_insert_default(self.config.name()) + } +} + #[async_trait] impl Server for DummyCloseServer { fn get_listen_stats(&self) -> Arc { diff --git a/g3tiles/src/serve/mod.rs b/g3tiles/src/serve/mod.rs index 8c94bc3a5..38457f806 100644 --- a/g3tiles/src/serve/mod.rs +++ b/g3tiles/src/serve/mod.rs @@ -19,7 +19,7 @@ use std::sync::Arc; use async_trait::async_trait; use tokio::sync::broadcast; -use g3_daemon::listen::{AcceptTcpServer, ListenStats}; +use g3_daemon::listen::{AcceptQuicServer, AcceptTcpServer, ListenStats}; use g3_daemon::server::{BaseServer, ServerQuitPolicy, ServerReloadCommand}; use g3_types::metrics::MetricsName; @@ -66,7 +66,9 @@ pub(crate) trait ServerInternal { } #[async_trait] -pub(crate) trait Server: ServerInternal + BaseServer + AcceptTcpServer { +pub(crate) trait Server: + ServerInternal + BaseServer + AcceptTcpServer + AcceptQuicServer +{ fn get_server_stats(&self) -> Option { None } diff --git a/g3tiles/src/serve/openssl_proxy/server.rs b/g3tiles/src/serve/openssl_proxy/server.rs index c853dbfc7..62ade2b2f 100644 --- a/g3tiles/src/serve/openssl_proxy/server.rs +++ b/g3tiles/src/serve/openssl_proxy/server.rs @@ -23,11 +23,15 @@ use async_trait::async_trait; use log::warn; use openssl::ex_data::Index; use openssl::ssl::{Ssl, SslContext}; +use quinn::Connection; use slog::Logger; use tokio::net::TcpStream; use tokio::sync::broadcast; -use g3_daemon::listen::{AcceptTcpServer, ArcAcceptTcpServer, ListenStats, ListenTcpRuntime}; +use g3_daemon::listen::{ + AcceptQuicServer, AcceptTcpServer, ArcAcceptQuicServer, ArcAcceptTcpServer, ListenStats, + ListenTcpRuntime, +}; use g3_daemon::server::{BaseServer, ClientConnectionInfo, ServerReloadCommand}; use g3_types::acl::{AclAction, AclNetworkRule}; use g3_types::metrics::MetricsName; @@ -342,6 +346,15 @@ impl AcceptTcpServer for OpensslProxyServer { } } +#[async_trait] +impl AcceptQuicServer for OpensslProxyServer { + async fn run_quic_task(&self, _connection: Connection, _cc_info: ClientConnectionInfo) {} + + fn get_reloaded(&self) -> ArcAcceptQuicServer { + crate::serve::get_or_insert_default(self.config.name()) + } +} + #[async_trait] impl Server for OpensslProxyServer { fn get_server_stats(&self) -> Option { diff --git a/g3tiles/src/serve/plain_tcp_port/mod.rs b/g3tiles/src/serve/plain_tcp_port/mod.rs index 6f624d29a..044861d4e 100644 --- a/g3tiles/src/serve/plain_tcp_port/mod.rs +++ b/g3tiles/src/serve/plain_tcp_port/mod.rs @@ -20,10 +20,14 @@ use std::sync::Arc; use anyhow::anyhow; use arc_swap::ArcSwap; use async_trait::async_trait; +use quinn::Connection; use tokio::net::TcpStream; use tokio::sync::broadcast; -use g3_daemon::listen::{AcceptTcpServer, ArcAcceptTcpServer, ListenStats, ListenTcpRuntime}; +use g3_daemon::listen::{ + AcceptQuicServer, AcceptTcpServer, ArcAcceptQuicServer, ArcAcceptTcpServer, ListenStats, + ListenTcpRuntime, +}; use g3_daemon::server::{BaseServer, ClientConnectionInfo, ServerReloadCommand}; use g3_io_ext::haproxy::{ProxyProtocolV1Reader, ProxyProtocolV2Reader}; use g3_types::acl::{AclAction, AclNetworkRule}; @@ -223,6 +227,15 @@ impl AcceptTcpServer for PlainTcpPort { } } +#[async_trait] +impl AcceptQuicServer for PlainTcpPort { + async fn run_quic_task(&self, _connection: Connection, _cc_info: ClientConnectionInfo) {} + + fn get_reloaded(&self) -> ArcAcceptQuicServer { + crate::serve::get_or_insert_default(self.config.name()) + } +} + #[async_trait] impl Server for PlainTcpPort { fn get_listen_stats(&self) -> Arc { diff --git a/g3tiles/src/serve/rustls_proxy/server.rs b/g3tiles/src/serve/rustls_proxy/server.rs index b6d333def..f372a3d44 100644 --- a/g3tiles/src/serve/rustls_proxy/server.rs +++ b/g3tiles/src/serve/rustls_proxy/server.rs @@ -20,11 +20,15 @@ use std::sync::Arc; use ahash::AHashMap; use anyhow::anyhow; use async_trait::async_trait; +use quinn::Connection; use slog::Logger; use tokio::net::TcpStream; use tokio::sync::broadcast; -use g3_daemon::listen::{AcceptTcpServer, ArcAcceptTcpServer, ListenStats, ListenTcpRuntime}; +use g3_daemon::listen::{ + AcceptQuicServer, AcceptTcpServer, ArcAcceptQuicServer, ArcAcceptTcpServer, ListenStats, + ListenTcpRuntime, +}; use g3_daemon::server::{BaseServer, ClientConnectionInfo, ServerReloadCommand}; use g3_types::acl::{AclAction, AclNetworkRule}; use g3_types::metrics::MetricsName; @@ -249,6 +253,15 @@ impl AcceptTcpServer for RustlsProxyServer { } } +#[async_trait] +impl AcceptQuicServer for RustlsProxyServer { + async fn run_quic_task(&self, _connection: Connection, _cc_info: ClientConnectionInfo) {} + + fn get_reloaded(&self) -> ArcAcceptQuicServer { + crate::serve::get_or_insert_default(self.config.name()) + } +} + #[async_trait] impl Server for RustlsProxyServer { fn get_server_stats(&self) -> Option {