From fc3b047a03c66913149ae00c382482ab15eaf8b1 Mon Sep 17 00:00:00 2001 From: Zhang Jingqiang Date: Mon, 16 Oct 2023 10:33:27 +0800 Subject: [PATCH 1/8] g3bench: use native async fn in trait --- Cargo.lock | 1 - g3bench/Cargo.toml | 1 - g3bench/src/target/dns/task.rs | 2 -- g3bench/src/target/h1/task.rs | 2 -- g3bench/src/target/h2/task.rs | 2 -- g3bench/src/target/h3/task.rs | 2 -- g3bench/src/target/keyless/cloudflare/task.rs | 2 -- g3bench/src/target/keyless/openssl/task.rs | 2 -- g3bench/src/target/mod.rs | 11 ++++++++--- g3bench/src/target/ssl/task.rs | 2 -- 10 files changed, 8 insertions(+), 19 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 76f7df900..b052bf0db 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1343,7 +1343,6 @@ version = "0.8.5" dependencies = [ "ahash", "anyhow", - "async-trait", "atomic-waker", "bytes", "cadence-with-flush", diff --git a/g3bench/Cargo.toml b/g3bench/Cargo.toml index 4516890e3..48f5a7a40 100644 --- a/g3bench/Cargo.toml +++ b/g3bench/Cargo.toml @@ -13,7 +13,6 @@ thiserror.workspace = true anyhow.workspace = true clap.workspace = true clap_complete.workspace = true -async-trait.workspace = true indicatif = "0.17" tokio = { workspace = true, features = ["rt", "net", "macros"] } http.workspace = true diff --git a/g3bench/src/target/dns/task.rs b/g3bench/src/target/dns/task.rs index d6493c206..18d3d3012 100644 --- a/g3bench/src/target/dns/task.rs +++ b/g3bench/src/target/dns/task.rs @@ -18,7 +18,6 @@ use std::cell::UnsafeCell; use std::sync::Arc; use anyhow::{anyhow, Context}; -use async_trait::async_trait; use hickory_client::client::{AsyncClient, ClientHandle}; use hickory_proto::op::ResponseCode; use tokio::time::Instant; @@ -133,7 +132,6 @@ impl DnsTaskContext { } } -#[async_trait] impl BenchTaskContext for DnsTaskContext { fn mark_task_start(&self) { self.runtime_stats.add_task_total(); diff --git a/g3bench/src/target/h1/task.rs b/g3bench/src/target/h1/task.rs index 35f20b7fe..53fce5a8b 100644 --- a/g3bench/src/target/h1/task.rs +++ b/g3bench/src/target/h1/task.rs @@ -18,7 +18,6 @@ use std::sync::Arc; use std::time::Duration; use anyhow::{anyhow, Context}; -use async_trait::async_trait; use futures_util::FutureExt; use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader}; use tokio::time::Instant; @@ -183,7 +182,6 @@ impl HttpTaskContext { } } -#[async_trait] impl BenchTaskContext for HttpTaskContext { fn mark_task_start(&self) { self.runtime_stats.add_task_total(); diff --git a/g3bench/src/target/h2/task.rs b/g3bench/src/target/h2/task.rs index 51f3eb9da..828f49e95 100644 --- a/g3bench/src/target/h2/task.rs +++ b/g3bench/src/target/h2/task.rs @@ -17,7 +17,6 @@ use std::sync::Arc; use anyhow::{anyhow, Context}; -use async_trait::async_trait; use bytes::Bytes; use h2::client::SendRequest; use tokio::time::Instant; @@ -180,7 +179,6 @@ impl H2TaskContext { } } -#[async_trait] impl BenchTaskContext for H2TaskContext { fn mark_task_start(&self) { self.runtime_stats.add_task_total(); diff --git a/g3bench/src/target/h3/task.rs b/g3bench/src/target/h3/task.rs index 720094ced..707af2273 100644 --- a/g3bench/src/target/h3/task.rs +++ b/g3bench/src/target/h3/task.rs @@ -17,7 +17,6 @@ use std::sync::Arc; use anyhow::{anyhow, Context}; -use async_trait::async_trait; use bytes::Bytes; use h3::client::SendRequest; use h3_quinn::OpenStreams; @@ -167,7 +166,6 @@ impl H3TaskContext { } } -#[async_trait] impl BenchTaskContext for H3TaskContext { fn mark_task_start(&self) { self.runtime_stats.add_task_total(); diff --git a/g3bench/src/target/keyless/cloudflare/task.rs b/g3bench/src/target/keyless/cloudflare/task.rs index 65dc1785a..a396eb7c7 100644 --- a/g3bench/src/target/keyless/cloudflare/task.rs +++ b/g3bench/src/target/keyless/cloudflare/task.rs @@ -17,7 +17,6 @@ use std::sync::Arc; use anyhow::anyhow; -use async_trait::async_trait; use tokio::time::Instant; use super::{ @@ -179,7 +178,6 @@ impl KeylessCloudflareTaskContext { } } -#[async_trait] impl BenchTaskContext for KeylessCloudflareTaskContext { fn mark_task_start(&self) { self.runtime_stats.add_task_total(); diff --git a/g3bench/src/target/keyless/openssl/task.rs b/g3bench/src/target/keyless/openssl/task.rs index 152abb122..b73bff609 100644 --- a/g3bench/src/target/keyless/openssl/task.rs +++ b/g3bench/src/target/keyless/openssl/task.rs @@ -16,7 +16,6 @@ use std::sync::Arc; -use async_trait::async_trait; use tokio::time::Instant; use super::{ @@ -57,7 +56,6 @@ impl KeylessOpensslTaskContext { } } -#[async_trait] impl BenchTaskContext for KeylessOpensslTaskContext { fn mark_task_start(&self) { self.runtime_stats.add_task_total(); diff --git a/g3bench/src/target/mod.rs b/g3bench/src/target/mod.rs index 7a2068ecd..ce65c22c0 100644 --- a/g3bench/src/target/mod.rs +++ b/g3bench/src/target/mod.rs @@ -14,12 +14,12 @@ * limitations under the License. */ +use std::future::Future; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; use anyhow::{anyhow, Context}; -use async_trait::async_trait; use cadence::{Gauged, StatsdClient}; use hdrhistogram::Histogram; use tokio::signal::unix::SignalKind; @@ -173,12 +173,17 @@ enum BenchError { Task(anyhow::Error), } -#[async_trait] trait BenchTaskContext { fn mark_task_start(&self); fn mark_task_passed(&self); fn mark_task_failed(&self); - async fn run(&mut self, task_id: usize, time_started: Instant) -> Result<(), BenchError>; + + // TODO use native async fn declaration + fn run( + &mut self, + task_id: usize, + time_started: Instant, + ) -> impl Future> + Send; } trait BenchTarget diff --git a/g3bench/src/target/ssl/task.rs b/g3bench/src/target/ssl/task.rs index d728c0ae6..34b1cfee1 100644 --- a/g3bench/src/target/ssl/task.rs +++ b/g3bench/src/target/ssl/task.rs @@ -18,7 +18,6 @@ use std::sync::Arc; use std::time::Duration; use anyhow::anyhow; -use async_trait::async_trait; use tokio::io::AsyncWriteExt; use tokio::net::TcpStream; use tokio::time::Instant; @@ -76,7 +75,6 @@ impl SslTaskContext { } } -#[async_trait] impl BenchTaskContext for SslTaskContext { fn mark_task_start(&self) { self.runtime_stats.add_task_total(); From f104386ba1e404a2e55b14afe0fd1a980723505a Mon Sep 17 00:00:00 2001 From: Zhang Jingqiang Date: Mon, 16 Oct 2023 10:46:19 +0800 Subject: [PATCH 2/8] g3keymess: use native async fn in trait --- Cargo.lock | 1 - g3keymess/Cargo.toml | 1 - g3keymess/src/config/store/local.rs | 2 -- g3keymess/src/config/store/mod.rs | 2 -- g3keymess/src/config/store/redis.rs | 2 -- 5 files changed, 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b052bf0db..491f93b19 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1419,7 +1419,6 @@ dependencies = [ "anyhow", "arc-swap", "ascii", - "async-trait", "cadence-with-flush", "capnp", "capnp-rpc", diff --git a/g3keymess/Cargo.toml b/g3keymess/Cargo.toml index 8f43c1917..e60358994 100644 --- a/g3keymess/Cargo.toml +++ b/g3keymess/Cargo.toml @@ -26,7 +26,6 @@ chrono = { workspace = true, features = ["clock"] } uuid.workspace = true url.workspace = true ahash.workspace = true -async-trait.workspace = true futures-util.workspace = true cadence.workspace = true itoa.workspace = true diff --git a/g3keymess/src/config/store/local.rs b/g3keymess/src/config/store/local.rs index a7357e8d5..9b89464d8 100644 --- a/g3keymess/src/config/store/local.rs +++ b/g3keymess/src/config/store/local.rs @@ -17,7 +17,6 @@ use std::path::{Path, PathBuf}; use anyhow::anyhow; -use async_trait::async_trait; use openssl::pkey::{PKey, Private}; use tokio::sync::oneshot; use yaml_rust::{yaml, Yaml}; @@ -88,7 +87,6 @@ impl LocalKeyStoreConfig { } } -#[async_trait] impl KeyStoreConfig for LocalKeyStoreConfig { #[inline] fn name(&self) -> &MetricsName { diff --git a/g3keymess/src/config/store/mod.rs b/g3keymess/src/config/store/mod.rs index 198f93e19..eadf49bd1 100644 --- a/g3keymess/src/config/store/mod.rs +++ b/g3keymess/src/config/store/mod.rs @@ -17,7 +17,6 @@ use std::path::Path; use anyhow::anyhow; -use async_trait::async_trait; use openssl::pkey::{PKey, Private}; use tokio::sync::oneshot; use yaml_rust::{yaml, Yaml}; @@ -33,7 +32,6 @@ pub(crate) use registry::{clear, get_all}; const CONFIG_KEY_STORE_TYPE: &str = "type"; -#[async_trait] pub trait KeyStoreConfig { fn name(&self) -> &MetricsName; async fn load_certs(&self) -> anyhow::Result>>; diff --git a/g3keymess/src/config/store/redis.rs b/g3keymess/src/config/store/redis.rs index 22cefc533..b27184218 100644 --- a/g3keymess/src/config/store/redis.rs +++ b/g3keymess/src/config/store/redis.rs @@ -15,7 +15,6 @@ */ use anyhow::anyhow; -use async_trait::async_trait; use openssl::pkey::{PKey, Private}; use url::Url; use yaml_rust::{yaml, Yaml}; @@ -80,7 +79,6 @@ impl RedisKeyStoreConfig { } } -#[async_trait] impl KeyStoreConfig for RedisKeyStoreConfig { #[inline] fn name(&self) -> &MetricsName { From 52b6c578d7fd079b183d58c3c099da55c25960ae Mon Sep 17 00:00:00 2001 From: Zhang Jingqiang Date: Mon, 16 Oct 2023 11:06:47 +0800 Subject: [PATCH 3/8] use more native async fn in trait for libs --- Cargo.lock | 5 ++--- Cargo.toml | 4 ++-- g3proxy/src/inspect/http/v1/forward/adaptation.rs | 2 -- g3proxy/src/module/http_forward/connection/mod.rs | 1 - g3proxy/src/serve/http_proxy/task/ftp/list.rs | 6 ------ g3proxy/utils/ftp/src/cmd_list.rs | 2 -- lib/g3-ftp-client/Cargo.toml | 2 +- lib/g3-ftp-client/src/transfer/line.rs | 3 +-- lib/g3-icap-client/Cargo.toml | 3 +-- lib/g3-icap-client/src/reqmod/h1/mod.rs | 3 +-- lib/g3-icap-client/src/respmod/h1/impl_trait.rs | 2 -- lib/g3-icap-client/src/respmod/h1/mod.rs | 3 +-- 12 files changed, 9 insertions(+), 27 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 491f93b19..3567e05ed 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -947,7 +947,7 @@ dependencies = [ [[package]] name = "g3-ftp-client" -version = "0.2.0" +version = "0.3.0" dependencies = [ "async-trait", "chrono", @@ -1022,10 +1022,9 @@ dependencies = [ [[package]] name = "g3-icap-client" -version = "0.1.0" +version = "0.2.0" dependencies = [ "anyhow", - "async-trait", "atoi", "bytes", "flume", diff --git a/Cargo.toml b/Cargo.toml index cd1ac2648..89930d381 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -193,10 +193,10 @@ g3-ctl = { version = "0.1", path = "lib/g3-ctl" } g3-datetime = { version = "0.1", path = "lib/g3-datetime" } g3-dpi = { version = "0.1", path = "lib/g3-dpi" } g3-fluentd = { version = "0.1", path = "lib/g3-fluentd" } -g3-ftp-client = { version = "0.2", path = "lib/g3-ftp-client" } +g3-ftp-client = { version = "0.3", path = "lib/g3-ftp-client" } g3-h2 = { version = "0.1", path = "lib/g3-h2" } g3-http = { version = "0.2", path = "lib/g3-http" } -g3-icap-client = { version = "0.1", path = "lib/g3-icap-client" } +g3-icap-client = { version = "0.2", path = "lib/g3-icap-client" } g3-io-ext = { version = "0.6", path = "lib/g3-io-ext" } g3-journal = { version = "0.2", path = "lib/g3-journal" } g3-json = { version = "0.3", path = "lib/g3-json" } diff --git a/g3proxy/src/inspect/http/v1/forward/adaptation.rs b/g3proxy/src/inspect/http/v1/forward/adaptation.rs index 137210245..aa9054763 100644 --- a/g3proxy/src/inspect/http/v1/forward/adaptation.rs +++ b/g3proxy/src/inspect/http/v1/forward/adaptation.rs @@ -18,7 +18,6 @@ use std::io::{self, IoSlice}; use std::pin::Pin; use std::task::{Context, Poll}; -use async_trait::async_trait; use tokio::io::{AsyncWrite, AsyncWriteExt}; use g3_http::server::HttpTransparentRequest; @@ -61,7 +60,6 @@ where } } -#[async_trait] impl<'a, W> HttpRequestUpstreamWriter for HttpRequestWriterForAdaptation<'a, W> where diff --git a/g3proxy/src/module/http_forward/connection/mod.rs b/g3proxy/src/module/http_forward/connection/mod.rs index 959be87ba..68a558ef5 100644 --- a/g3proxy/src/module/http_forward/connection/mod.rs +++ b/g3proxy/src/module/http_forward/connection/mod.rs @@ -108,7 +108,6 @@ impl<'a> AsyncWrite for HttpForwardWriterForAdaptation<'a> { } } -#[async_trait] impl<'a> HttpRequestUpstreamWriter for HttpForwardWriterForAdaptation<'a> { async fn send_request_header(&mut self, req: &HttpProxyClientRequest) -> io::Result<()> { self.inner.send_request_header(req).await diff --git a/g3proxy/src/serve/http_proxy/task/ftp/list.rs b/g3proxy/src/serve/http_proxy/task/ftp/list.rs index 4677f990d..4c8f87815 100644 --- a/g3proxy/src/serve/http_proxy/task/ftp/list.rs +++ b/g3proxy/src/serve/http_proxy/task/ftp/list.rs @@ -16,7 +16,6 @@ use std::io::{self, Error, Write}; -use async_trait::async_trait; use tokio::io::{AsyncWrite, AsyncWriteExt, BufWriter}; use g3_ftp_client::FtpLineDataReceiver; @@ -24,7 +23,6 @@ use g3_ftp_client::FtpLineDataReceiver; const CHUNKED_BUF_HEAD_RESERVED: usize = (usize::BITS as usize >> 2) + 2; const CHUNKED_BUF_TAIL_RESERVED: usize = 2; -#[async_trait] pub(super) trait ListWriter: FtpLineDataReceiver { fn take_io_error(&mut self) -> Option; async fn flush_buf(&mut self) -> io::Result<()>; @@ -75,7 +73,6 @@ where } } -#[async_trait] impl<'a, W> FtpLineDataReceiver for ChunkedListWriter<'a, W> where W: AsyncWrite + Send + Unpin, @@ -100,7 +97,6 @@ where } } -#[async_trait] impl<'a, W> ListWriter for ChunkedListWriter<'a, W> where W: AsyncWrite + Send + Unpin, @@ -153,7 +149,6 @@ where } } -#[async_trait] impl<'a, W> FtpLineDataReceiver for EndingListWriter<'a, W> where W: AsyncWrite + Send + Unpin, @@ -171,7 +166,6 @@ where } } -#[async_trait] impl<'a, W> ListWriter for EndingListWriter<'a, W> where W: AsyncWrite + Send + Unpin, diff --git a/g3proxy/utils/ftp/src/cmd_list.rs b/g3proxy/utils/ftp/src/cmd_list.rs index 63d734799..46c709258 100644 --- a/g3proxy/utils/ftp/src/cmd_list.rs +++ b/g3proxy/utils/ftp/src/cmd_list.rs @@ -14,7 +14,6 @@ * limitations under the License. */ -use async_trait::async_trait; use clap::{Arg, ArgMatches, Command}; use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, Stdout}; @@ -68,7 +67,6 @@ impl Default for StdioLineReceiver { } } -#[async_trait] impl FtpLineDataReceiver for StdioLineReceiver { async fn recv_line(&mut self, line: &str) { self.has_error = self.io.write_all(line.as_bytes()).await.is_err(); diff --git a/lib/g3-ftp-client/Cargo.toml b/lib/g3-ftp-client/Cargo.toml index 0aca5c972..acf348a9f 100644 --- a/lib/g3-ftp-client/Cargo.toml +++ b/lib/g3-ftp-client/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "g3-ftp-client" -version = "0.2.0" +version = "0.3.0" license.workspace = true edition.workspace = true diff --git a/lib/g3-ftp-client/src/transfer/line.rs b/lib/g3-ftp-client/src/transfer/line.rs index 3d832e0ef..ae111888f 100644 --- a/lib/g3-ftp-client/src/transfer/line.rs +++ b/lib/g3-ftp-client/src/transfer/line.rs @@ -14,7 +14,6 @@ * limitations under the License. */ -use async_trait::async_trait; use tokio::io::{AsyncRead, AsyncWrite, BufStream}; use g3_io_ext::LimitedBufReadExt; @@ -22,7 +21,7 @@ use g3_io_ext::LimitedBufReadExt; use crate::config::FtpTransferConfig; use crate::error::FtpLineDataReadError; -#[async_trait] +#[allow(async_fn_in_trait)] pub trait FtpLineDataReceiver { async fn recv_line(&mut self, line: &str); fn should_return_early(&self) -> bool; diff --git a/lib/g3-icap-client/Cargo.toml b/lib/g3-icap-client/Cargo.toml index cec15c01b..fc6dd6632 100644 --- a/lib/g3-icap-client/Cargo.toml +++ b/lib/g3-icap-client/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "g3-icap-client" -version = "0.1.0" +version = "0.2.0" license.workspace = true edition.workspace = true rust-version = "1.74.0" @@ -10,7 +10,6 @@ rust-version = "1.74.0" [dependencies] anyhow.workspace = true thiserror.workspace = true -async-trait.workspace = true memchr.workspace = true atoi.workspace = true url.workspace = true diff --git a/lib/g3-icap-client/src/reqmod/h1/mod.rs b/lib/g3-icap-client/src/reqmod/h1/mod.rs index 64953500a..4dc1e43dd 100644 --- a/lib/g3-icap-client/src/reqmod/h1/mod.rs +++ b/lib/g3-icap-client/src/reqmod/h1/mod.rs @@ -19,7 +19,6 @@ use std::net::SocketAddr; use std::sync::Arc; use std::time::Duration; -use async_trait::async_trait; use bytes::BufMut; use http::Method; use tokio::io::{AsyncBufRead, AsyncWrite}; @@ -59,7 +58,7 @@ pub trait HttpRequestForAdaptation { fn adapt_to(&self, other: HttpAdaptedRequest) -> Self; } -#[async_trait] +#[allow(async_fn_in_trait)] pub trait HttpRequestUpstreamWriter: AsyncWrite { async fn send_request_header(&mut self, req: &H) -> io::Result<()>; } diff --git a/lib/g3-icap-client/src/respmod/h1/impl_trait.rs b/lib/g3-icap-client/src/respmod/h1/impl_trait.rs index ace315e69..6ea3b7198 100644 --- a/lib/g3-icap-client/src/respmod/h1/impl_trait.rs +++ b/lib/g3-icap-client/src/respmod/h1/impl_trait.rs @@ -14,7 +14,6 @@ * limitations under the License. */ -use async_trait::async_trait; use bytes::BufMut; use http::{header, Method}; use tokio::io::{AsyncWrite, AsyncWriteExt}; @@ -68,7 +67,6 @@ impl HttpResponseForAdaptation for HttpTransparentResponse { } } -#[async_trait] impl HttpResponseClientWriter for W where W: AsyncWrite + Send + Unpin, diff --git a/lib/g3-icap-client/src/respmod/h1/mod.rs b/lib/g3-icap-client/src/respmod/h1/mod.rs index 58242ef7b..5e5a483a6 100644 --- a/lib/g3-icap-client/src/respmod/h1/mod.rs +++ b/lib/g3-icap-client/src/respmod/h1/mod.rs @@ -19,7 +19,6 @@ use std::net::SocketAddr; use std::sync::Arc; use std::time::Duration; -use async_trait::async_trait; use bytes::BufMut; use http::Method; use tokio::io::{AsyncBufRead, AsyncWrite}; @@ -55,7 +54,7 @@ pub trait HttpResponseForAdaptation { fn adapt_to(&self, other: HttpAdaptedResponse) -> Self; } -#[async_trait] +#[allow(async_fn_in_trait)] pub trait HttpResponseClientWriter: AsyncWrite { async fn send_response_header(&mut self, req: &H) -> io::Result<()>; } From d4410a338d4d3a053904eed2e88ad5b24bee9175 Mon Sep 17 00:00:00 2001 From: Zhang Jingqiang Date: Mon, 20 Nov 2023 14:39:16 +0800 Subject: [PATCH 4/8] update ci config --- .github/workflows/linux.yml | 4 ++-- .github/workflows/macos.yml | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/linux.yml b/.github/workflows/linux.yml index 9d90fd69b..e7282550d 100644 --- a/.github/workflows/linux.yml +++ b/.github/workflows/linux.yml @@ -12,7 +12,7 @@ jobs: strategy: matrix: rust: - - stable + #- stable - beta - nightly steps: @@ -38,7 +38,7 @@ jobs: - name: Checkout sources uses: actions/checkout@v4 - name: Install rust toolchain - uses: dtolnay/rust-toolchain@stable + uses: dtolnay/rust-toolchain@beta with: components: clippy - name: Install dependencies diff --git a/.github/workflows/macos.yml b/.github/workflows/macos.yml index 682eb4de4..fcd6b6582 100644 --- a/.github/workflows/macos.yml +++ b/.github/workflows/macos.yml @@ -12,7 +12,7 @@ jobs: strategy: matrix: rust: - - stable + #- stable - beta - nightly steps: @@ -37,7 +37,7 @@ jobs: - name: Checkout sources uses: actions/checkout@v4 - name: Install rust toolchain - uses: dtolnay/rust-toolchain@stable + uses: dtolnay/rust-toolchain@beta with: components: clippy - name: Install dependencies From 5cec6f74922c31bb0dc459468bcb57e9fba6a721 Mon Sep 17 00:00:00 2001 From: Zhang Jingqiang Date: Mon, 20 Nov 2023 13:52:35 +0800 Subject: [PATCH 5/8] move listen tcp server code to g3-daemon --- g3proxy/src/lib.rs | 2 + g3proxy/src/serve/dummy_close/server.rs | 25 ++- g3proxy/src/serve/http_proxy/server.rs | 83 ++++--- g3proxy/src/serve/http_rproxy/server.rs | 83 ++++--- g3proxy/src/serve/intelli_proxy/server.rs | 48 ++-- g3proxy/src/serve/mod.rs | 12 +- g3proxy/src/serve/native_tls_port/mod.rs | 48 ++-- g3proxy/src/serve/plain_quic_port/mod.rs | 26 ++- g3proxy/src/serve/plain_tcp_port/mod.rs | 48 ++-- g3proxy/src/serve/plain_tls_port/mod.rs | 48 ++-- g3proxy/src/serve/runtime/listen_tcp.rs | 253 ---------------------- g3proxy/src/serve/runtime/mod.rs | 3 - g3proxy/src/serve/sni_proxy/server.rs | 47 ++-- g3proxy/src/serve/socks_proxy/server.rs | 47 ++-- g3proxy/src/serve/tcp_stream/server.rs | 47 ++-- g3proxy/src/serve/tls_stream/server.rs | 83 ++++--- lib/g3-daemon/src/listen/tcp.rs | 10 +- 17 files changed, 405 insertions(+), 508 deletions(-) delete mode 100644 g3proxy/src/serve/runtime/listen_tcp.rs diff --git a/g3proxy/src/lib.rs b/g3proxy/src/lib.rs index 3c19d1fec..2ed6acdc9 100644 --- a/g3proxy/src/lib.rs +++ b/g3proxy/src/lib.rs @@ -14,6 +14,8 @@ * limitations under the License. */ +#![feature(trait_upcasting)] + pub mod audit; pub mod auth; pub mod config; diff --git a/g3proxy/src/serve/dummy_close/server.rs b/g3proxy/src/serve/dummy_close/server.rs index 317059437..2e51262c7 100644 --- a/g3proxy/src/serve/dummy_close/server.rs +++ b/g3proxy/src/serve/dummy_close/server.rs @@ -24,8 +24,8 @@ use tokio::sync::broadcast; use tokio_openssl::SslStream; use tokio_rustls::server::TlsStream; -use g3_daemon::listen::ListenStats; -use g3_daemon::server::{ClientConnectionInfo, ServerReloadCommand}; +use g3_daemon::listen::{AcceptTcpServer, ArcAcceptTcpServer, ListenStats}; +use g3_daemon::server::{BaseServer, ClientConnectionInfo, ServerReloadCommand}; use g3_types::metrics::MetricsName; use crate::config::server::dummy_close::DummyCloseServerConfig; @@ -128,16 +128,31 @@ impl ServerInternal for DummyCloseServer { } } -#[async_trait] -impl Server for DummyCloseServer { +impl BaseServer for DummyCloseServer { fn name(&self) -> &MetricsName { self.config.name() } + fn server_type(&self) -> &'static str { + self.config.server_type() + } + fn version(&self) -> usize { 0 } +} + +#[async_trait] +impl AcceptTcpServer for DummyCloseServer { + async fn run_tcp_task(&self, _stream: TcpStream, _cc_info: ClientConnectionInfo) {} + fn get_reloaded(&self) -> ArcAcceptTcpServer { + crate::serve::get_or_insert_default(self.config.name()) + } +} + +#[async_trait] +impl Server for DummyCloseServer { fn escaper(&self) -> &MetricsName { Default::default() } @@ -163,8 +178,6 @@ impl Server for DummyCloseServer { &self.quit_policy } - async fn run_tcp_task(&self, _stream: TcpStream, _cc_info: ClientConnectionInfo) {} - async fn run_rustls_task(&self, _stream: TlsStream, _cc_info: ClientConnectionInfo) { } diff --git a/g3proxy/src/serve/http_proxy/server.rs b/g3proxy/src/serve/http_proxy/server.rs index e546e5840..9adf69cd9 100644 --- a/g3proxy/src/serve/http_proxy/server.rs +++ b/g3proxy/src/serve/http_proxy/server.rs @@ -30,8 +30,8 @@ use tokio::sync::{broadcast, mpsc}; use tokio_openssl::SslStream; use tokio_rustls::{server::TlsStream, TlsAcceptor}; -use g3_daemon::listen::ListenStats; -use g3_daemon::server::{ClientConnectionInfo, ServerReloadCommand}; +use g3_daemon::listen::{AcceptTcpServer, ArcAcceptTcpServer, ListenStats, ListenTcpRuntime}; +use g3_daemon::server::{BaseServer, ClientConnectionInfo, ServerReloadCommand}; use g3_types::acl::{AclAction, AclNetworkRule}; use g3_types::acl_set::AclDstHostRuleSet; use g3_types::metrics::MetricsName; @@ -48,8 +48,7 @@ use crate::config::server::http_proxy::HttpProxyServerConfig; use crate::config::server::{AnyServerConfig, ServerConfig}; use crate::escape::ArcEscaper; use crate::serve::{ - ArcServer, ArcServerStats, ListenTcpRuntime, Server, ServerInternal, ServerQuitPolicy, - ServerStats, + ArcServer, ArcServerStats, Server, ServerInternal, ServerQuitPolicy, ServerStats, }; pub(crate) struct HttpProxyServer { @@ -312,7 +311,7 @@ impl ServerInternal for HttpProxyServer { let Some(listen_config) = &self.config.listen else { return Ok(()); }; - let runtime = ListenTcpRuntime::new(server, &*self.config); + let runtime = ListenTcpRuntime::new(server.clone(), server.get_listen_stats()); runtime .run_all_instances( listen_config, @@ -328,47 +327,25 @@ impl ServerInternal for HttpProxyServer { } } -#[async_trait] -impl Server for HttpProxyServer { +impl BaseServer for HttpProxyServer { #[inline] fn name(&self) -> &MetricsName { self.config.name() } #[inline] - fn version(&self) -> usize { - self.reload_version - } - - fn escaper(&self) -> &MetricsName { - self.config.escaper() - } - - fn user_group(&self) -> &MetricsName { - self.config.user_group() - } - - fn auditor(&self) -> &MetricsName { - self.config.auditor() - } - - fn get_server_stats(&self) -> Option { - Some(Arc::clone(&self.server_stats) as _) - } - - fn get_listen_stats(&self) -> Arc { - Arc::clone(&self.listen_stats) - } - - fn alive_count(&self) -> i32 { - self.server_stats.get_alive_count() + fn server_type(&self) -> &'static str { + self.config.server_type() } #[inline] - fn quit_policy(&self) -> &Arc { - &self.quit_policy + fn version(&self) -> usize { + self.reload_version } +} +#[async_trait] +impl AcceptTcpServer for HttpProxyServer { async fn run_tcp_task(&self, stream: TcpStream, cc_info: ClientConnectionInfo) { let client_addr = cc_info.client_addr(); self.server_stats.add_conn(client_addr); @@ -403,6 +380,42 @@ impl Server for HttpProxyServer { } } + fn get_reloaded(&self) -> ArcAcceptTcpServer { + crate::serve::get_or_insert_default(self.config.name()) + } +} + +#[async_trait] +impl Server for HttpProxyServer { + fn escaper(&self) -> &MetricsName { + self.config.escaper() + } + + fn user_group(&self) -> &MetricsName { + self.config.user_group() + } + + fn auditor(&self) -> &MetricsName { + self.config.auditor() + } + + fn get_server_stats(&self) -> Option { + Some(Arc::clone(&self.server_stats) as _) + } + + fn get_listen_stats(&self) -> Arc { + Arc::clone(&self.listen_stats) + } + + fn alive_count(&self) -> i32 { + self.server_stats.get_alive_count() + } + + #[inline] + fn quit_policy(&self) -> &Arc { + &self.quit_policy + } + async fn run_rustls_task(&self, stream: TlsStream, cc_info: ClientConnectionInfo) { let client_addr = cc_info.client_addr(); self.server_stats.add_conn(client_addr); diff --git a/g3proxy/src/serve/http_rproxy/server.rs b/g3proxy/src/serve/http_rproxy/server.rs index 8be7afdb9..4e235d7c8 100644 --- a/g3proxy/src/serve/http_rproxy/server.rs +++ b/g3proxy/src/serve/http_rproxy/server.rs @@ -31,8 +31,8 @@ use tokio_openssl::SslStream; use tokio_rustls::server::TlsStream; use tokio_rustls::LazyConfigAcceptor; -use g3_daemon::listen::ListenStats; -use g3_daemon::server::{ClientConnectionInfo, ServerReloadCommand}; +use g3_daemon::listen::{AcceptTcpServer, ArcAcceptTcpServer, ListenStats, ListenTcpRuntime}; +use g3_daemon::server::{BaseServer, ClientConnectionInfo, ServerReloadCommand}; use g3_types::acl::{AclAction, AclNetworkRule}; use g3_types::metrics::MetricsName; use g3_types::net::{RustlsServerConfig, UpstreamAddr}; @@ -48,8 +48,7 @@ use crate::config::server::http_rproxy::HttpRProxyServerConfig; use crate::config::server::{AnyServerConfig, ServerConfig}; use crate::escape::ArcEscaper; use crate::serve::{ - ArcServer, ArcServerStats, ListenTcpRuntime, Server, ServerInternal, ServerQuitPolicy, - ServerStats, + ArcServer, ArcServerStats, Server, ServerInternal, ServerQuitPolicy, ServerStats, }; pub(crate) struct HttpRProxyServer { @@ -276,7 +275,7 @@ impl ServerInternal for HttpRProxyServer { let Some(listen_config) = &self.config.listen else { return Ok(()); }; - let runtime = ListenTcpRuntime::new(server, &*self.config); + let runtime = ListenTcpRuntime::new(server.clone(), server.get_listen_stats()); runtime .run_all_instances( listen_config, @@ -292,47 +291,25 @@ impl ServerInternal for HttpRProxyServer { } } -#[async_trait] -impl Server for HttpRProxyServer { +impl BaseServer for HttpRProxyServer { #[inline] fn name(&self) -> &MetricsName { self.config.name() } #[inline] - fn version(&self) -> usize { - self.reload_version - } - - fn escaper(&self) -> &MetricsName { - self.config.escaper() - } - - fn user_group(&self) -> &MetricsName { - self.config.user_group() - } - - fn auditor(&self) -> &MetricsName { - self.config.auditor() - } - - fn get_server_stats(&self) -> Option { - Some(Arc::clone(&self.server_stats) as _) - } - - fn get_listen_stats(&self) -> Arc { - Arc::clone(&self.listen_stats) - } - - fn alive_count(&self) -> i32 { - self.server_stats.get_alive_count() + fn server_type(&self) -> &'static str { + self.config.server_type() } #[inline] - fn quit_policy(&self) -> &Arc { - &self.quit_policy + fn version(&self) -> usize { + self.reload_version } +} +#[async_trait] +impl AcceptTcpServer for HttpRProxyServer { async fn run_tcp_task(&self, stream: TcpStream, cc_info: ClientConnectionInfo) { let client_addr = cc_info.client_addr(); self.server_stats.add_conn(client_addr); @@ -420,6 +397,42 @@ impl Server for HttpRProxyServer { } } + fn get_reloaded(&self) -> ArcAcceptTcpServer { + crate::serve::get_or_insert_default(self.config.name()) + } +} + +#[async_trait] +impl Server for HttpRProxyServer { + fn escaper(&self) -> &MetricsName { + self.config.escaper() + } + + fn user_group(&self) -> &MetricsName { + self.config.user_group() + } + + fn auditor(&self) -> &MetricsName { + self.config.auditor() + } + + fn get_server_stats(&self) -> Option { + Some(Arc::clone(&self.server_stats) as _) + } + + fn get_listen_stats(&self) -> Arc { + Arc::clone(&self.listen_stats) + } + + fn alive_count(&self) -> i32 { + self.server_stats.get_alive_count() + } + + #[inline] + fn quit_policy(&self) -> &Arc { + &self.quit_policy + } + async fn run_rustls_task(&self, stream: TlsStream, cc_info: ClientConnectionInfo) { let client_addr = cc_info.client_addr(); self.server_stats.add_conn(client_addr); diff --git a/g3proxy/src/serve/intelli_proxy/server.rs b/g3proxy/src/serve/intelli_proxy/server.rs index 99322b61f..48fbe3d1d 100644 --- a/g3proxy/src/serve/intelli_proxy/server.rs +++ b/g3proxy/src/serve/intelli_proxy/server.rs @@ -26,8 +26,8 @@ use tokio::sync::broadcast; use tokio_openssl::SslStream; use tokio_rustls::server::TlsStream; -use g3_daemon::listen::ListenStats; -use g3_daemon::server::{ClientConnectionInfo, ServerReloadCommand}; +use g3_daemon::listen::{AcceptTcpServer, ArcAcceptTcpServer, ListenStats, ListenTcpRuntime}; +use g3_daemon::server::{BaseServer, ClientConnectionInfo, ServerReloadCommand}; use g3_io_ext::haproxy::{ProxyProtocolV1Reader, ProxyProtocolV2Reader}; use g3_types::acl::{AclAction, AclNetworkRule}; use g3_types::metrics::MetricsName; @@ -36,10 +36,9 @@ use g3_types::net::ProxyProtocolVersion; use super::{detect_tcp_proxy_protocol, DetectedProxyProtocol}; use crate::config::server::intelli_proxy::IntelliProxyConfig; use crate::config::server::{AnyServerConfig, ServerConfig}; -use crate::serve::{ArcServer, ListenTcpRuntime, Server, ServerInternal, ServerQuitPolicy}; +use crate::serve::{ArcServer, Server, ServerInternal, ServerQuitPolicy}; pub(crate) struct IntelliProxy { - name: MetricsName, config: IntelliProxyConfig, listen_stats: Arc, ingress_net_filter: Option, @@ -68,7 +67,6 @@ impl IntelliProxy { let socks_server = Arc::new(crate::serve::get_or_insert_default(&config.socks_server)); IntelliProxy { - name: config.name().clone(), config, listen_stats, ingress_net_filter, @@ -222,7 +220,7 @@ impl ServerInternal for IntelliProxy { } fn _start_runtime(&self, server: &ArcServer) -> anyhow::Result<()> { - let runtime = ListenTcpRuntime::new(server, &self.config); + let runtime = ListenTcpRuntime::new(server.clone(), server.get_listen_stats()); runtime.run_all_instances( &self.config.listen, self.config.listen_in_worker, @@ -235,18 +233,41 @@ impl ServerInternal for IntelliProxy { } } -#[async_trait] -impl Server for IntelliProxy { +impl BaseServer for IntelliProxy { #[inline] fn name(&self) -> &MetricsName { - &self.name + self.config.name() + } + + #[inline] + fn server_type(&self) -> &'static str { + self.config.server_type() } #[inline] fn version(&self) -> usize { self.reload_version } +} + +#[async_trait] +impl AcceptTcpServer for IntelliProxy { + async fn run_tcp_task(&self, stream: TcpStream, cc_info: ClientConnectionInfo) { + let client_addr = cc_info.client_addr(); + if self.drop_early(client_addr) { + return; + } + + self.run_task(stream, cc_info).await + } + + fn get_reloaded(&self) -> ArcAcceptTcpServer { + crate::serve::get_or_insert_default(self.config.name()) + } +} +#[async_trait] +impl Server for IntelliProxy { fn escaper(&self) -> &MetricsName { Default::default() } @@ -272,15 +293,6 @@ impl Server for IntelliProxy { &self.quit_policy } - async fn run_tcp_task(&self, stream: TcpStream, cc_info: ClientConnectionInfo) { - let client_addr = cc_info.client_addr(); - if self.drop_early(client_addr) { - return; - } - - self.run_task(stream, cc_info).await - } - async fn run_rustls_task(&self, _stream: TlsStream, _cc_info: ClientConnectionInfo) { } diff --git a/g3proxy/src/serve/mod.rs b/g3proxy/src/serve/mod.rs index a80471b4d..0e1013e2d 100644 --- a/g3proxy/src/serve/mod.rs +++ b/g3proxy/src/serve/mod.rs @@ -23,8 +23,8 @@ use tokio::sync::broadcast; use tokio_openssl::SslStream; use tokio_rustls::server::TlsStream; -use g3_daemon::listen::ListenStats; -use g3_daemon::server::{ClientConnectionInfo, ServerQuitPolicy, ServerReloadCommand}; +use g3_daemon::listen::{AcceptTcpServer, ListenStats}; +use g3_daemon::server::{BaseServer, ClientConnectionInfo, ServerQuitPolicy, ServerReloadCommand}; use g3_types::metrics::MetricsName; use crate::config::server::AnyServerConfig; @@ -36,7 +36,7 @@ mod idle_check; pub(crate) use idle_check::ServerIdleChecker; mod runtime; -use runtime::{ListenQuicRuntime, ListenTcpRuntime}; +use runtime::ListenQuicRuntime; mod dummy_close; mod intelli_proxy; @@ -90,9 +90,7 @@ pub(crate) trait ServerInternal { } #[async_trait] -pub(crate) trait Server: ServerInternal { - fn name(&self) -> &MetricsName; - fn version(&self) -> usize; +pub(crate) trait Server: ServerInternal + BaseServer + AcceptTcpServer { fn escaper(&self) -> &MetricsName; fn user_group(&self) -> &MetricsName; fn auditor(&self) -> &MetricsName; @@ -105,8 +103,6 @@ pub(crate) trait Server: ServerInternal { fn alive_count(&self) -> i32; fn quit_policy(&self) -> &Arc; - async fn run_tcp_task(&self, stream: TcpStream, cc_info: ClientConnectionInfo); - async fn run_rustls_task(&self, stream: TlsStream, cc_info: ClientConnectionInfo); async fn run_openssl_task(&self, stream: SslStream, cc_info: ClientConnectionInfo); diff --git a/g3proxy/src/serve/native_tls_port/mod.rs b/g3proxy/src/serve/native_tls_port/mod.rs index 2230bc8dd..720d46256 100644 --- a/g3proxy/src/serve/native_tls_port/mod.rs +++ b/g3proxy/src/serve/native_tls_port/mod.rs @@ -29,8 +29,8 @@ use tokio::sync::broadcast; use tokio_openssl::SslStream; use tokio_rustls::server::TlsStream; -use g3_daemon::listen::ListenStats; -use g3_daemon::server::{ClientConnectionInfo, ServerReloadCommand}; +use g3_daemon::listen::{AcceptTcpServer, ArcAcceptTcpServer, ListenStats, ListenTcpRuntime}; +use g3_daemon::server::{BaseServer, ClientConnectionInfo, ServerReloadCommand}; use g3_io_ext::haproxy::{ProxyProtocolV1Reader, ProxyProtocolV2Reader}; use g3_types::acl::{AclAction, AclNetworkRule}; use g3_types::metrics::MetricsName; @@ -38,10 +38,9 @@ use g3_types::net::{OpensslServerConfig, ProxyProtocolVersion}; use crate::config::server::native_tls_port::NativeTlsPortConfig; use crate::config::server::{AnyServerConfig, ServerConfig}; -use crate::serve::{ArcServer, ListenTcpRuntime, Server, ServerInternal, ServerQuitPolicy}; +use crate::serve::{ArcServer, Server, ServerInternal, ServerQuitPolicy}; pub(crate) struct NativeTlsPort { - name: MetricsName, config: NativeTlsPortConfig, listen_stats: Arc, tls_server_config: OpensslServerConfig, @@ -77,7 +76,6 @@ impl NativeTlsPort { let next_server = Arc::new(crate::serve::get_or_insert_default(&config.server)); Ok(NativeTlsPort { - name: config.name().clone(), config, listen_stats, tls_server_config, @@ -238,7 +236,7 @@ impl ServerInternal for NativeTlsPort { } fn _start_runtime(&self, server: &ArcServer) -> anyhow::Result<()> { - let runtime = ListenTcpRuntime::new(server, &self.config); + let runtime = ListenTcpRuntime::new(server.clone(), server.get_listen_stats()); runtime.run_all_instances( &self.config.listen, self.config.listen_in_worker, @@ -251,18 +249,41 @@ impl ServerInternal for NativeTlsPort { } } -#[async_trait] -impl Server for NativeTlsPort { +impl BaseServer for NativeTlsPort { #[inline] fn name(&self) -> &MetricsName { - &self.name + self.config.name() + } + + #[inline] + fn server_type(&self) -> &'static str { + self.config.server_type() } #[inline] fn version(&self) -> usize { self.reload_version } +} + +#[async_trait] +impl AcceptTcpServer for NativeTlsPort { + async fn run_tcp_task(&self, stream: TcpStream, cc_info: ClientConnectionInfo) { + let client_addr = cc_info.client_addr(); + if self.drop_early(client_addr) { + return; + } + + self.run_task(stream, cc_info).await + } + + fn get_reloaded(&self) -> ArcAcceptTcpServer { + crate::serve::get_or_insert_default(self.config.name()) + } +} +#[async_trait] +impl Server for NativeTlsPort { fn escaper(&self) -> &MetricsName { Default::default() } @@ -288,15 +309,6 @@ impl Server for NativeTlsPort { &self.quit_policy } - async fn run_tcp_task(&self, stream: TcpStream, cc_info: ClientConnectionInfo) { - let client_addr = cc_info.client_addr(); - if self.drop_early(client_addr) { - return; - } - - self.run_task(stream, cc_info).await - } - async fn run_rustls_task(&self, _stream: TlsStream, _cc_info: ClientConnectionInfo) { } diff --git a/g3proxy/src/serve/plain_quic_port/mod.rs b/g3proxy/src/serve/plain_quic_port/mod.rs index bd3de9da1..c167461de 100644 --- a/g3proxy/src/serve/plain_quic_port/mod.rs +++ b/g3proxy/src/serve/plain_quic_port/mod.rs @@ -26,8 +26,8 @@ use tokio::sync::{broadcast, watch}; use tokio_openssl::SslStream; use tokio_rustls::server::TlsStream; -use g3_daemon::listen::{ListenQuicConf, ListenStats}; -use g3_daemon::server::{ClientConnectionInfo, ServerReloadCommand}; +use g3_daemon::listen::{AcceptTcpServer, ArcAcceptTcpServer, ListenStats, ListenQuicConf}; +use g3_daemon::server::{BaseServer, ClientConnectionInfo, ServerReloadCommand}; use g3_types::acl::AclNetworkRule; use g3_types::metrics::MetricsName; use g3_types::net::UdpListenConfig; @@ -243,18 +243,34 @@ impl ServerInternal for PlainQuicPort { } } -#[async_trait] -impl Server for PlainQuicPort { +impl BaseServer for PlainQuicPort { #[inline] fn name(&self) -> &MetricsName { &self.name } + fn server_type(&self) -> &'static str { + let config = self.config.load(); + config.server_type() + } + #[inline] fn version(&self) -> usize { self.reload_version } +} + +#[async_trait] +impl AcceptTcpServer for PlainQuicPort { + async fn run_tcp_task(&self, _stream: TcpStream, _cc_info: ClientConnectionInfo) {} + + fn get_reloaded(&self) -> ArcAcceptTcpServer { + crate::serve::get_or_insert_default(&self.name) + } +} +#[async_trait] +impl Server for PlainQuicPort { fn escaper(&self) -> &MetricsName { Default::default() } @@ -280,8 +296,6 @@ impl Server for PlainQuicPort { &self.quit_policy } - async fn run_tcp_task(&self, _stream: TcpStream, _cc_info: ClientConnectionInfo) {} - async fn run_rustls_task(&self, _stream: TlsStream, _cc_info: ClientConnectionInfo) { } diff --git a/g3proxy/src/serve/plain_tcp_port/mod.rs b/g3proxy/src/serve/plain_tcp_port/mod.rs index 6e8c81f0f..53c89254f 100644 --- a/g3proxy/src/serve/plain_tcp_port/mod.rs +++ b/g3proxy/src/serve/plain_tcp_port/mod.rs @@ -26,8 +26,8 @@ use tokio::sync::broadcast; use tokio_openssl::SslStream; use tokio_rustls::server::TlsStream; -use g3_daemon::listen::ListenStats; -use g3_daemon::server::{ClientConnectionInfo, ServerReloadCommand}; +use g3_daemon::listen::{AcceptTcpServer, ArcAcceptTcpServer, ListenStats, ListenTcpRuntime}; +use g3_daemon::server::{BaseServer, ClientConnectionInfo, ServerReloadCommand}; use g3_io_ext::haproxy::{ProxyProtocolV1Reader, ProxyProtocolV2Reader}; use g3_types::acl::{AclAction, AclNetworkRule}; use g3_types::metrics::MetricsName; @@ -35,10 +35,9 @@ use g3_types::net::ProxyProtocolVersion; use crate::config::server::plain_tcp_port::PlainTcpPortConfig; use crate::config::server::{AnyServerConfig, ServerConfig}; -use crate::serve::{ArcServer, ListenTcpRuntime, Server, ServerInternal, ServerQuitPolicy}; +use crate::serve::{ArcServer, Server, ServerInternal, ServerQuitPolicy}; pub(crate) struct PlainTcpPort { - name: MetricsName, config: PlainTcpPortConfig, listen_stats: Arc, ingress_net_filter: Option, @@ -65,7 +64,6 @@ impl PlainTcpPort { let next_server = Arc::new(crate::serve::get_or_insert_default(&config.server)); Ok(PlainTcpPort { - name: config.name().clone(), config, listen_stats, ingress_net_filter, @@ -188,7 +186,7 @@ impl ServerInternal for PlainTcpPort { } fn _start_runtime(&self, server: &ArcServer) -> anyhow::Result<()> { - let runtime = ListenTcpRuntime::new(server, &self.config); + let runtime = ListenTcpRuntime::new(server.clone(), server.get_listen_stats()); runtime.run_all_instances( &self.config.listen, self.config.listen_in_worker, @@ -201,18 +199,41 @@ impl ServerInternal for PlainTcpPort { } } -#[async_trait] -impl Server for PlainTcpPort { +impl BaseServer for PlainTcpPort { #[inline] fn name(&self) -> &MetricsName { - &self.name + self.config.name() + } + + #[inline] + fn server_type(&self) -> &'static str { + self.config.server_type() } #[inline] fn version(&self) -> usize { self.reload_version } +} + +#[async_trait] +impl AcceptTcpServer for PlainTcpPort { + async fn run_tcp_task(&self, stream: TcpStream, cc_info: ClientConnectionInfo) { + let client_addr = cc_info.client_addr(); + if self.drop_early(client_addr) { + return; + } + + self.run_task(stream, cc_info).await + } + + fn get_reloaded(&self) -> ArcAcceptTcpServer { + crate::serve::get_or_insert_default(self.config.name()) + } +} +#[async_trait] +impl Server for PlainTcpPort { fn escaper(&self) -> &MetricsName { Default::default() } @@ -238,15 +259,6 @@ impl Server for PlainTcpPort { &self.quit_policy } - async fn run_tcp_task(&self, stream: TcpStream, cc_info: ClientConnectionInfo) { - let client_addr = cc_info.client_addr(); - if self.drop_early(client_addr) { - return; - } - - self.run_task(stream, cc_info).await - } - async fn run_rustls_task(&self, _stream: TlsStream, _cc_info: ClientConnectionInfo) { } diff --git a/g3proxy/src/serve/plain_tls_port/mod.rs b/g3proxy/src/serve/plain_tls_port/mod.rs index 9bc918584..f5ca14be1 100644 --- a/g3proxy/src/serve/plain_tls_port/mod.rs +++ b/g3proxy/src/serve/plain_tls_port/mod.rs @@ -28,8 +28,8 @@ use tokio::sync::broadcast; use tokio_openssl::SslStream; use tokio_rustls::{server::TlsStream, TlsAcceptor}; -use g3_daemon::listen::ListenStats; -use g3_daemon::server::{ClientConnectionInfo, ServerReloadCommand}; +use g3_daemon::listen::{AcceptTcpServer, ArcAcceptTcpServer, ListenStats, ListenTcpRuntime}; +use g3_daemon::server::{BaseServer, ClientConnectionInfo, ServerReloadCommand}; use g3_io_ext::haproxy::{ProxyProtocolV1Reader, ProxyProtocolV2Reader}; use g3_types::acl::{AclAction, AclNetworkRule}; use g3_types::metrics::MetricsName; @@ -37,10 +37,9 @@ use g3_types::net::ProxyProtocolVersion; use crate::config::server::plain_tls_port::PlainTlsPortConfig; use crate::config::server::{AnyServerConfig, ServerConfig}; -use crate::serve::{ArcServer, ListenTcpRuntime, Server, ServerInternal, ServerQuitPolicy}; +use crate::serve::{ArcServer, Server, ServerInternal, ServerQuitPolicy}; pub(crate) struct PlainTlsPort { - name: MetricsName, config: PlainTlsPortConfig, listen_stats: Arc, tls_acceptor: TlsAcceptor, @@ -77,7 +76,6 @@ impl PlainTlsPort { let next_server = Arc::new(crate::serve::get_or_insert_default(&config.server)); Ok(PlainTlsPort { - name: config.name().clone(), config, listen_stats, tls_acceptor: TlsAcceptor::from(tls_server_config.driver), @@ -226,7 +224,7 @@ impl ServerInternal for PlainTlsPort { } fn _start_runtime(&self, server: &ArcServer) -> anyhow::Result<()> { - let runtime = ListenTcpRuntime::new(server, &self.config); + let runtime = ListenTcpRuntime::new(server.clone(), server.get_listen_stats()); runtime.run_all_instances( &self.config.listen, self.config.listen_in_worker, @@ -239,18 +237,41 @@ impl ServerInternal for PlainTlsPort { } } -#[async_trait] -impl Server for PlainTlsPort { +impl BaseServer for PlainTlsPort { #[inline] fn name(&self) -> &MetricsName { - &self.name + self.config.name() + } + + #[inline] + fn server_type(&self) -> &'static str { + self.config.server_type() } #[inline] fn version(&self) -> usize { self.reload_version } +} + +#[async_trait] +impl AcceptTcpServer for PlainTlsPort { + async fn run_tcp_task(&self, stream: TcpStream, cc_info: ClientConnectionInfo) { + let client_addr = cc_info.client_addr(); + if self.drop_early(client_addr) { + return; + } + + self.run_task(stream, cc_info).await + } + + fn get_reloaded(&self) -> ArcAcceptTcpServer { + crate::serve::get_or_insert_default(self.config.name()) + } +} +#[async_trait] +impl Server for PlainTlsPort { fn escaper(&self) -> &MetricsName { Default::default() } @@ -276,15 +297,6 @@ impl Server for PlainTlsPort { &self.quit_policy } - async fn run_tcp_task(&self, stream: TcpStream, cc_info: ClientConnectionInfo) { - let client_addr = cc_info.client_addr(); - if self.drop_early(client_addr) { - return; - } - - self.run_task(stream, cc_info).await - } - async fn run_rustls_task(&self, _stream: TlsStream, _cc_info: ClientConnectionInfo) { } diff --git a/g3proxy/src/serve/runtime/listen_tcp.rs b/g3proxy/src/serve/runtime/listen_tcp.rs deleted file mode 100644 index a3401cca3..000000000 --- a/g3proxy/src/serve/runtime/listen_tcp.rs +++ /dev/null @@ -1,253 +0,0 @@ -/* - * Copyright 2023 ByteDance and/or its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -use std::net::SocketAddr; -use std::os::fd::AsRawFd; -use std::sync::Arc; - -use log::{info, warn}; -use tokio::net::TcpStream; -use tokio::runtime::Handle; -use tokio::sync::broadcast; - -use g3_daemon::listen::ListenStats; -use g3_daemon::server::{ClientConnectionInfo, ServerReloadCommand}; -use g3_io_ext::LimitedTcpListener; -use g3_socket::util::native_socket_addr; -use g3_types::net::TcpListenConfig; - -use crate::config::server::ServerConfig; -use crate::serve::ArcServer; - -#[derive(Clone)] -pub(crate) struct ListenTcpRuntime { - server: ArcServer, - server_type: &'static str, - server_version: usize, - worker_id: Option, - listen_stats: Arc, - instance_id: usize, -} - -impl ListenTcpRuntime { - pub(crate) fn new(server: &ArcServer, server_config: &C) -> Self { - ListenTcpRuntime { - server: Arc::clone(server), - server_type: server_config.server_type(), - server_version: server.version(), - worker_id: None, - listen_stats: server.get_listen_stats(), - instance_id: 0, - } - } - - fn pre_start(&self) { - info!( - "started {} SRT[{}_v{}#{}]", - self.server_type, - self.server.name(), - self.server_version, - self.instance_id, - ); - self.listen_stats.add_running_runtime(); - } - - fn pre_stop(&self) { - info!( - "stopping {} SRT[{}_v{}#{}]", - self.server_type, - self.server.name(), - self.server_version, - self.instance_id, - ); - } - - fn post_stop(&self) { - info!( - "stopped {} SRT[{}_v{}#{}]", - self.server_type, - self.server.name(), - self.server_version, - self.instance_id, - ); - self.listen_stats.del_running_runtime(); - } - - async fn run( - mut self, - mut listener: LimitedTcpListener, - mut server_reload_channel: broadcast::Receiver, - ) { - use broadcast::error::RecvError; - - loop { - tokio::select! { - biased; - - ev = server_reload_channel.recv() => { - match ev { - Ok(ServerReloadCommand::ReloadVersion(version)) => { - info!("SRT[{}_v{}#{}] received reload request from v{version}", - self.server.name(), self.server_version, self.instance_id); - match crate::serve::get_server(self.server.name()) { - Ok(server) => { - self.server_version = server.version(); - self.server = server; - continue; - } - Err(_) => { - info!("SRT[{}_v{}#{}] will quit as no server v{version}+ found", - self.server.name(), self.server_version, self.instance_id); - } - } - } - Ok(ServerReloadCommand::QuitRuntime) => {}, - Err(RecvError::Closed) => {}, - Err(RecvError::Lagged(dropped)) => { - warn!("SRT[{}_v{}#{}] server {} reload notify channel overflowed, {dropped} msg dropped", - self.server.name(), self.server_version, self.instance_id, self.server.name()); - continue; - }, - } - - info!("SRT[{}_v{}#{}] will go offline", - self.server.name(), self.server_version, self.instance_id); - self.pre_stop(); - let accept_again = listener.set_offline(); - if accept_again { - info!("SRT[{}_v{}#{}] will accept all pending connections", - self.server.name(), self.server_version, self.instance_id); - continue; - } else { - break; - } - } - result = listener.accept() => { - if listener.accept_current_available(result, |result| { - match result { - Ok(Some((stream, peer_addr, local_addr))) => { - self.listen_stats.add_accepted(); - self.run_task( - stream, - native_socket_addr(peer_addr), - native_socket_addr(local_addr), - ); - Ok(()) - } - Ok(None) => { - info!("SRT[{}_v{}#{}] offline", - self.server.name(), self.server_version, self.instance_id); - Err(()) - } - Err(e) => { - self.listen_stats.add_failed(); - warn!("SRT[{}_v{}#{}] accept: {e:?}", - self.server.name(), self.server_version, self.instance_id); - Ok(()) - } - } - }).await.is_err() { - break; - } - } - } - } - self.post_stop(); - } - - fn run_task(&self, stream: TcpStream, peer_addr: SocketAddr, local_addr: SocketAddr) { - let server = Arc::clone(&self.server); - - let mut cc_info = ClientConnectionInfo::new(peer_addr, local_addr); - cc_info.set_tcp_raw_fd(stream.as_raw_fd()); - if let Some(worker_id) = self.worker_id { - cc_info.set_worker_id(Some(worker_id)); - tokio::spawn(async move { - server.run_tcp_task(stream, cc_info).await; - }); - } else if let Some(rt) = g3_daemon::runtime::worker::select_handle() { - cc_info.set_worker_id(Some(rt.id)); - rt.handle.spawn(async move { - server.run_tcp_task(stream, cc_info).await; - }); - } else { - tokio::spawn(async move { - server.run_tcp_task(stream, cc_info).await; - }); - } - } - - fn get_rt_handle(&mut self, listen_in_worker: bool) -> Handle { - if listen_in_worker { - if let Some(rt) = g3_daemon::runtime::worker::select_listen_handle() { - self.worker_id = Some(rt.id); - return rt.handle; - } - } - Handle::current() - } - - fn into_running( - mut self, - listener: std::net::TcpListener, - listen_in_worker: bool, - server_reload_channel: broadcast::Receiver, - ) { - let handle = self.get_rt_handle(listen_in_worker); - handle.spawn(async move { - // make sure the listen socket associated with the correct reactor - match tokio::net::TcpListener::from_std(listener) { - Ok(listener) => { - self.pre_start(); - self.run(LimitedTcpListener::new(listener), server_reload_channel) - .await; - } - Err(e) => { - warn!( - "SRT[{}_v{}#{}] listen async: {e:?}", - self.server.name(), - self.server_version, - self.instance_id - ); - } - } - }); - } - - pub(crate) fn run_all_instances( - &self, - listen_config: &TcpListenConfig, - listen_in_worker: bool, - server_reload_sender: &broadcast::Sender, - ) -> anyhow::Result<()> { - let mut instance_count = listen_config.instance(); - if listen_in_worker { - let worker_count = g3_daemon::runtime::worker::worker_count(); - if worker_count > 0 { - instance_count = worker_count; - } - } - - for i in 0..instance_count { - let mut runtime = self.clone(); - runtime.instance_id = i; - - let listener = g3_socket::tcp::new_std_listener(listen_config)?; - runtime.into_running(listener, listen_in_worker, server_reload_sender.subscribe()); - } - Ok(()) - } -} diff --git a/g3proxy/src/serve/runtime/mod.rs b/g3proxy/src/serve/runtime/mod.rs index fd7641eba..daf2fac24 100644 --- a/g3proxy/src/serve/runtime/mod.rs +++ b/g3proxy/src/serve/runtime/mod.rs @@ -14,8 +14,5 @@ * limitations under the License. */ -mod listen_tcp; -pub(crate) use listen_tcp::ListenTcpRuntime; - mod listen_quic; pub(crate) use listen_quic::ListenQuicRuntime; diff --git a/g3proxy/src/serve/sni_proxy/server.rs b/g3proxy/src/serve/sni_proxy/server.rs index 4660a92cb..fda18502d 100644 --- a/g3proxy/src/serve/sni_proxy/server.rs +++ b/g3proxy/src/serve/sni_proxy/server.rs @@ -27,8 +27,8 @@ use tokio::sync::broadcast; use tokio_openssl::SslStream; use tokio_rustls::server::TlsStream; -use g3_daemon::listen::ListenStats; -use g3_daemon::server::{ClientConnectionInfo, ServerReloadCommand}; +use g3_daemon::listen::{AcceptTcpServer, ArcAcceptTcpServer, ListenStats, ListenTcpRuntime}; +use g3_daemon::server::{BaseServer, ClientConnectionInfo, ServerReloadCommand}; use g3_dpi::ProtocolPortMap; use g3_types::acl::{AclAction, AclNetworkRule}; use g3_types::metrics::MetricsName; @@ -39,8 +39,7 @@ use crate::config::server::sni_proxy::SniProxyServerConfig; use crate::config::server::{AnyServerConfig, ServerConfig}; use crate::escape::ArcEscaper; use crate::serve::{ - ArcServer, ArcServerStats, ListenTcpRuntime, Server, ServerInternal, ServerQuitPolicy, - ServerStats, + ArcServer, ArcServerStats, Server, ServerInternal, ServerQuitPolicy, ServerStats, }; pub(crate) struct SniProxyServer { @@ -209,7 +208,7 @@ impl ServerInternal for SniProxyServer { let Some(listen_config) = &self.config.listen else { return Ok(()); }; - let runtime = ListenTcpRuntime::new(server, &*self.config); + let runtime = ListenTcpRuntime::new(server.clone(), server.get_listen_stats()); runtime .run_all_instances( listen_config, @@ -225,18 +224,42 @@ impl ServerInternal for SniProxyServer { } } -#[async_trait] -impl Server for SniProxyServer { +impl BaseServer for SniProxyServer { #[inline] fn name(&self) -> &MetricsName { self.config.name() } + #[inline] + fn server_type(&self) -> &'static str { + self.config.server_type() + } + #[inline] fn version(&self) -> usize { self.reload_version } +} + +#[async_trait] +impl AcceptTcpServer for SniProxyServer { + async fn run_tcp_task(&self, stream: TcpStream, cc_info: ClientConnectionInfo) { + let client_addr = cc_info.client_addr(); + self.server_stats.add_conn(client_addr); + if self.drop_early(client_addr) { + return; + } + + self.run_task(stream, cc_info).await + } + + fn get_reloaded(&self) -> ArcAcceptTcpServer { + crate::serve::get_or_insert_default(self.config.name()) + } +} +#[async_trait] +impl Server for SniProxyServer { fn escaper(&self) -> &MetricsName { self.config.escaper() } @@ -266,16 +289,6 @@ impl Server for SniProxyServer { &self.quit_policy } - async fn run_tcp_task(&self, stream: TcpStream, cc_info: ClientConnectionInfo) { - let client_addr = cc_info.client_addr(); - self.server_stats.add_conn(client_addr); - if self.drop_early(client_addr) { - return; - } - - self.run_task(stream, cc_info).await - } - async fn run_rustls_task(&self, _stream: TlsStream, _cc_info: ClientConnectionInfo) { } diff --git a/g3proxy/src/serve/socks_proxy/server.rs b/g3proxy/src/serve/socks_proxy/server.rs index 19e48f5a3..fa69a7077 100644 --- a/g3proxy/src/serve/socks_proxy/server.rs +++ b/g3proxy/src/serve/socks_proxy/server.rs @@ -27,8 +27,8 @@ use tokio::sync::broadcast; use tokio_openssl::SslStream; use tokio_rustls::server::TlsStream; -use g3_daemon::listen::ListenStats; -use g3_daemon::server::{ClientConnectionInfo, ServerReloadCommand}; +use g3_daemon::listen::{AcceptTcpServer, ArcAcceptTcpServer, ListenStats, ListenTcpRuntime}; +use g3_daemon::server::{BaseServer, ClientConnectionInfo, ServerReloadCommand}; use g3_types::acl::{AclAction, AclNetworkRule}; use g3_types::acl_set::AclDstHostRuleSet; use g3_types::metrics::MetricsName; @@ -41,8 +41,7 @@ use crate::config::server::socks_proxy::SocksProxyServerConfig; use crate::config::server::{AnyServerConfig, ServerConfig}; use crate::escape::ArcEscaper; use crate::serve::{ - ArcServer, ArcServerStats, ListenTcpRuntime, Server, ServerInternal, ServerQuitPolicy, - ServerStats, + ArcServer, ArcServerStats, Server, ServerInternal, ServerQuitPolicy, ServerStats, }; pub(crate) struct SocksProxyServer { @@ -218,7 +217,7 @@ impl ServerInternal for SocksProxyServer { let Some(listen_config) = &self.config.listen else { return Ok(()); }; - let runtime = ListenTcpRuntime::new(server, &*self.config); + let runtime = ListenTcpRuntime::new(server.clone(), server.get_listen_stats()); runtime .run_all_instances( listen_config, @@ -234,18 +233,42 @@ impl ServerInternal for SocksProxyServer { } } -#[async_trait] -impl Server for SocksProxyServer { +impl BaseServer for SocksProxyServer { #[inline] fn name(&self) -> &MetricsName { self.config.name() } + #[inline] + fn server_type(&self) -> &'static str { + self.config.server_type() + } + #[inline] fn version(&self) -> usize { self.reload_version } +} + +#[async_trait] +impl AcceptTcpServer for SocksProxyServer { + async fn run_tcp_task(&self, stream: TcpStream, cc_info: ClientConnectionInfo) { + let client_addr = cc_info.client_addr(); + self.server_stats.add_conn(client_addr); + if self.drop_early(client_addr) { + return; + } + + self.run_task(stream, cc_info).await + } + + fn get_reloaded(&self) -> ArcAcceptTcpServer { + crate::serve::get_or_insert_default(self.config.name()) + } +} +#[async_trait] +impl Server for SocksProxyServer { fn escaper(&self) -> &MetricsName { self.config.escaper() } @@ -275,16 +298,6 @@ impl Server for SocksProxyServer { &self.quit_policy } - async fn run_tcp_task(&self, stream: TcpStream, cc_info: ClientConnectionInfo) { - let client_addr = cc_info.client_addr(); - self.server_stats.add_conn(client_addr); - if self.drop_early(client_addr) { - return; - } - - self.run_task(stream, cc_info).await - } - async fn run_rustls_task(&self, _stream: TlsStream, cc_info: ClientConnectionInfo) { self.server_stats.add_conn(cc_info.client_addr()); self.listen_stats.add_dropped(); diff --git a/g3proxy/src/serve/tcp_stream/server.rs b/g3proxy/src/serve/tcp_stream/server.rs index e989af3ea..531d45b08 100644 --- a/g3proxy/src/serve/tcp_stream/server.rs +++ b/g3proxy/src/serve/tcp_stream/server.rs @@ -29,8 +29,8 @@ use tokio::sync::broadcast; use tokio_openssl::SslStream; use tokio_rustls::server::TlsStream; -use g3_daemon::listen::ListenStats; -use g3_daemon::server::{ClientConnectionInfo, ServerReloadCommand}; +use g3_daemon::listen::{AcceptTcpServer, ArcAcceptTcpServer, ListenStats, ListenTcpRuntime}; +use g3_daemon::server::{BaseServer, ClientConnectionInfo, ServerReloadCommand}; use g3_types::acl::{AclAction, AclNetworkRule}; use g3_types::collection::{SelectivePickPolicy, SelectiveVec, SelectiveVecBuilder}; use g3_types::metrics::MetricsName; @@ -44,8 +44,7 @@ use crate::config::server::tcp_stream::TcpStreamServerConfig; use crate::config::server::{AnyServerConfig, ServerConfig}; use crate::escape::ArcEscaper; use crate::serve::{ - ArcServer, ArcServerStats, ListenTcpRuntime, Server, ServerInternal, ServerQuitPolicy, - ServerStats, + ArcServer, ArcServerStats, Server, ServerInternal, ServerQuitPolicy, ServerStats, }; pub(crate) struct TcpStreamServer { @@ -283,7 +282,7 @@ impl ServerInternal for TcpStreamServer { let Some(listen_config) = &self.config.listen else { return Ok(()); }; - let runtime = ListenTcpRuntime::new(server, &*self.config); + let runtime = ListenTcpRuntime::new(server.clone(), server.get_listen_stats()); runtime .run_all_instances( listen_config, @@ -299,18 +298,42 @@ impl ServerInternal for TcpStreamServer { } } -#[async_trait] -impl Server for TcpStreamServer { +impl BaseServer for TcpStreamServer { #[inline] fn name(&self) -> &MetricsName { self.config.name() } + #[inline] + fn server_type(&self) -> &'static str { + self.config.server_type() + } + #[inline] fn version(&self) -> usize { self.reload_version } +} + +#[async_trait] +impl AcceptTcpServer for TcpStreamServer { + async fn run_tcp_task(&self, stream: TcpStream, cc_info: ClientConnectionInfo) { + let client_addr = cc_info.client_addr(); + self.server_stats.add_conn(client_addr); + if self.drop_early(client_addr) { + return; + } + + self.run_task_with_tcp(stream, cc_info).await + } + + fn get_reloaded(&self) -> ArcAcceptTcpServer { + crate::serve::get_or_insert_default(self.config.name()) + } +} +#[async_trait] +impl Server for TcpStreamServer { fn escaper(&self) -> &MetricsName { self.config.escaper() } @@ -340,16 +363,6 @@ impl Server for TcpStreamServer { &self.quit_policy } - async fn run_tcp_task(&self, stream: TcpStream, cc_info: ClientConnectionInfo) { - let client_addr = cc_info.client_addr(); - self.server_stats.add_conn(client_addr); - if self.drop_early(client_addr) { - return; - } - - self.run_task_with_tcp(stream, cc_info).await - } - async fn run_rustls_task(&self, stream: TlsStream, cc_info: ClientConnectionInfo) { let client_addr = cc_info.client_addr(); self.server_stats.add_conn(client_addr); diff --git a/g3proxy/src/serve/tls_stream/server.rs b/g3proxy/src/serve/tls_stream/server.rs index 612093653..a12e96f17 100644 --- a/g3proxy/src/serve/tls_stream/server.rs +++ b/g3proxy/src/serve/tls_stream/server.rs @@ -29,8 +29,8 @@ use tokio::sync::broadcast; use tokio_openssl::SslStream; use tokio_rustls::{server::TlsStream, TlsAcceptor}; -use g3_daemon::listen::ListenStats; -use g3_daemon::server::{ClientConnectionInfo, ServerReloadCommand}; +use g3_daemon::listen::{AcceptTcpServer, ArcAcceptTcpServer, ListenStats, ListenTcpRuntime}; +use g3_daemon::server::{BaseServer, ClientConnectionInfo, ServerReloadCommand}; use g3_types::acl::{AclAction, AclNetworkRule}; use g3_types::collection::{SelectivePickPolicy, SelectiveVec, SelectiveVecBuilder}; use g3_types::metrics::MetricsName; @@ -44,8 +44,7 @@ use crate::config::server::{AnyServerConfig, ServerConfig}; use crate::escape::ArcEscaper; use crate::serve::tcp_stream::TcpStreamServerStats; use crate::serve::{ - ArcServer, ArcServerStats, ListenTcpRuntime, Server, ServerInternal, ServerQuitPolicy, - ServerStats, + ArcServer, ArcServerStats, Server, ServerInternal, ServerQuitPolicy, ServerStats, }; pub(crate) struct TlsStreamServer { @@ -260,7 +259,7 @@ impl ServerInternal for TlsStreamServer { let Some(listen_config) = &self.config.listen else { return Ok(()); }; - let runtime = ListenTcpRuntime::new(server, &*self.config); + let runtime = ListenTcpRuntime::new(server.clone(), server.get_listen_stats()); runtime .run_all_instances( listen_config, @@ -276,47 +275,25 @@ impl ServerInternal for TlsStreamServer { } } -#[async_trait] -impl Server for TlsStreamServer { +impl BaseServer for TlsStreamServer { #[inline] fn name(&self) -> &MetricsName { self.config.name() } #[inline] - fn version(&self) -> usize { - self.reload_version - } - - fn escaper(&self) -> &MetricsName { - self.config.escaper() - } - - fn user_group(&self) -> &MetricsName { - Default::default() - } - - fn auditor(&self) -> &MetricsName { - self.config.auditor() - } - - fn get_server_stats(&self) -> Option { - Some(Arc::clone(&self.server_stats) as _) - } - - fn get_listen_stats(&self) -> Arc { - Arc::clone(&self.listen_stats) - } - - fn alive_count(&self) -> i32 { - self.server_stats.get_alive_count() + fn server_type(&self) -> &'static str { + self.config.server_type() } #[inline] - fn quit_policy(&self) -> &Arc { - &self.quit_policy + fn version(&self) -> usize { + self.reload_version } +} +#[async_trait] +impl AcceptTcpServer for TlsStreamServer { async fn run_tcp_task(&self, stream: TcpStream, cc_info: ClientConnectionInfo) { let client_addr = cc_info.client_addr(); self.server_stats.add_conn(client_addr); @@ -348,6 +325,42 @@ impl Server for TlsStreamServer { } } + fn get_reloaded(&self) -> ArcAcceptTcpServer { + crate::serve::get_or_insert_default(self.config.name()) + } +} + +#[async_trait] +impl Server for TlsStreamServer { + fn escaper(&self) -> &MetricsName { + self.config.escaper() + } + + fn user_group(&self) -> &MetricsName { + Default::default() + } + + fn auditor(&self) -> &MetricsName { + self.config.auditor() + } + + fn get_server_stats(&self) -> Option { + Some(Arc::clone(&self.server_stats) as _) + } + + fn get_listen_stats(&self) -> Arc { + Arc::clone(&self.listen_stats) + } + + fn alive_count(&self) -> i32 { + self.server_stats.get_alive_count() + } + + #[inline] + fn quit_policy(&self) -> &Arc { + &self.quit_policy + } + async fn run_rustls_task(&self, _stream: TlsStream, _cc_info: ClientConnectionInfo) { } diff --git a/lib/g3-daemon/src/listen/tcp.rs b/lib/g3-daemon/src/listen/tcp.rs index 8bbc5d603..c42827f56 100644 --- a/lib/g3-daemon/src/listen/tcp.rs +++ b/lib/g3-daemon/src/listen/tcp.rs @@ -50,11 +50,13 @@ pub struct ListenTcpRuntime { } impl ListenTcpRuntime { - pub fn new(server: &ArcAcceptTcpServer, listen_stats: Arc) -> Self { + pub fn new(server: ArcAcceptTcpServer, listen_stats: Arc) -> Self { + let server_type = server.server_type(); + let server_version = server.version(); ListenTcpRuntime { - server: Arc::clone(server), - server_type: server.server_type(), - server_version: server.version(), + server, + server_type, + server_version, worker_id: None, listen_stats, instance_id: 0, From 8ebd664a2bc4aa1f98c07521cf02cb7a73856bd4 Mon Sep 17 00:00:00 2001 From: Zhang Jingqiang Date: Mon, 20 Nov 2023 14:35:54 +0800 Subject: [PATCH 6/8] g3tiles: use common listen tcp code --- g3tiles/src/lib.rs | 2 + g3tiles/src/serve/dummy_close/server.rs | 25 ++- g3tiles/src/serve/mod.rs | 15 +- g3tiles/src/serve/openssl_proxy/server.rs | 49 ++-- g3tiles/src/serve/plain_tcp_port/mod.rs | 44 ++-- g3tiles/src/serve/runtime/listen_tcp.rs | 259 ---------------------- g3tiles/src/serve/runtime/mod.rs | 18 -- g3tiles/src/serve/rustls_proxy/server.rs | 49 ++-- 8 files changed, 117 insertions(+), 344 deletions(-) delete mode 100644 g3tiles/src/serve/runtime/listen_tcp.rs delete mode 100644 g3tiles/src/serve/runtime/mod.rs diff --git a/g3tiles/src/lib.rs b/g3tiles/src/lib.rs index 44fb19221..116fc9272 100644 --- a/g3tiles/src/lib.rs +++ b/g3tiles/src/lib.rs @@ -14,6 +14,8 @@ * limitations under the License. */ +#![feature(trait_upcasting)] + pub mod config; pub mod control; pub mod opts; diff --git a/g3tiles/src/serve/dummy_close/server.rs b/g3tiles/src/serve/dummy_close/server.rs index 84e691dd2..2e2720eb4 100644 --- a/g3tiles/src/serve/dummy_close/server.rs +++ b/g3tiles/src/serve/dummy_close/server.rs @@ -21,8 +21,8 @@ use async_trait::async_trait; use tokio::net::TcpStream; use tokio::sync::broadcast; -use g3_daemon::listen::ListenStats; -use g3_daemon::server::{ClientConnectionInfo, ServerReloadCommand}; +use g3_daemon::listen::{AcceptTcpServer, ArcAcceptTcpServer, ListenStats}; +use g3_daemon::server::{BaseServer, ClientConnectionInfo, ServerReloadCommand}; use g3_types::metrics::MetricsName; use crate::config::server::dummy_close::DummyCloseServerConfig; @@ -117,16 +117,31 @@ impl ServerInternal for DummyCloseServer { } } -#[async_trait] -impl Server for DummyCloseServer { +impl BaseServer for DummyCloseServer { fn name(&self) -> &MetricsName { self.config.name() } + fn server_type(&self) -> &'static str { + self.config.server_type() + } + fn version(&self) -> usize { 0 } +} + +#[async_trait] +impl AcceptTcpServer for DummyCloseServer { + async fn run_tcp_task(&self, _stream: TcpStream, _cc_info: ClientConnectionInfo) {} + fn get_reloaded(&self) -> ArcAcceptTcpServer { + crate::serve::get_or_insert_default(self.config.name()) + } +} + +#[async_trait] +impl Server for DummyCloseServer { fn get_listen_stats(&self) -> Arc { Arc::clone(&self.listen_stats) } @@ -139,6 +154,4 @@ impl Server for DummyCloseServer { fn quit_policy(&self) -> &Arc { &self.quit_policy } - - async fn run_tcp_task(&self, _stream: TcpStream, _cc_info: ClientConnectionInfo) {} } diff --git a/g3tiles/src/serve/mod.rs b/g3tiles/src/serve/mod.rs index 76739c4b9..8c94bc3a5 100644 --- a/g3tiles/src/serve/mod.rs +++ b/g3tiles/src/serve/mod.rs @@ -17,11 +17,10 @@ use std::sync::Arc; use async_trait::async_trait; -use tokio::net::TcpStream; use tokio::sync::broadcast; -use g3_daemon::listen::ListenStats; -use g3_daemon::server::{ClientConnectionInfo, ServerQuitPolicy, ServerReloadCommand}; +use g3_daemon::listen::{AcceptTcpServer, ListenStats}; +use g3_daemon::server::{BaseServer, ServerQuitPolicy, ServerReloadCommand}; use g3_types::metrics::MetricsName; use crate::config::server::AnyServerConfig; @@ -32,9 +31,6 @@ pub(crate) use registry::{foreach_online as foreach_server, get_names, get_or_in mod error; pub(crate) use error::{ServerTaskError, ServerTaskResult}; -mod runtime; -use runtime::ListenTcpRuntime; - mod dummy_close; mod plain_tcp_port; @@ -70,10 +66,7 @@ pub(crate) trait ServerInternal { } #[async_trait] -pub(crate) trait Server: ServerInternal { - fn name(&self) -> &MetricsName; - fn version(&self) -> usize; - +pub(crate) trait Server: ServerInternal + BaseServer + AcceptTcpServer { fn get_server_stats(&self) -> Option { None } @@ -81,8 +74,6 @@ pub(crate) trait Server: ServerInternal { fn alive_count(&self) -> i32; fn quit_policy(&self) -> &Arc; - - async fn run_tcp_task(&self, stream: TcpStream, cc_info: ClientConnectionInfo); } pub(crate) type ArcServer = Arc; diff --git a/g3tiles/src/serve/openssl_proxy/server.rs b/g3tiles/src/serve/openssl_proxy/server.rs index 92efbccb8..c853dbfc7 100644 --- a/g3tiles/src/serve/openssl_proxy/server.rs +++ b/g3tiles/src/serve/openssl_proxy/server.rs @@ -27,8 +27,8 @@ use slog::Logger; use tokio::net::TcpStream; use tokio::sync::broadcast; -use g3_daemon::listen::ListenStats; -use g3_daemon::server::{ClientConnectionInfo, ServerReloadCommand}; +use g3_daemon::listen::{AcceptTcpServer, ArcAcceptTcpServer, ListenStats, ListenTcpRuntime}; +use g3_daemon::server::{BaseServer, ClientConnectionInfo, ServerReloadCommand}; use g3_types::acl::{AclAction, AclNetworkRule}; use g3_types::metrics::MetricsName; use g3_types::route::HostMatch; @@ -37,8 +37,7 @@ use super::{CommonTaskContext, OpensslAcceptTask, OpensslHost, OpensslProxyServe use crate::config::server::openssl_proxy::OpensslProxyServerConfig; use crate::config::server::{AnyServerConfig, ServerConfig}; use crate::serve::{ - ArcServer, ArcServerStats, ListenTcpRuntime, Server, ServerInternal, ServerQuitPolicy, - ServerStats, + ArcServer, ArcServerStats, Server, ServerInternal, ServerQuitPolicy, ServerStats, }; pub(crate) struct OpensslProxyServer { @@ -293,7 +292,7 @@ impl ServerInternal for OpensslProxyServer { } fn _start_runtime(&self, server: &ArcServer) -> anyhow::Result<()> { - let runtime = ListenTcpRuntime::new(server, &*self.config); + let runtime = ListenTcpRuntime::new(server.clone(), server.get_listen_stats()); runtime .run_all_instances( &self.config.listen, @@ -309,16 +308,42 @@ impl ServerInternal for OpensslProxyServer { } } -#[async_trait] -impl Server for OpensslProxyServer { +impl BaseServer for OpensslProxyServer { + #[inline] fn name(&self) -> &MetricsName { self.config.name() } + #[inline] + fn server_type(&self) -> &'static str { + self.config.server_type() + } + + #[inline] fn version(&self) -> usize { self.reload_version } +} + +#[async_trait] +impl AcceptTcpServer for OpensslProxyServer { + async fn run_tcp_task(&self, stream: TcpStream, cc_info: ClientConnectionInfo) { + let client_addr = cc_info.client_addr(); + self.server_stats.add_conn(client_addr); + if self.drop_early(client_addr) { + return; + } + + self.run_task(stream, cc_info).await + } + + fn get_reloaded(&self) -> ArcAcceptTcpServer { + crate::serve::get_or_insert_default(self.config.name()) + } +} +#[async_trait] +impl Server for OpensslProxyServer { fn get_server_stats(&self) -> Option { Some(Arc::clone(&self.server_stats) as _) } @@ -335,14 +360,4 @@ impl Server for OpensslProxyServer { fn quit_policy(&self) -> &Arc { &self.quit_policy } - - async fn run_tcp_task(&self, stream: TcpStream, cc_info: ClientConnectionInfo) { - let client_addr = cc_info.client_addr(); - self.server_stats.add_conn(client_addr); - if self.drop_early(client_addr) { - return; - } - - self.run_task(stream, cc_info).await - } } diff --git a/g3tiles/src/serve/plain_tcp_port/mod.rs b/g3tiles/src/serve/plain_tcp_port/mod.rs index 42bd89057..6f624d29a 100644 --- a/g3tiles/src/serve/plain_tcp_port/mod.rs +++ b/g3tiles/src/serve/plain_tcp_port/mod.rs @@ -23,8 +23,8 @@ use async_trait::async_trait; use tokio::net::TcpStream; use tokio::sync::broadcast; -use g3_daemon::listen::ListenStats; -use g3_daemon::server::{ClientConnectionInfo, ServerReloadCommand}; +use g3_daemon::listen::{AcceptTcpServer, ArcAcceptTcpServer, ListenStats, ListenTcpRuntime}; +use g3_daemon::server::{BaseServer, ClientConnectionInfo, ServerReloadCommand}; use g3_io_ext::haproxy::{ProxyProtocolV1Reader, ProxyProtocolV2Reader}; use g3_types::acl::{AclAction, AclNetworkRule}; use g3_types::metrics::MetricsName; @@ -32,7 +32,7 @@ use g3_types::net::ProxyProtocolVersion; use crate::config::server::plain_tcp_port::PlainTcpPortConfig; use crate::config::server::{AnyServerConfig, ServerConfig}; -use crate::serve::{ArcServer, ListenTcpRuntime, Server, ServerInternal, ServerQuitPolicy}; +use crate::serve::{ArcServer, Server, ServerInternal, ServerQuitPolicy}; pub(crate) struct PlainTcpPort { config: PlainTcpPortConfig, @@ -177,7 +177,7 @@ impl ServerInternal for PlainTcpPort { } fn _start_runtime(&self, server: &ArcServer) -> anyhow::Result<()> { - let runtime = ListenTcpRuntime::new(server, &self.config); + let runtime = ListenTcpRuntime::new(server.clone(), server.get_listen_stats()); runtime.run_all_instances( &self.config.listen, self.config.listen_in_worker, @@ -190,18 +190,41 @@ impl ServerInternal for PlainTcpPort { } } -#[async_trait] -impl Server for PlainTcpPort { +impl BaseServer for PlainTcpPort { #[inline] fn name(&self) -> &MetricsName { self.config.name() } + #[inline] + fn server_type(&self) -> &'static str { + self.config.server_type() + } + #[inline] fn version(&self) -> usize { self.reload_version } +} + +#[async_trait] +impl AcceptTcpServer for PlainTcpPort { + async fn run_tcp_task(&self, stream: TcpStream, cc_info: ClientConnectionInfo) { + let client_addr = cc_info.client_addr(); + if self.drop_early(client_addr) { + return; + } + + self.run_task(stream, cc_info).await + } + + fn get_reloaded(&self) -> ArcAcceptTcpServer { + crate::serve::get_or_insert_default(self.config.name()) + } +} +#[async_trait] +impl Server for PlainTcpPort { fn get_listen_stats(&self) -> Arc { Arc::clone(&self.listen_stats) } @@ -214,13 +237,4 @@ impl Server for PlainTcpPort { fn quit_policy(&self) -> &Arc { &self.quit_policy } - - async fn run_tcp_task(&self, stream: TcpStream, cc_info: ClientConnectionInfo) { - let client_addr = cc_info.client_addr(); - if self.drop_early(client_addr) { - return; - } - - self.run_task(stream, cc_info).await - } } diff --git a/g3tiles/src/serve/runtime/listen_tcp.rs b/g3tiles/src/serve/runtime/listen_tcp.rs deleted file mode 100644 index 7c67849d1..000000000 --- a/g3tiles/src/serve/runtime/listen_tcp.rs +++ /dev/null @@ -1,259 +0,0 @@ -/* - * Copyright 2023 ByteDance and/or its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -use std::net::SocketAddr; -use std::os::fd::AsRawFd; -use std::sync::Arc; - -use log::{info, warn}; -use tokio::net::TcpStream; -use tokio::runtime::Handle; -use tokio::sync::broadcast; - -use g3_daemon::listen::ListenStats; -use g3_daemon::server::{ClientConnectionInfo, ServerReloadCommand}; -use g3_io_ext::LimitedTcpListener; -use g3_socket::util::native_socket_addr; -use g3_types::net::TcpListenConfig; - -use crate::config::server::ServerConfig; -use crate::serve::ArcServer; - -#[derive(Clone)] -pub(crate) struct ListenTcpRuntime { - server: ArcServer, - server_type: &'static str, - server_version: usize, - worker_id: Option, - listen_stats: Arc, - instance_id: usize, -} - -impl ListenTcpRuntime { - pub(crate) fn new(server: &ArcServer, server_config: &C) -> Self { - ListenTcpRuntime { - server: Arc::clone(server), - server_type: server_config.server_type(), - server_version: server.version(), - worker_id: None, - listen_stats: server.get_listen_stats(), - instance_id: 0, - } - } - - fn pre_start(&self) { - info!( - "started {} SRT[{}_v{}#{}]", - self.server_type, - self.server.name(), - self.server_version, - self.instance_id, - ); - self.listen_stats.add_running_runtime(); - } - - fn pre_stop(&self) { - info!( - "stopping {} SRT[{}_v{}#{}]", - self.server_type, - self.server.name(), - self.server_version, - self.instance_id, - ); - } - - fn post_stop(&self) { - info!( - "stopped {} SRT[{}_v{}#{}]", - self.server_type, - self.server.name(), - self.server_version, - self.instance_id, - ); - self.listen_stats.del_running_runtime(); - } - - async fn run( - mut self, - mut listener: LimitedTcpListener, - mut server_reload_channel: broadcast::Receiver, - ) { - use broadcast::error::RecvError; - - loop { - tokio::select! { - biased; - - ev = server_reload_channel.recv() => { - let cmd = match ev { - Ok(ServerReloadCommand::ReloadVersion(version)) => { - info!("SRT[{}_v{}#{}] received reload request from v{version}", - self.server.name(), self.server_version, self.instance_id); - match crate::serve::get_server(self.server.name()) { - Ok(server) => { - self.server_version = server.version(); - self.server = server; - ServerReloadCommand::ReloadVersion(version) - } - Err(_) => { - info!("SRT[{}_v{}#{}] will quit as no server v{version}+ found", - self.server.name(), self.server_version, self.instance_id); - ServerReloadCommand::QuitRuntime - } - } - } - Ok(ServerReloadCommand::QuitRuntime) => ServerReloadCommand::QuitRuntime, - Err(RecvError::Closed) => ServerReloadCommand::QuitRuntime, - Err(RecvError::Lagged(dropped)) => { - warn!("SRT[{}_v{}#{}] server {} reload notify channel overflowed, {dropped} msg dropped", - self.server.name(), self.server_version, self.instance_id, self.server.name()); - continue - }, - }; - match cmd { - ServerReloadCommand::ReloadVersion(_) => { - } - ServerReloadCommand::QuitRuntime => { - info!("SRT[{}_v{}#{}] will go offline", - self.server.name(), self.server_version, self.instance_id); - self.pre_stop(); - let accept_again = listener.set_offline(); - if accept_again { - info!("SRT[{}_v{}#{}] will accept all pending connections", - self.server.name(), self.server_version, self.instance_id); - continue; - } else { - break; - } - } - } - } - result = listener.accept() => { - if listener.accept_current_available(result, |result| { - match result { - Ok(Some((stream, peer_addr, local_addr))) => { - self.listen_stats.add_accepted(); - self.run_task( - stream, - native_socket_addr(peer_addr), - native_socket_addr(local_addr), - ); - Ok(()) - } - Ok(None) => { - info!("SRT[{}_v{}#{}] offline", - self.server.name(), self.server_version, self.instance_id); - Err(()) - } - Err(e) => { - self.listen_stats.add_failed(); - warn!("SRT[{}_v{}#{}] accept: {e:?}", - self.server.name(), self.server_version, self.instance_id); - Ok(()) - } - } - }).await.is_err() { - break; - } - } - } - } - self.post_stop(); - } - - fn run_task(&self, stream: TcpStream, peer_addr: SocketAddr, local_addr: SocketAddr) { - let server = Arc::clone(&self.server); - - let mut cc_info = ClientConnectionInfo::new(peer_addr, local_addr); - cc_info.set_tcp_raw_fd(stream.as_raw_fd()); - if let Some(worker_id) = self.worker_id { - cc_info.set_worker_id(Some(worker_id)); - tokio::spawn(async move { - server.run_tcp_task(stream, cc_info).await; - }); - } else if let Some(rt) = g3_daemon::runtime::worker::select_handle() { - cc_info.set_worker_id(Some(rt.id)); - rt.handle.spawn(async move { - server.run_tcp_task(stream, cc_info).await; - }); - } else { - tokio::spawn(async move { - server.run_tcp_task(stream, cc_info).await; - }); - } - } - - fn get_rt_handle(&mut self, listen_in_worker: bool) -> Handle { - if listen_in_worker { - if let Some(rt) = g3_daemon::runtime::worker::select_listen_handle() { - self.worker_id = Some(rt.id); - return rt.handle; - } - } - Handle::current() - } - - fn into_running( - mut self, - listener: std::net::TcpListener, - listen_in_worker: bool, - server_reload_channel: broadcast::Receiver, - ) { - let handle = self.get_rt_handle(listen_in_worker); - handle.spawn(async move { - // make sure the listen socket associated with the correct reactor - match tokio::net::TcpListener::from_std(listener) { - Ok(listener) => { - self.pre_start(); - self.run(LimitedTcpListener::new(listener), server_reload_channel) - .await; - } - Err(e) => { - warn!( - "SRT[{}_v{}#{}] listen async: {e:?}", - self.server.name(), - self.server_version, - self.instance_id - ); - } - } - }); - } - - pub(crate) fn run_all_instances( - &self, - listen_config: &TcpListenConfig, - listen_in_worker: bool, - server_reload_sender: &broadcast::Sender, - ) -> anyhow::Result<()> { - let mut instance_count = listen_config.instance(); - if listen_in_worker { - let worker_count = g3_daemon::runtime::worker::worker_count(); - if worker_count > 0 { - instance_count = worker_count; - } - } - - for i in 0..instance_count { - let mut runtime = self.clone(); - runtime.instance_id = i; - - let listener = g3_socket::tcp::new_std_listener(listen_config)?; - runtime.into_running(listener, listen_in_worker, server_reload_sender.subscribe()); - } - Ok(()) - } -} diff --git a/g3tiles/src/serve/runtime/mod.rs b/g3tiles/src/serve/runtime/mod.rs deleted file mode 100644 index 33779bc2a..000000000 --- a/g3tiles/src/serve/runtime/mod.rs +++ /dev/null @@ -1,18 +0,0 @@ -/* - * Copyright 2023 ByteDance and/or its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -mod listen_tcp; -pub(crate) use listen_tcp::ListenTcpRuntime; diff --git a/g3tiles/src/serve/rustls_proxy/server.rs b/g3tiles/src/serve/rustls_proxy/server.rs index 9c1e15782..b6d333def 100644 --- a/g3tiles/src/serve/rustls_proxy/server.rs +++ b/g3tiles/src/serve/rustls_proxy/server.rs @@ -24,8 +24,8 @@ use slog::Logger; use tokio::net::TcpStream; use tokio::sync::broadcast; -use g3_daemon::listen::ListenStats; -use g3_daemon::server::{ClientConnectionInfo, ServerReloadCommand}; +use g3_daemon::listen::{AcceptTcpServer, ArcAcceptTcpServer, ListenStats, ListenTcpRuntime}; +use g3_daemon::server::{BaseServer, ClientConnectionInfo, ServerReloadCommand}; use g3_types::acl::{AclAction, AclNetworkRule}; use g3_types::metrics::MetricsName; use g3_types::route::HostMatch; @@ -34,8 +34,7 @@ use super::{CommonTaskContext, RustlsAcceptTask, RustlsHost, RustlsProxyServerSt use crate::config::server::rustls_proxy::RustlsProxyServerConfig; use crate::config::server::{AnyServerConfig, ServerConfig}; use crate::serve::{ - ArcServer, ArcServerStats, ListenTcpRuntime, Server, ServerInternal, ServerQuitPolicy, - ServerStats, + ArcServer, ArcServerStats, Server, ServerInternal, ServerQuitPolicy, ServerStats, }; pub(crate) struct RustlsProxyServer { @@ -200,7 +199,7 @@ impl ServerInternal for RustlsProxyServer { } fn _start_runtime(&self, server: &ArcServer) -> anyhow::Result<()> { - let runtime = ListenTcpRuntime::new(server, &*self.config); + let runtime = ListenTcpRuntime::new(server.clone(), server.get_listen_stats()); runtime .run_all_instances( &self.config.listen, @@ -216,16 +215,42 @@ impl ServerInternal for RustlsProxyServer { } } -#[async_trait] -impl Server for RustlsProxyServer { +impl BaseServer for RustlsProxyServer { + #[inline] fn name(&self) -> &MetricsName { self.config.name() } + #[inline] + fn server_type(&self) -> &'static str { + self.config.server_type() + } + + #[inline] fn version(&self) -> usize { self.reload_version } +} + +#[async_trait] +impl AcceptTcpServer for RustlsProxyServer { + async fn run_tcp_task(&self, stream: TcpStream, cc_info: ClientConnectionInfo) { + let client_addr = cc_info.client_addr(); + self.server_stats.add_conn(client_addr); + if self.drop_early(client_addr) { + return; + } + + self.run_task(stream, cc_info).await + } + + fn get_reloaded(&self) -> ArcAcceptTcpServer { + crate::serve::get_or_insert_default(self.config.name()) + } +} +#[async_trait] +impl Server for RustlsProxyServer { fn get_server_stats(&self) -> Option { Some(Arc::clone(&self.server_stats) as _) } @@ -242,14 +267,4 @@ impl Server for RustlsProxyServer { fn quit_policy(&self) -> &Arc { &self.quit_policy } - - async fn run_tcp_task(&self, stream: TcpStream, cc_info: ClientConnectionInfo) { - let client_addr = cc_info.client_addr(); - self.server_stats.add_conn(client_addr); - if self.drop_early(client_addr) { - return; - } - - self.run_task(stream, cc_info).await - } } From da6891f8a7bce21e11c1598fbf9d413a61a5f71b Mon Sep 17 00:00:00 2001 From: Zhang Jingqiang Date: Mon, 20 Nov 2023 14:52:15 +0800 Subject: [PATCH 7/8] update ci config --- .github/workflows/linux.yml | 6 +++--- .github/workflows/macos.yml | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/.github/workflows/linux.yml b/.github/workflows/linux.yml index e7282550d..91ac37a4a 100644 --- a/.github/workflows/linux.yml +++ b/.github/workflows/linux.yml @@ -13,8 +13,8 @@ jobs: matrix: rust: #- stable - - beta - - nightly + #- beta + - nightly steps: - name: Checkout sources uses: actions/checkout@v4 @@ -38,7 +38,7 @@ jobs: - name: Checkout sources uses: actions/checkout@v4 - name: Install rust toolchain - uses: dtolnay/rust-toolchain@beta + uses: dtolnay/rust-toolchain@nightly with: components: clippy - name: Install dependencies diff --git a/.github/workflows/macos.yml b/.github/workflows/macos.yml index fcd6b6582..4b6c761e3 100644 --- a/.github/workflows/macos.yml +++ b/.github/workflows/macos.yml @@ -13,7 +13,7 @@ jobs: matrix: rust: #- stable - - beta + #- beta - nightly steps: - name: Checkout sources @@ -37,7 +37,7 @@ jobs: - name: Checkout sources uses: actions/checkout@v4 - name: Install rust toolchain - uses: dtolnay/rust-toolchain@beta + uses: dtolnay/rust-toolchain@nightly with: components: clippy - name: Install dependencies From a693b009859b438a84c1624d3ea204ee33269935 Mon Sep 17 00:00:00 2001 From: Zhang Jingqiang Date: Mon, 20 Nov 2023 16:05:44 +0800 Subject: [PATCH 8/8] switch to use common quic listen code --- Cargo.lock | 1 + g3proxy/src/serve/dummy_close/server.rs | 15 +- g3proxy/src/serve/http_proxy/server.rs | 62 +-- g3proxy/src/serve/http_rproxy/server.rs | 16 +- g3proxy/src/serve/intelli_proxy/server.rs | 16 +- g3proxy/src/serve/mod.rs | 12 +- g3proxy/src/serve/native_tls_port/mod.rs | 16 +- g3proxy/src/serve/plain_quic_port/mod.rs | 30 +- g3proxy/src/serve/plain_tcp_port/mod.rs | 16 +- g3proxy/src/serve/plain_tls_port/mod.rs | 16 +- g3proxy/src/serve/runtime/listen_quic.rs | 463 ---------------------- g3proxy/src/serve/runtime/mod.rs | 18 - g3proxy/src/serve/sni_proxy/server.rs | 16 +- g3proxy/src/serve/socks_proxy/server.rs | 16 +- g3proxy/src/serve/tcp_stream/server.rs | 62 +-- g3proxy/src/serve/tls_stream/server.rs | 16 +- g3tiles/Cargo.toml | 3 +- g3tiles/src/serve/dummy_close/server.rs | 14 +- g3tiles/src/serve/mod.rs | 6 +- g3tiles/src/serve/openssl_proxy/server.rs | 15 +- g3tiles/src/serve/plain_tcp_port/mod.rs | 15 +- g3tiles/src/serve/rustls_proxy/server.rs | 15 +- 22 files changed, 276 insertions(+), 583 deletions(-) delete mode 100644 g3proxy/src/serve/runtime/listen_quic.rs delete mode 100644 g3proxy/src/serve/runtime/mod.rs diff --git a/Cargo.lock b/Cargo.lock index 3567e05ed..1e2d91435 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1672,6 +1672,7 @@ dependencies = [ "log", "once_cell", "openssl-probe", + "quinn", "rand", "rustc_version", "rustls", diff --git a/g3proxy/src/serve/dummy_close/server.rs b/g3proxy/src/serve/dummy_close/server.rs index 2e51262c7..e00368e6b 100644 --- a/g3proxy/src/serve/dummy_close/server.rs +++ b/g3proxy/src/serve/dummy_close/server.rs @@ -24,7 +24,9 @@ use tokio::sync::broadcast; use tokio_openssl::SslStream; use tokio_rustls::server::TlsStream; -use g3_daemon::listen::{AcceptTcpServer, ArcAcceptTcpServer, ListenStats}; +use g3_daemon::listen::{ + AcceptQuicServer, AcceptTcpServer, ArcAcceptQuicServer, ArcAcceptTcpServer, ListenStats, +}; use g3_daemon::server::{BaseServer, ClientConnectionInfo, ServerReloadCommand}; use g3_types::metrics::MetricsName; @@ -151,6 +153,15 @@ impl AcceptTcpServer for DummyCloseServer { } } +#[async_trait] +impl AcceptQuicServer for DummyCloseServer { + async fn run_quic_task(&self, _connection: Connection, _cc_info: ClientConnectionInfo) {} + + fn get_reloaded(&self) -> ArcAcceptQuicServer { + crate::serve::get_or_insert_default(self.config.name()) + } +} + #[async_trait] impl Server for DummyCloseServer { fn escaper(&self) -> &MetricsName { @@ -187,6 +198,4 @@ impl Server for DummyCloseServer { _cc_info: ClientConnectionInfo, ) { } - - async fn run_quic_task(&self, _connection: Connection, _cc_info: ClientConnectionInfo) {} } diff --git a/g3proxy/src/serve/http_proxy/server.rs b/g3proxy/src/serve/http_proxy/server.rs index 9adf69cd9..7f43c436c 100644 --- a/g3proxy/src/serve/http_proxy/server.rs +++ b/g3proxy/src/serve/http_proxy/server.rs @@ -30,7 +30,10 @@ use tokio::sync::{broadcast, mpsc}; use tokio_openssl::SslStream; use tokio_rustls::{server::TlsStream, TlsAcceptor}; -use g3_daemon::listen::{AcceptTcpServer, ArcAcceptTcpServer, ListenStats, ListenTcpRuntime}; +use g3_daemon::listen::{ + AcceptQuicServer, AcceptTcpServer, ArcAcceptQuicServer, ArcAcceptTcpServer, ListenStats, + ListenTcpRuntime, +}; use g3_daemon::server::{BaseServer, ClientConnectionInfo, ServerReloadCommand}; use g3_types::acl::{AclAction, AclNetworkRule}; use g3_types::acl_set::AclDstHostRuleSet; @@ -385,6 +388,38 @@ impl AcceptTcpServer for HttpProxyServer { } } +#[async_trait] +impl AcceptQuicServer for HttpProxyServer { + async fn run_quic_task(&self, connection: Connection, cc_info: ClientConnectionInfo) { + let client_addr = cc_info.client_addr(); + self.server_stats.add_conn(client_addr); + if self.drop_early(client_addr) { + return; + } + + loop { + // TODO update ctx and quit gracefully + match connection.accept_bi().await { + Ok((send_stream, recv_stream)) => { + self.spawn_quic_stream_task(send_stream, recv_stream, cc_info.clone()) + } + Err(e) => { + debug!( + "{} - {} quic connection error: {e:?}", + cc_info.sock_local_addr(), + cc_info.sock_peer_addr() + ); + break; + } + } + } + } + + fn get_reloaded(&self) -> ArcAcceptQuicServer { + crate::serve::get_or_insert_default(self.config.name()) + } +} + #[async_trait] impl Server for HttpProxyServer { fn escaper(&self) -> &MetricsName { @@ -435,29 +470,4 @@ impl Server for HttpProxyServer { self.spawn_stream_task(stream, cc_info).await; } - - async fn run_quic_task(&self, connection: Connection, cc_info: ClientConnectionInfo) { - let client_addr = cc_info.client_addr(); - self.server_stats.add_conn(client_addr); - if self.drop_early(client_addr) { - return; - } - - loop { - // TODO update ctx and quit gracefully - match connection.accept_bi().await { - Ok((send_stream, recv_stream)) => { - self.spawn_quic_stream_task(send_stream, recv_stream, cc_info.clone()) - } - Err(e) => { - debug!( - "{} - {} quic connection error: {e:?}", - cc_info.sock_local_addr(), - cc_info.sock_peer_addr() - ); - break; - } - } - } - } } diff --git a/g3proxy/src/serve/http_rproxy/server.rs b/g3proxy/src/serve/http_rproxy/server.rs index 4e235d7c8..774c13266 100644 --- a/g3proxy/src/serve/http_rproxy/server.rs +++ b/g3proxy/src/serve/http_rproxy/server.rs @@ -31,7 +31,10 @@ use tokio_openssl::SslStream; use tokio_rustls::server::TlsStream; use tokio_rustls::LazyConfigAcceptor; -use g3_daemon::listen::{AcceptTcpServer, ArcAcceptTcpServer, ListenStats, ListenTcpRuntime}; +use g3_daemon::listen::{ + AcceptQuicServer, AcceptTcpServer, ArcAcceptQuicServer, ArcAcceptTcpServer, ListenStats, + ListenTcpRuntime, +}; use g3_daemon::server::{BaseServer, ClientConnectionInfo, ServerReloadCommand}; use g3_types::acl::{AclAction, AclNetworkRule}; use g3_types::metrics::MetricsName; @@ -402,6 +405,15 @@ impl AcceptTcpServer for HttpRProxyServer { } } +#[async_trait] +impl AcceptQuicServer for HttpRProxyServer { + async fn run_quic_task(&self, _connection: Connection, _cc_info: ClientConnectionInfo) {} + + fn get_reloaded(&self) -> ArcAcceptQuicServer { + crate::serve::get_or_insert_default(self.config.name()) + } +} + #[async_trait] impl Server for HttpRProxyServer { fn escaper(&self) -> &MetricsName { @@ -452,6 +464,4 @@ impl Server for HttpRProxyServer { self.spawn_stream_task(stream, cc_info).await; } - - async fn run_quic_task(&self, _connection: Connection, _cc_info: ClientConnectionInfo) {} } diff --git a/g3proxy/src/serve/intelli_proxy/server.rs b/g3proxy/src/serve/intelli_proxy/server.rs index 48fbe3d1d..d2f6f6b83 100644 --- a/g3proxy/src/serve/intelli_proxy/server.rs +++ b/g3proxy/src/serve/intelli_proxy/server.rs @@ -26,7 +26,10 @@ use tokio::sync::broadcast; use tokio_openssl::SslStream; use tokio_rustls::server::TlsStream; -use g3_daemon::listen::{AcceptTcpServer, ArcAcceptTcpServer, ListenStats, ListenTcpRuntime}; +use g3_daemon::listen::{ + AcceptQuicServer, AcceptTcpServer, ArcAcceptQuicServer, ArcAcceptTcpServer, ListenStats, + ListenTcpRuntime, +}; use g3_daemon::server::{BaseServer, ClientConnectionInfo, ServerReloadCommand}; use g3_io_ext::haproxy::{ProxyProtocolV1Reader, ProxyProtocolV2Reader}; use g3_types::acl::{AclAction, AclNetworkRule}; @@ -266,6 +269,15 @@ impl AcceptTcpServer for IntelliProxy { } } +#[async_trait] +impl AcceptQuicServer for IntelliProxy { + async fn run_quic_task(&self, _connection: Connection, _cc_info: ClientConnectionInfo) {} + + fn get_reloaded(&self) -> ArcAcceptQuicServer { + crate::serve::get_or_insert_default(self.config.name()) + } +} + #[async_trait] impl Server for IntelliProxy { fn escaper(&self) -> &MetricsName { @@ -302,6 +314,4 @@ impl Server for IntelliProxy { _cc_info: ClientConnectionInfo, ) { } - - async fn run_quic_task(&self, _connection: Connection, _cc_info: ClientConnectionInfo) {} } diff --git a/g3proxy/src/serve/mod.rs b/g3proxy/src/serve/mod.rs index 0e1013e2d..7ea827d2a 100644 --- a/g3proxy/src/serve/mod.rs +++ b/g3proxy/src/serve/mod.rs @@ -17,13 +17,12 @@ use std::sync::Arc; use async_trait::async_trait; -use quinn::Connection; use tokio::net::TcpStream; use tokio::sync::broadcast; use tokio_openssl::SslStream; use tokio_rustls::server::TlsStream; -use g3_daemon::listen::{AcceptTcpServer, ListenStats}; +use g3_daemon::listen::{AcceptQuicServer, AcceptTcpServer, ListenStats}; use g3_daemon::server::{BaseServer, ClientConnectionInfo, ServerQuitPolicy, ServerReloadCommand}; use g3_types::metrics::MetricsName; @@ -35,9 +34,6 @@ pub(crate) use registry::{foreach_online as foreach_server, get_names, get_or_in mod idle_check; pub(crate) use idle_check::ServerIdleChecker; -mod runtime; -use runtime::ListenQuicRuntime; - mod dummy_close; mod intelli_proxy; mod native_tls_port; @@ -90,7 +86,9 @@ pub(crate) trait ServerInternal { } #[async_trait] -pub(crate) trait Server: ServerInternal + BaseServer + AcceptTcpServer { +pub(crate) trait Server: + ServerInternal + BaseServer + AcceptTcpServer + AcceptQuicServer +{ fn escaper(&self) -> &MetricsName; fn user_group(&self) -> &MetricsName; fn auditor(&self) -> &MetricsName; @@ -106,8 +104,6 @@ pub(crate) trait Server: ServerInternal + BaseServer + AcceptTcpServer { async fn run_rustls_task(&self, stream: TlsStream, cc_info: ClientConnectionInfo); async fn run_openssl_task(&self, stream: SslStream, cc_info: ClientConnectionInfo); - - async fn run_quic_task(&self, connection: Connection, cc_info: ClientConnectionInfo); } pub(crate) type ArcServer = Arc; diff --git a/g3proxy/src/serve/native_tls_port/mod.rs b/g3proxy/src/serve/native_tls_port/mod.rs index 720d46256..7254d6553 100644 --- a/g3proxy/src/serve/native_tls_port/mod.rs +++ b/g3proxy/src/serve/native_tls_port/mod.rs @@ -29,7 +29,10 @@ use tokio::sync::broadcast; use tokio_openssl::SslStream; use tokio_rustls::server::TlsStream; -use g3_daemon::listen::{AcceptTcpServer, ArcAcceptTcpServer, ListenStats, ListenTcpRuntime}; +use g3_daemon::listen::{ + AcceptQuicServer, AcceptTcpServer, ArcAcceptQuicServer, ArcAcceptTcpServer, ListenStats, + ListenTcpRuntime, +}; use g3_daemon::server::{BaseServer, ClientConnectionInfo, ServerReloadCommand}; use g3_io_ext::haproxy::{ProxyProtocolV1Reader, ProxyProtocolV2Reader}; use g3_types::acl::{AclAction, AclNetworkRule}; @@ -282,6 +285,15 @@ impl AcceptTcpServer for NativeTlsPort { } } +#[async_trait] +impl AcceptQuicServer for NativeTlsPort { + async fn run_quic_task(&self, _connection: Connection, _cc_info: ClientConnectionInfo) {} + + fn get_reloaded(&self) -> ArcAcceptQuicServer { + crate::serve::get_or_insert_default(self.config.name()) + } +} + #[async_trait] impl Server for NativeTlsPort { fn escaper(&self) -> &MetricsName { @@ -318,6 +330,4 @@ impl Server for NativeTlsPort { _cc_info: ClientConnectionInfo, ) { } - - async fn run_quic_task(&self, _connection: Connection, _cc_info: ClientConnectionInfo) {} } diff --git a/g3proxy/src/serve/plain_quic_port/mod.rs b/g3proxy/src/serve/plain_quic_port/mod.rs index c167461de..2b50d69ec 100644 --- a/g3proxy/src/serve/plain_quic_port/mod.rs +++ b/g3proxy/src/serve/plain_quic_port/mod.rs @@ -26,7 +26,10 @@ use tokio::sync::{broadcast, watch}; use tokio_openssl::SslStream; use tokio_rustls::server::TlsStream; -use g3_daemon::listen::{AcceptTcpServer, ArcAcceptTcpServer, ListenStats, ListenQuicConf}; +use g3_daemon::listen::{ + AcceptQuicServer, AcceptTcpServer, ArcAcceptQuicServer, ArcAcceptTcpServer, ListenQuicConf, + ListenQuicRuntime, ListenStats, +}; use g3_daemon::server::{BaseServer, ClientConnectionInfo, ServerReloadCommand}; use g3_types::acl::AclNetworkRule; use g3_types::metrics::MetricsName; @@ -34,7 +37,7 @@ use g3_types::net::UdpListenConfig; use crate::config::server::plain_quic_port::{PlainQuicPortConfig, PlainQuicPortUpdateFlags}; use crate::config::server::{AnyServerConfig, ServerConfig}; -use crate::serve::{ArcServer, ListenQuicRuntime, Server, ServerInternal, ServerQuitPolicy}; +use crate::serve::{ArcServer, Server, ServerInternal, ServerQuitPolicy}; #[derive(Clone)] struct PlainQuicPortAuxConfig { @@ -229,7 +232,11 @@ impl ServerInternal for PlainQuicPort { fn _start_runtime(&self, server: &ArcServer) -> anyhow::Result<()> { let config = self.config.load(); - let runtime = ListenQuicRuntime::new(server, config.as_ref(), config.listen.clone()); + let runtime = ListenQuicRuntime::new( + server.clone(), + server.get_listen_stats(), + config.listen.clone(), + ); runtime.run_all_instances( config.listen_in_worker, &self.quinn_config, @@ -269,6 +276,18 @@ impl AcceptTcpServer for PlainQuicPort { } } +#[async_trait] +impl AcceptQuicServer for PlainQuicPort { + async fn run_quic_task(&self, connection: Connection, cc_info: ClientConnectionInfo) { + let next_server = self.next_server.load().as_ref().clone(); + next_server.run_quic_task(connection, cc_info).await + } + + fn get_reloaded(&self) -> ArcAcceptQuicServer { + crate::serve::get_or_insert_default(&self.name) + } +} + #[async_trait] impl Server for PlainQuicPort { fn escaper(&self) -> &MetricsName { @@ -305,9 +324,4 @@ impl Server for PlainQuicPort { _cc_info: ClientConnectionInfo, ) { } - - async fn run_quic_task(&self, connection: Connection, cc_info: ClientConnectionInfo) { - let next_server = self.next_server.load().as_ref().clone(); - next_server.run_quic_task(connection, cc_info).await; - } } diff --git a/g3proxy/src/serve/plain_tcp_port/mod.rs b/g3proxy/src/serve/plain_tcp_port/mod.rs index 53c89254f..f1189f5a5 100644 --- a/g3proxy/src/serve/plain_tcp_port/mod.rs +++ b/g3proxy/src/serve/plain_tcp_port/mod.rs @@ -26,7 +26,10 @@ use tokio::sync::broadcast; use tokio_openssl::SslStream; use tokio_rustls::server::TlsStream; -use g3_daemon::listen::{AcceptTcpServer, ArcAcceptTcpServer, ListenStats, ListenTcpRuntime}; +use g3_daemon::listen::{ + AcceptQuicServer, AcceptTcpServer, ArcAcceptQuicServer, ArcAcceptTcpServer, ListenStats, + ListenTcpRuntime, +}; use g3_daemon::server::{BaseServer, ClientConnectionInfo, ServerReloadCommand}; use g3_io_ext::haproxy::{ProxyProtocolV1Reader, ProxyProtocolV2Reader}; use g3_types::acl::{AclAction, AclNetworkRule}; @@ -232,6 +235,15 @@ impl AcceptTcpServer for PlainTcpPort { } } +#[async_trait] +impl AcceptQuicServer for PlainTcpPort { + async fn run_quic_task(&self, _connection: Connection, _cc_info: ClientConnectionInfo) {} + + fn get_reloaded(&self) -> ArcAcceptQuicServer { + crate::serve::get_or_insert_default(self.config.name()) + } +} + #[async_trait] impl Server for PlainTcpPort { fn escaper(&self) -> &MetricsName { @@ -268,6 +280,4 @@ impl Server for PlainTcpPort { _cc_info: ClientConnectionInfo, ) { } - - async fn run_quic_task(&self, _connection: Connection, _cc_info: ClientConnectionInfo) {} } diff --git a/g3proxy/src/serve/plain_tls_port/mod.rs b/g3proxy/src/serve/plain_tls_port/mod.rs index f5ca14be1..d6af042b8 100644 --- a/g3proxy/src/serve/plain_tls_port/mod.rs +++ b/g3proxy/src/serve/plain_tls_port/mod.rs @@ -28,7 +28,10 @@ use tokio::sync::broadcast; use tokio_openssl::SslStream; use tokio_rustls::{server::TlsStream, TlsAcceptor}; -use g3_daemon::listen::{AcceptTcpServer, ArcAcceptTcpServer, ListenStats, ListenTcpRuntime}; +use g3_daemon::listen::{ + AcceptQuicServer, AcceptTcpServer, ArcAcceptQuicServer, ArcAcceptTcpServer, ListenStats, + ListenTcpRuntime, +}; use g3_daemon::server::{BaseServer, ClientConnectionInfo, ServerReloadCommand}; use g3_io_ext::haproxy::{ProxyProtocolV1Reader, ProxyProtocolV2Reader}; use g3_types::acl::{AclAction, AclNetworkRule}; @@ -270,6 +273,15 @@ impl AcceptTcpServer for PlainTlsPort { } } +#[async_trait] +impl AcceptQuicServer for PlainTlsPort { + async fn run_quic_task(&self, _connection: Connection, _cc_info: ClientConnectionInfo) {} + + fn get_reloaded(&self) -> ArcAcceptQuicServer { + crate::serve::get_or_insert_default(self.config.name()) + } +} + #[async_trait] impl Server for PlainTlsPort { fn escaper(&self) -> &MetricsName { @@ -306,6 +318,4 @@ impl Server for PlainTlsPort { _cc_info: ClientConnectionInfo, ) { } - - async fn run_quic_task(&self, _connection: Connection, _cc_info: ClientConnectionInfo) {} } diff --git a/g3proxy/src/serve/runtime/listen_quic.rs b/g3proxy/src/serve/runtime/listen_quic.rs deleted file mode 100644 index a1f21ff74..000000000 --- a/g3proxy/src/serve/runtime/listen_quic.rs +++ /dev/null @@ -1,463 +0,0 @@ -/* - * Copyright 2023 ByteDance and/or its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -use std::io; -use std::net::{SocketAddr, UdpSocket}; -use std::os::fd::{AsRawFd, RawFd}; -use std::sync::Arc; -use std::time::Duration; - -use log::{info, warn}; -use quinn::{Connecting, Endpoint}; -use tokio::runtime::Handle; -use tokio::sync::{broadcast, watch}; - -use g3_daemon::listen::{ListenQuicConf, ListenStats}; -use g3_daemon::server::{ClientConnectionInfo, ServerReloadCommand}; -use g3_socket::util::native_socket_addr; -use g3_types::acl::AclAction; -use g3_types::net::UdpListenConfig; - -use crate::config::server::ServerConfig; -use crate::serve::ArcServer; - -#[derive(Clone)] -pub(crate) struct ListenQuicRuntime { - server: ArcServer, - server_type: &'static str, - server_version: usize, - worker_id: Option, - listen_config: UdpListenConfig, - listen_stats: Arc, - instance_id: usize, -} - -impl ListenQuicRuntime { - pub(crate) fn new( - server: &ArcServer, - server_config: &C, - listen_config: UdpListenConfig, - ) -> Self { - ListenQuicRuntime { - server: Arc::clone(server), - server_type: server_config.server_type(), - server_version: server.version(), - worker_id: None, - listen_config, - listen_stats: server.get_listen_stats(), - instance_id: 0, - } - } - - fn pre_start(&self) { - info!( - "started {} SRT[{}_v{}#{}]", - self.server_type, - self.server.name(), - self.server_version, - self.instance_id, - ); - self.listen_stats.add_running_runtime(); - } - - fn pre_stop(&self) { - info!( - "stopping {} SRT[{}_v{}#{}]", - self.server_type, - self.server.name(), - self.server_version, - self.instance_id, - ); - } - - fn post_stop(&self) { - info!( - "stopped {} SRT[{}_v{}#{}]", - self.server_type, - self.server.name(), - self.server_version, - self.instance_id, - ); - self.listen_stats.del_running_runtime(); - } - - async fn run( - mut self, - listener: Endpoint, - mut listen_addr: SocketAddr, - mut sock_raw_fd: RawFd, - mut server_reload_channel: broadcast::Receiver, - mut quic_cfg_receiver: watch::Receiver, - ) where - C: ListenQuicConf + Send + Clone + 'static, - { - use broadcast::error::RecvError; - - let mut aux_config = quic_cfg_receiver.borrow().clone(); - - loop { - tokio::select! { - biased; - - ev = server_reload_channel.recv() => { - match ev { - Ok(ServerReloadCommand::ReloadVersion(version)) => { - info!("SRT[{}_v{}#{}] received reload request from v{version}", - self.server.name(), self.server_version, self.instance_id); - match crate::serve::get_server(self.server.name()) { - Ok(server) => { - self.server_version = server.version(); - self.server = server; - continue; - } - Err(_) => { - info!("SRT[{}_v{}#{}] will quit as no server v{version}+ found", - self.server.name(), self.server_version, self.instance_id); - } - } - } - Ok(ServerReloadCommand::QuitRuntime) => {}, - Err(RecvError::Closed) => {}, - Err(RecvError::Lagged(dropped)) => { - warn!("SRT[{}_v{}#{}] server {} reload notify channel overflowed, {dropped} msg dropped", - self.server.name(), self.server_version, self.instance_id, self.server.name()); - continue; - }, - } - - info!("SRT[{}_v{}#{}] will go offline", - self.server.name(), self.server_version, self.instance_id); - self.pre_stop(); - self.goto_offline(listener, listen_addr, aux_config.offline_rebind_port()); - break; - } - ev = quic_cfg_receiver.changed() => { - if ev.is_err() { - warn!("SRT[{}_v{}#{}] quit as quic cfg channel closed", - self.server.name(), self.server_version, self.instance_id); - self.goto_close(listener); - break; - } - aux_config = quic_cfg_receiver.borrow().clone(); - if let Some(quinn_config) = aux_config.take_quinn_config() { - listener.set_server_config(Some(quinn_config)); - } - if let Some(listen_config) = aux_config.take_udp_listen_config() { - self.listen_config = listen_config; - if self.listen_config.address() != listen_addr { - if let Ok((fd, addr)) = self.rebind_socket(&listener) { - sock_raw_fd = fd; - listen_addr = addr; - } - } else { - self.update_socket_opts(sock_raw_fd); - } - } - } - result = listener.accept() => { - let Some(connecting) = result else { - continue; - }; - self.listen_stats.add_accepted(); - self.run_task(connecting, listen_addr, &aux_config); - } - } - } - self.post_stop(); - } - - fn run_task(&self, connecting: Connecting, listen_addr: SocketAddr, aux_config: &C) - where - C: ListenQuicConf + Send + Clone + 'static, - { - let peer_addr = connecting.remote_address(); - if let Some(filter) = aux_config.ingress_network_acl() { - let (_, action) = filter.check(peer_addr.ip()); - match action { - AclAction::Permit | AclAction::PermitAndLog => {} - AclAction::Forbid | AclAction::ForbidAndLog => { - self.listen_stats.add_dropped(); - return; - } - } - } - - let local_addr = connecting - .local_ip() - .map(|ip| SocketAddr::new(ip, listen_addr.port())) - .unwrap_or(listen_addr); - let mut cc_info = ClientConnectionInfo::new( - native_socket_addr(peer_addr), - native_socket_addr(local_addr), - ); - - let server = self.server.clone(); - let listen_stats = self.listen_stats.clone(); - let accept_timeout = aux_config.accept_timeout(); - if let Some(worker_id) = self.worker_id { - cc_info.set_worker_id(Some(worker_id)); - tokio::spawn(async move { - Self::accept_connection_and_run( - server, - connecting, - cc_info, - accept_timeout, - listen_stats, - ) - .await - }); - } else if let Some(rt) = g3_daemon::runtime::worker::select_handle() { - cc_info.set_worker_id(Some(rt.id)); - rt.handle.spawn(async move { - Self::accept_connection_and_run( - server, - connecting, - cc_info, - accept_timeout, - listen_stats, - ) - .await - }); - } else { - tokio::spawn(async move { - Self::accept_connection_and_run( - server, - connecting, - cc_info, - accept_timeout, - listen_stats, - ) - .await - }); - } - } - - async fn accept_connection_and_run( - server: ArcServer, - connecting: Connecting, - cc_info: ClientConnectionInfo, - timeout: Duration, - listen_stats: Arc, - ) { - match tokio::time::timeout(timeout, connecting).await { - Ok(Ok(c)) => { - listen_stats.add_accepted(); - server.run_quic_task(c, cc_info).await; - } - Ok(Err(_e)) => { - listen_stats.add_failed(); - // TODO may be attack - } - Err(_) => { - listen_stats.add_failed(); - // TODO may be attack - } - } - } - - fn update_socket_opts(&self, raw_fd: RawFd) { - if let Err(e) = g3_socket::udp::set_raw_opts(raw_fd, self.listen_config.socket_misc_opts()) - { - warn!( - "SRT[{}_v{}#{}] update socket misc opts failed: {e}", - self.server.name(), - self.server_version, - self.instance_id, - ); - } - if let Err(e) = g3_socket::udp::set_raw_buf_opts(raw_fd, self.listen_config.socket_buffer()) - { - warn!( - "SRT[{}_v{}#{}] update socket buf opts failed: {e}", - self.server.name(), - self.server_version, - self.instance_id, - ); - } - } - - fn rebind_socket(&self, listener: &Endpoint) -> io::Result<(RawFd, SocketAddr)> { - match g3_socket::udp::new_std_bind_listen(&self.listen_config) { - Ok(socket) => { - let raw_fd = socket.as_raw_fd(); - match listener.rebind(socket) { - Ok(_) => Ok((raw_fd, listener.local_addr().unwrap())), - Err(e) => { - warn!( - "SRT[{}_v{}#{}] reload rebind {} failed: {e}", - self.server.name(), - self.server_version, - self.instance_id, - self.listen_config.address() - ); - Err(e) - } - } - } - Err(e) => { - warn!( - "SRT[{}_v{}#{}] reload create new socket {} failed: {e}", - self.server.name(), - self.server_version, - self.instance_id, - self.listen_config.address() - ); - Err(e) - } - } - } - - fn goto_offline(&self, listener: Endpoint, listen_addr: SocketAddr, rebind_port: Option) { - if let Some(port) = rebind_port { - let rebind_addr = SocketAddr::new(listen_addr.ip(), port); - match g3_socket::udp::new_std_rebind_listen( - &self.listen_config, - SocketAddr::new(listen_addr.ip(), port), - ) { - Ok(socket) => match listener.rebind(socket) { - Ok(_) => { - info!( - "SRT[{}_v{}#{}] re-bound to: {rebind_addr}", - self.server.name(), - self.server_version, - self.instance_id - ); - listener.reject_new_connections(); - tokio::spawn(async move { listener.wait_idle().await }); - return; - } - Err(e) => { - warn!( - "SRT[{}_v{}#{}] rebind failed: {e}", - self.server.name(), - self.server_version, - self.instance_id - ); - } - }, - Err(e) => { - warn!( - "SRT[{}_v{}#{}] create rebind socket failed: {e}", - self.server.name(), - self.server_version, - self.instance_id - ); - } - } - } - self.goto_close(listener); - } - - fn goto_close(&self, listener: Endpoint) { - info!( - "SRT[{}_v{}#{}] will close all quic connections immediately", - self.server.name(), - self.server_version, - self.instance_id - ); - listener.close(quinn::VarInt::default(), b"close as server shutdown"); - } - - fn get_rt_handle(&mut self, listen_in_worker: bool) -> Handle { - if listen_in_worker { - if let Some(rt) = g3_daemon::runtime::worker::select_listen_handle() { - self.worker_id = Some(rt.id); - return rt.handle; - } - } - Handle::current() - } - - fn into_running( - mut self, - socket: UdpSocket, - listen_addr: SocketAddr, - config: quinn::ServerConfig, - listen_in_worker: bool, - server_reload_channel: broadcast::Receiver, - quic_cfg_receiver: watch::Receiver, - ) where - C: ListenQuicConf + Clone + Send + Sync + 'static, - { - let handle = self.get_rt_handle(listen_in_worker); - handle.spawn(async move { - let sock_raw_fd = socket.as_raw_fd(); - // make sure the listen socket associated with the correct reactor - match Endpoint::new( - Default::default(), - Some(config), - socket, - Arc::new(quinn::TokioRuntime), - ) { - Ok(endpoint) => { - self.pre_start(); - self.run( - endpoint, - listen_addr, - sock_raw_fd, - server_reload_channel, - quic_cfg_receiver, - ) - .await; - } - Err(e) => { - warn!( - "SRT[{}_v{}#{}] listen async: {e:?}", - self.server.name(), - self.server_version, - self.instance_id - ); - } - } - }); - } - - pub(crate) fn run_all_instances( - &self, - listen_in_worker: bool, - quic_config: &quinn::ServerConfig, - server_reload_sender: &broadcast::Sender, - quic_cfg_receiver: &watch::Sender, - ) -> anyhow::Result<()> - where - C: ListenQuicConf + Clone + Send + Sync + 'static, - { - let mut instance_count = self.listen_config.instance(); - if listen_in_worker { - let worker_count = g3_daemon::runtime::worker::worker_count(); - if worker_count > 0 { - instance_count = worker_count; - } - } - - for i in 0..instance_count { - let mut runtime = self.clone(); - runtime.instance_id = i; - - let socket = g3_socket::udp::new_std_bind_listen(&self.listen_config)?; - let listen_addr = socket.local_addr()?; - runtime.into_running( - socket, - listen_addr, - quic_config.clone(), - listen_in_worker, - server_reload_sender.subscribe(), - quic_cfg_receiver.subscribe(), - ); - } - Ok(()) - } -} diff --git a/g3proxy/src/serve/runtime/mod.rs b/g3proxy/src/serve/runtime/mod.rs deleted file mode 100644 index daf2fac24..000000000 --- a/g3proxy/src/serve/runtime/mod.rs +++ /dev/null @@ -1,18 +0,0 @@ -/* - * Copyright 2023 ByteDance and/or its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -mod listen_quic; -pub(crate) use listen_quic::ListenQuicRuntime; diff --git a/g3proxy/src/serve/sni_proxy/server.rs b/g3proxy/src/serve/sni_proxy/server.rs index fda18502d..896818c14 100644 --- a/g3proxy/src/serve/sni_proxy/server.rs +++ b/g3proxy/src/serve/sni_proxy/server.rs @@ -27,7 +27,10 @@ use tokio::sync::broadcast; use tokio_openssl::SslStream; use tokio_rustls::server::TlsStream; -use g3_daemon::listen::{AcceptTcpServer, ArcAcceptTcpServer, ListenStats, ListenTcpRuntime}; +use g3_daemon::listen::{ + AcceptQuicServer, AcceptTcpServer, ArcAcceptQuicServer, ArcAcceptTcpServer, ListenStats, + ListenTcpRuntime, +}; use g3_daemon::server::{BaseServer, ClientConnectionInfo, ServerReloadCommand}; use g3_dpi::ProtocolPortMap; use g3_types::acl::{AclAction, AclNetworkRule}; @@ -258,6 +261,15 @@ impl AcceptTcpServer for SniProxyServer { } } +#[async_trait] +impl AcceptQuicServer for SniProxyServer { + async fn run_quic_task(&self, _connection: Connection, _cc_info: ClientConnectionInfo) {} + + fn get_reloaded(&self) -> ArcAcceptQuicServer { + crate::serve::get_or_insert_default(self.config.name()) + } +} + #[async_trait] impl Server for SniProxyServer { fn escaper(&self) -> &MetricsName { @@ -298,6 +310,4 @@ impl Server for SniProxyServer { _cc_info: ClientConnectionInfo, ) { } - - async fn run_quic_task(&self, _connection: Connection, _cc_info: ClientConnectionInfo) {} } diff --git a/g3proxy/src/serve/socks_proxy/server.rs b/g3proxy/src/serve/socks_proxy/server.rs index fa69a7077..3a4765ba8 100644 --- a/g3proxy/src/serve/socks_proxy/server.rs +++ b/g3proxy/src/serve/socks_proxy/server.rs @@ -27,7 +27,10 @@ use tokio::sync::broadcast; use tokio_openssl::SslStream; use tokio_rustls::server::TlsStream; -use g3_daemon::listen::{AcceptTcpServer, ArcAcceptTcpServer, ListenStats, ListenTcpRuntime}; +use g3_daemon::listen::{ + AcceptQuicServer, AcceptTcpServer, ArcAcceptQuicServer, ArcAcceptTcpServer, ListenStats, + ListenTcpRuntime, +}; use g3_daemon::server::{BaseServer, ClientConnectionInfo, ServerReloadCommand}; use g3_types::acl::{AclAction, AclNetworkRule}; use g3_types::acl_set::AclDstHostRuleSet; @@ -267,6 +270,15 @@ impl AcceptTcpServer for SocksProxyServer { } } +#[async_trait] +impl AcceptQuicServer for SocksProxyServer { + async fn run_quic_task(&self, _connection: Connection, _cc_info: ClientConnectionInfo) {} + + fn get_reloaded(&self) -> ArcAcceptQuicServer { + crate::serve::get_or_insert_default(self.config.name()) + } +} + #[async_trait] impl Server for SocksProxyServer { fn escaper(&self) -> &MetricsName { @@ -307,6 +319,4 @@ impl Server for SocksProxyServer { self.server_stats.add_conn(cc_info.client_addr()); self.listen_stats.add_dropped(); } - - async fn run_quic_task(&self, _connection: Connection, _cc_info: ClientConnectionInfo) {} } diff --git a/g3proxy/src/serve/tcp_stream/server.rs b/g3proxy/src/serve/tcp_stream/server.rs index 531d45b08..5cf11cad5 100644 --- a/g3proxy/src/serve/tcp_stream/server.rs +++ b/g3proxy/src/serve/tcp_stream/server.rs @@ -29,7 +29,10 @@ use tokio::sync::broadcast; use tokio_openssl::SslStream; use tokio_rustls::server::TlsStream; -use g3_daemon::listen::{AcceptTcpServer, ArcAcceptTcpServer, ListenStats, ListenTcpRuntime}; +use g3_daemon::listen::{ + AcceptQuicServer, AcceptTcpServer, ArcAcceptQuicServer, ArcAcceptTcpServer, ListenStats, + ListenTcpRuntime, +}; use g3_daemon::server::{BaseServer, ClientConnectionInfo, ServerReloadCommand}; use g3_types::acl::{AclAction, AclNetworkRule}; use g3_types::collection::{SelectivePickPolicy, SelectiveVec, SelectiveVecBuilder}; @@ -332,6 +335,38 @@ impl AcceptTcpServer for TcpStreamServer { } } +#[async_trait] +impl AcceptQuicServer for TcpStreamServer { + async fn run_quic_task(&self, connection: Connection, cc_info: ClientConnectionInfo) { + let client_addr = cc_info.client_addr(); + self.server_stats.add_conn(client_addr); + if self.drop_early(client_addr) { + return; + } + + loop { + // TODO update ctx and quit gracefully + match connection.accept_bi().await { + Ok((send_stream, recv_stream)) => { + self.run_task_with_quic_stream(send_stream, recv_stream, cc_info.clone()) + } + Err(e) => { + debug!( + "{} - {} quic connection error: {e:?}", + cc_info.sock_local_addr(), + cc_info.sock_peer_addr() + ); + break; + } + } + } + } + + fn get_reloaded(&self) -> ArcAcceptQuicServer { + crate::serve::get_or_insert_default(self.config.name()) + } +} + #[async_trait] impl Server for TcpStreamServer { fn escaper(&self) -> &MetricsName { @@ -382,29 +417,4 @@ impl Server for TcpStreamServer { self.run_task_with_stream(stream, cc_info).await } - - async fn run_quic_task(&self, connection: Connection, cc_info: ClientConnectionInfo) { - let client_addr = cc_info.client_addr(); - self.server_stats.add_conn(client_addr); - if self.drop_early(client_addr) { - return; - } - - loop { - // TODO update ctx and quit gracefully - match connection.accept_bi().await { - Ok((send_stream, recv_stream)) => { - self.run_task_with_quic_stream(send_stream, recv_stream, cc_info.clone()) - } - Err(e) => { - debug!( - "{} - {} quic connection error: {e:?}", - cc_info.sock_local_addr(), - cc_info.sock_peer_addr() - ); - break; - } - } - } - } } diff --git a/g3proxy/src/serve/tls_stream/server.rs b/g3proxy/src/serve/tls_stream/server.rs index a12e96f17..caa6a2297 100644 --- a/g3proxy/src/serve/tls_stream/server.rs +++ b/g3proxy/src/serve/tls_stream/server.rs @@ -29,7 +29,10 @@ use tokio::sync::broadcast; use tokio_openssl::SslStream; use tokio_rustls::{server::TlsStream, TlsAcceptor}; -use g3_daemon::listen::{AcceptTcpServer, ArcAcceptTcpServer, ListenStats, ListenTcpRuntime}; +use g3_daemon::listen::{ + AcceptQuicServer, AcceptTcpServer, ArcAcceptQuicServer, ArcAcceptTcpServer, ListenStats, + ListenTcpRuntime, +}; use g3_daemon::server::{BaseServer, ClientConnectionInfo, ServerReloadCommand}; use g3_types::acl::{AclAction, AclNetworkRule}; use g3_types::collection::{SelectivePickPolicy, SelectiveVec, SelectiveVecBuilder}; @@ -330,6 +333,15 @@ impl AcceptTcpServer for TlsStreamServer { } } +#[async_trait] +impl AcceptQuicServer for TlsStreamServer { + async fn run_quic_task(&self, _connection: Connection, _cc_info: ClientConnectionInfo) {} + + fn get_reloaded(&self) -> ArcAcceptQuicServer { + crate::serve::get_or_insert_default(self.config.name()) + } +} + #[async_trait] impl Server for TlsStreamServer { fn escaper(&self) -> &MetricsName { @@ -370,6 +382,4 @@ impl Server for TlsStreamServer { _cc_info: ClientConnectionInfo, ) { } - - async fn run_quic_task(&self, _connection: Connection, _cc_info: ClientConnectionInfo) {} } diff --git a/g3tiles/Cargo.toml b/g3tiles/Cargo.toml index 5c8454ce4..936f602fa 100644 --- a/g3tiles/Cargo.toml +++ b/g3tiles/Cargo.toml @@ -32,12 +32,13 @@ openssl.workspace = true openssl-probe = { workspace = true, optional = true } tokio-openssl.workspace = true rustls.workspace = true +quinn.workspace = true tokio-rustls.workspace = true governor = { workspace = true, features = ["std", "jitter"] } chrono = { workspace = true, features = ["clock"] } uuid.workspace = true g3-compat.workspace = true -g3-daemon.workspace = true +g3-daemon = { workspace = true, features = ["quic"] } g3-signal.workspace = true g3-yaml = { workspace = true, features = ["acl-rule", "route", "openssl", "rustls"] } g3-types = { workspace = true, features = ["acl-rule", "route", "openssl", "rustls"] } diff --git a/g3tiles/src/serve/dummy_close/server.rs b/g3tiles/src/serve/dummy_close/server.rs index 2e2720eb4..3720c0d75 100644 --- a/g3tiles/src/serve/dummy_close/server.rs +++ b/g3tiles/src/serve/dummy_close/server.rs @@ -18,10 +18,13 @@ use std::sync::Arc; use anyhow::anyhow; use async_trait::async_trait; +use quinn::Connection; use tokio::net::TcpStream; use tokio::sync::broadcast; -use g3_daemon::listen::{AcceptTcpServer, ArcAcceptTcpServer, ListenStats}; +use g3_daemon::listen::{ + AcceptQuicServer, AcceptTcpServer, ArcAcceptQuicServer, ArcAcceptTcpServer, ListenStats, +}; use g3_daemon::server::{BaseServer, ClientConnectionInfo, ServerReloadCommand}; use g3_types::metrics::MetricsName; @@ -140,6 +143,15 @@ impl AcceptTcpServer for DummyCloseServer { } } +#[async_trait] +impl AcceptQuicServer for DummyCloseServer { + async fn run_quic_task(&self, _connection: Connection, _cc_info: ClientConnectionInfo) {} + + fn get_reloaded(&self) -> ArcAcceptQuicServer { + crate::serve::get_or_insert_default(self.config.name()) + } +} + #[async_trait] impl Server for DummyCloseServer { fn get_listen_stats(&self) -> Arc { diff --git a/g3tiles/src/serve/mod.rs b/g3tiles/src/serve/mod.rs index 8c94bc3a5..38457f806 100644 --- a/g3tiles/src/serve/mod.rs +++ b/g3tiles/src/serve/mod.rs @@ -19,7 +19,7 @@ use std::sync::Arc; use async_trait::async_trait; use tokio::sync::broadcast; -use g3_daemon::listen::{AcceptTcpServer, ListenStats}; +use g3_daemon::listen::{AcceptQuicServer, AcceptTcpServer, ListenStats}; use g3_daemon::server::{BaseServer, ServerQuitPolicy, ServerReloadCommand}; use g3_types::metrics::MetricsName; @@ -66,7 +66,9 @@ pub(crate) trait ServerInternal { } #[async_trait] -pub(crate) trait Server: ServerInternal + BaseServer + AcceptTcpServer { +pub(crate) trait Server: + ServerInternal + BaseServer + AcceptTcpServer + AcceptQuicServer +{ fn get_server_stats(&self) -> Option { None } diff --git a/g3tiles/src/serve/openssl_proxy/server.rs b/g3tiles/src/serve/openssl_proxy/server.rs index c853dbfc7..62ade2b2f 100644 --- a/g3tiles/src/serve/openssl_proxy/server.rs +++ b/g3tiles/src/serve/openssl_proxy/server.rs @@ -23,11 +23,15 @@ use async_trait::async_trait; use log::warn; use openssl::ex_data::Index; use openssl::ssl::{Ssl, SslContext}; +use quinn::Connection; use slog::Logger; use tokio::net::TcpStream; use tokio::sync::broadcast; -use g3_daemon::listen::{AcceptTcpServer, ArcAcceptTcpServer, ListenStats, ListenTcpRuntime}; +use g3_daemon::listen::{ + AcceptQuicServer, AcceptTcpServer, ArcAcceptQuicServer, ArcAcceptTcpServer, ListenStats, + ListenTcpRuntime, +}; use g3_daemon::server::{BaseServer, ClientConnectionInfo, ServerReloadCommand}; use g3_types::acl::{AclAction, AclNetworkRule}; use g3_types::metrics::MetricsName; @@ -342,6 +346,15 @@ impl AcceptTcpServer for OpensslProxyServer { } } +#[async_trait] +impl AcceptQuicServer for OpensslProxyServer { + async fn run_quic_task(&self, _connection: Connection, _cc_info: ClientConnectionInfo) {} + + fn get_reloaded(&self) -> ArcAcceptQuicServer { + crate::serve::get_or_insert_default(self.config.name()) + } +} + #[async_trait] impl Server for OpensslProxyServer { fn get_server_stats(&self) -> Option { diff --git a/g3tiles/src/serve/plain_tcp_port/mod.rs b/g3tiles/src/serve/plain_tcp_port/mod.rs index 6f624d29a..044861d4e 100644 --- a/g3tiles/src/serve/plain_tcp_port/mod.rs +++ b/g3tiles/src/serve/plain_tcp_port/mod.rs @@ -20,10 +20,14 @@ use std::sync::Arc; use anyhow::anyhow; use arc_swap::ArcSwap; use async_trait::async_trait; +use quinn::Connection; use tokio::net::TcpStream; use tokio::sync::broadcast; -use g3_daemon::listen::{AcceptTcpServer, ArcAcceptTcpServer, ListenStats, ListenTcpRuntime}; +use g3_daemon::listen::{ + AcceptQuicServer, AcceptTcpServer, ArcAcceptQuicServer, ArcAcceptTcpServer, ListenStats, + ListenTcpRuntime, +}; use g3_daemon::server::{BaseServer, ClientConnectionInfo, ServerReloadCommand}; use g3_io_ext::haproxy::{ProxyProtocolV1Reader, ProxyProtocolV2Reader}; use g3_types::acl::{AclAction, AclNetworkRule}; @@ -223,6 +227,15 @@ impl AcceptTcpServer for PlainTcpPort { } } +#[async_trait] +impl AcceptQuicServer for PlainTcpPort { + async fn run_quic_task(&self, _connection: Connection, _cc_info: ClientConnectionInfo) {} + + fn get_reloaded(&self) -> ArcAcceptQuicServer { + crate::serve::get_or_insert_default(self.config.name()) + } +} + #[async_trait] impl Server for PlainTcpPort { fn get_listen_stats(&self) -> Arc { diff --git a/g3tiles/src/serve/rustls_proxy/server.rs b/g3tiles/src/serve/rustls_proxy/server.rs index b6d333def..f372a3d44 100644 --- a/g3tiles/src/serve/rustls_proxy/server.rs +++ b/g3tiles/src/serve/rustls_proxy/server.rs @@ -20,11 +20,15 @@ use std::sync::Arc; use ahash::AHashMap; use anyhow::anyhow; use async_trait::async_trait; +use quinn::Connection; use slog::Logger; use tokio::net::TcpStream; use tokio::sync::broadcast; -use g3_daemon::listen::{AcceptTcpServer, ArcAcceptTcpServer, ListenStats, ListenTcpRuntime}; +use g3_daemon::listen::{ + AcceptQuicServer, AcceptTcpServer, ArcAcceptQuicServer, ArcAcceptTcpServer, ListenStats, + ListenTcpRuntime, +}; use g3_daemon::server::{BaseServer, ClientConnectionInfo, ServerReloadCommand}; use g3_types::acl::{AclAction, AclNetworkRule}; use g3_types::metrics::MetricsName; @@ -249,6 +253,15 @@ impl AcceptTcpServer for RustlsProxyServer { } } +#[async_trait] +impl AcceptQuicServer for RustlsProxyServer { + async fn run_quic_task(&self, _connection: Connection, _cc_info: ClientConnectionInfo) {} + + fn get_reloaded(&self) -> ArcAcceptQuicServer { + crate::serve::get_or_insert_default(self.config.name()) + } +} + #[async_trait] impl Server for RustlsProxyServer { fn get_server_stats(&self) -> Option {