Skip to content

Use kanal #200

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Apr 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 11 additions & 33 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ average = "0.15.1"
byte-unit = "5.1.4"
clap = { version = "4.5.9", features = ["derive"] }
float-ord = "0.3.2"
flume = "0.11"
kanal = "0.1.1"
humantime = "2.1.0"
libc = "0.2.155"
serde = { version = "1.0.204", features = ["derive"] }
Expand Down
60 changes: 32 additions & 28 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -921,7 +921,7 @@ async fn setup_http2(client: &Client) -> Result<(ConnectionTime, SendRequestHttp
async fn work_http2_once(
client: &Client,
client_state: &mut ClientStateHttp2,
report_tx: &flume::Sender<Result<RequestResult, ClientError>>,
report_tx: &kanal::Sender<Result<RequestResult, ClientError>>,
connection_time: ConnectionTime,
start_latency_correction: Option<Instant>,
) -> (bool, bool) {
Expand Down Expand Up @@ -982,7 +982,7 @@ pub async fn work_debug<W: Write>(w: &mut W, client: Arc<Client>) -> Result<(),
/// Run n tasks by m workers
pub async fn work(
client: Arc<Client>,
report_tx: flume::Sender<Result<RequestResult, ClientError>>,
report_tx: kanal::Sender<Result<RequestResult, ClientError>>,
n_tasks: usize,
n_connections: usize,
n_http2_parallel: usize,
Expand Down Expand Up @@ -1094,13 +1094,13 @@ pub async fn work(
/// n tasks by m workers limit to qps works in a second
pub async fn work_with_qps(
client: Arc<Client>,
report_tx: flume::Sender<Result<RequestResult, ClientError>>,
report_tx: kanal::Sender<Result<RequestResult, ClientError>>,
query_limit: QueryLimit,
n_tasks: usize,
n_connections: usize,
n_http2_parallel: usize,
) {
let (tx, rx) = flume::unbounded();
let (tx, rx) = kanal::unbounded();

let work_queue = async move {
match query_limit {
Expand Down Expand Up @@ -1135,9 +1135,10 @@ pub async fn work_with_qps(
}
// tx gone
drop(tx);
Ok::<(), flume::SendError<_>>(())
Ok::<(), kanal::SendError>(())
};

let rx = rx.to_async();
if client.is_work_http2() {
let futures = (0..n_connections)
.map(|_| {
Expand All @@ -1158,7 +1159,7 @@ pub async fn work_with_qps(
send_request: send_request.clone(),
};
tokio::spawn(async move {
while let Ok(()) = rx.recv_async().await {
while let Ok(()) = rx.recv().await {
let (is_cancel, is_reconnect) = work_http2_once(
&client,
&mut client_state,
Expand Down Expand Up @@ -1196,7 +1197,7 @@ pub async fn work_with_qps(
}
Err(err) => {
// Consume a task
if let Ok(()) = rx.recv_async().await {
if let Ok(()) = rx.recv().await {
report_tx.send(Err(err)).unwrap();
} else {
return;
Expand All @@ -1220,7 +1221,7 @@ pub async fn work_with_qps(
let client = client.clone();
tokio::spawn(async move {
let mut client_state = ClientStateHttp1::default();
while let Ok(()) = rx.recv_async().await {
while let Ok(()) = rx.recv().await {
let res = client.work_http1(&mut client_state).await;
let is_cancel = is_cancel_error(&res);
report_tx.send(res).unwrap();
Expand All @@ -1242,13 +1243,13 @@ pub async fn work_with_qps(
/// n tasks by m workers limit to qps works in a second with latency correction
pub async fn work_with_qps_latency_correction(
client: Arc<Client>,
report_tx: flume::Sender<Result<RequestResult, ClientError>>,
report_tx: kanal::Sender<Result<RequestResult, ClientError>>,
query_limit: QueryLimit,
n_tasks: usize,
n_connections: usize,
n_http2_parallel: usize,
) {
let (tx, rx) = flume::unbounded();
let (tx, rx) = kanal::unbounded();

let work_queue = async move {
match query_limit {
Expand Down Expand Up @@ -1286,9 +1287,10 @@ pub async fn work_with_qps_latency_correction(

// tx gone
drop(tx);
Ok::<(), flume::SendError<_>>(())
Ok::<(), kanal::SendError>(())
};

let rx = rx.to_async();
if client.is_work_http2() {
let futures = (0..n_connections)
.map(|_| {
Expand All @@ -1309,7 +1311,7 @@ pub async fn work_with_qps_latency_correction(
send_request: send_request.clone(),
};
tokio::spawn(async move {
while let Ok(start) = rx.recv_async().await {
while let Ok(start) = rx.recv().await {
let (is_cancel, is_reconnect) = work_http2_once(
&client,
&mut client_state,
Expand Down Expand Up @@ -1347,7 +1349,7 @@ pub async fn work_with_qps_latency_correction(
}
Err(err) => {
// Consume a task
if rx.recv_async().await.is_ok() {
if rx.recv().await.is_ok() {
report_tx.send(Err(err)).unwrap();
} else {
return;
Expand All @@ -1371,7 +1373,7 @@ pub async fn work_with_qps_latency_correction(
let report_tx = report_tx.clone();
let rx = rx.clone();
tokio::spawn(async move {
while let Ok(start) = rx.recv_async().await {
while let Ok(start) = rx.recv().await {
let mut res = client.work_http1(&mut client_state).await;
set_start_latency_correction(&mut res, start);
let is_cancel = is_cancel_error(&res);
Expand All @@ -1394,7 +1396,7 @@ pub async fn work_with_qps_latency_correction(
/// Run until dead_line by n workers
pub async fn work_until(
client: Arc<Client>,
report_tx: flume::Sender<Result<RequestResult, ClientError>>,
report_tx: kanal::Sender<Result<RequestResult, ClientError>>,
dead_line: std::time::Instant,
n_connections: usize,
n_http2_parallel: usize,
Expand Down Expand Up @@ -1538,7 +1540,7 @@ pub async fn work_until(
#[allow(clippy::too_many_arguments)]
pub async fn work_until_with_qps(
client: Arc<Client>,
report_tx: flume::Sender<Result<RequestResult, ClientError>>,
report_tx: kanal::Sender<Result<RequestResult, ClientError>>,
query_limit: QueryLimit,
start: std::time::Instant,
dead_line: std::time::Instant,
Expand All @@ -1548,7 +1550,7 @@ pub async fn work_until_with_qps(
) {
let rx = match query_limit {
QueryLimit::Qps(qps) => {
let (tx, rx) = flume::unbounded();
let (tx, rx) = kanal::unbounded();
tokio::spawn(async move {
for i in 0.. {
if std::time::Instant::now() > dead_line {
Expand All @@ -1565,7 +1567,7 @@ pub async fn work_until_with_qps(
rx
}
QueryLimit::Burst(duration, rate) => {
let (tx, rx) = flume::unbounded();
let (tx, rx) = kanal::unbounded();
tokio::spawn(async move {
// Handle via rate till deadline is reached
for _ in 0.. {
Expand All @@ -1584,6 +1586,7 @@ pub async fn work_until_with_qps(
}
};

let rx = rx.to_async();
if client.is_work_http2() {
let s = Arc::new(tokio::sync::Semaphore::new(0));

Expand All @@ -1608,7 +1611,7 @@ pub async fn work_until_with_qps(
};
let s = s.clone();
tokio::spawn(async move {
while let Ok(()) = rx.recv_async().await {
while let Ok(()) = rx.recv().await {
let (is_cancel, is_reconnect) = work_http2_once(
&client,
&mut client_state,
Expand Down Expand Up @@ -1655,7 +1658,7 @@ pub async fn work_until_with_qps(
}
Err(err) => {
// Consume a task
if rx.recv_async().await.is_ok() {
if rx.recv().await.is_ok() {
report_tx.send(Err(err)).unwrap();
} else {
return;
Expand Down Expand Up @@ -1688,7 +1691,7 @@ pub async fn work_until_with_qps(
let rx = rx.clone();
let is_end = is_end.clone();
tokio::spawn(async move {
while let Ok(()) = rx.recv_async().await {
while let Ok(()) = rx.recv().await {
let res = client.work_http1(&mut client_state).await;
let is_cancel = is_cancel_error(&res);
report_tx.send(res).unwrap();
Expand Down Expand Up @@ -1724,15 +1727,15 @@ pub async fn work_until_with_qps(
#[allow(clippy::too_many_arguments)]
pub async fn work_until_with_qps_latency_correction(
client: Arc<Client>,
report_tx: flume::Sender<Result<RequestResult, ClientError>>,
report_tx: kanal::Sender<Result<RequestResult, ClientError>>,
query_limit: QueryLimit,
start: std::time::Instant,
dead_line: std::time::Instant,
n_connections: usize,
n_http2_parallel: usize,
wait_ongoing_requests_after_deadline: bool,
) {
let (tx, rx) = flume::unbounded();
let (tx, rx) = kanal::unbounded();
match query_limit {
QueryLimit::Qps(qps) => {
tokio::spawn(async move {
Expand Down Expand Up @@ -1769,6 +1772,7 @@ pub async fn work_until_with_qps_latency_correction(
}
};

let rx = rx.to_async();
if client.is_work_http2() {
let s = Arc::new(tokio::sync::Semaphore::new(0));

Expand All @@ -1793,7 +1797,7 @@ pub async fn work_until_with_qps_latency_correction(
};
let s = s.clone();
tokio::spawn(async move {
while let Ok(start) = rx.recv_async().await {
while let Ok(start) = rx.recv().await {
let (is_cancel, is_reconnect) = work_http2_once(
&client,
&mut client_state,
Expand Down Expand Up @@ -1839,7 +1843,7 @@ pub async fn work_until_with_qps_latency_correction(
}

Err(err) => {
if rx.recv_async().await.is_ok() {
if rx.recv().await.is_ok() {
report_tx.send(Err(err)).unwrap();
} else {
return;
Expand Down Expand Up @@ -1872,7 +1876,7 @@ pub async fn work_until_with_qps_latency_correction(
let rx = rx.clone();
let is_end = is_end.clone();
tokio::spawn(async move {
while let Ok(start) = rx.recv_async().await {
while let Ok(start) = rx.recv().await {
let mut res = client.work_http1(&mut client_state).await;
set_start_latency_correction(&mut res, start);
let is_cancel = is_cancel_error(&res);
Expand Down Expand Up @@ -1924,7 +1928,7 @@ pub mod fast {
/// Run n tasks by m workers
pub async fn work(
client: Arc<Client>,
report_tx: flume::Sender<ResultData>,
report_tx: kanal::Sender<ResultData>,
n_tasks: usize,
n_connections: usize,
n_http2_parallel: usize,
Expand Down Expand Up @@ -2128,7 +2132,7 @@ pub mod fast {
/// Run until dead_line by n workers
pub async fn work_until(
client: Arc<Client>,
report_tx: flume::Sender<ResultData>,
report_tx: kanal::Sender<ResultData>,
dead_line: std::time::Instant,
n_connections: usize,
n_http2_parallel: usize,
Expand Down
Loading
Loading