diff --git a/src/sinks/util/auto_concurrency/controller.rs b/src/sinks/util/auto_concurrency/controller.rs index dc7d4f0163072..916f95e1230cb 100644 --- a/src/sinks/util/auto_concurrency/controller.rs +++ b/src/sinks/util/auto_concurrency/controller.rs @@ -5,7 +5,7 @@ use crate::internal_events::{ AutoConcurrencyAveragedRtt, AutoConcurrencyInFlight, AutoConcurrencyLimit, AutoConcurrencyObservedRtt, }; -use crate::sinks::util::retries::RetryLogic; +use crate::sinks::util::retries::{RetryAction, RetryLogic}; #[cfg(test)] use crate::test_util::stats::{TimeHistogram, TimeWeightedSum}; use std::future::Future; @@ -229,8 +229,14 @@ where start: Instant, response: &Result, ) { - let is_back_pressure = match response { - Ok(_) => false, + // It would be better to avoid generating the string in Retry(_) + // just to throw it away here, but it's probably not worth the + // effort. + let response_action = response + .as_ref() + .map(|resp| self.logic.should_retry_response(resp)); + let is_back_pressure = match &response_action { + Ok(action) => matches!(action, RetryAction::Retry(_)), Err(err) => { if let Some(err) = err.downcast_ref::() { self.logic.is_retriable_error(err) @@ -242,7 +248,7 @@ where } }; // Only adjust to the RTT when the request was successfully processed. - let use_rtt = response.is_ok(); + let use_rtt = matches!(response_action, Ok(RetryAction::Successful)); self.adjust_to_response_inner(start, is_back_pressure, use_rtt) } } diff --git a/src/sinks/util/auto_concurrency/tests.rs b/src/sinks/util/auto_concurrency/tests.rs index a5d3c07c7864d..3a0ae1f946a4b 100644 --- a/src/sinks/util/auto_concurrency/tests.rs +++ b/src/sinks/util/auto_concurrency/tests.rs @@ -410,7 +410,7 @@ async fn defers_at_high_concurrency() { // Since the concurrency will drop down by half each time, the // average will be below this maximum. assert_within!(in_flight.mode, 2, 4, "{:#?}", results); - assert_within!(in_flight.mean, 2.0, 4.0, "{:#?}", results); + assert_within!(in_flight.mean, 2.0, 5.0, "{:#?}", results); let observed_rtt = results.cstats.observed_rtt.stats().unwrap(); assert_within!(observed_rtt.min, 0.090, 0.120, "{:#?}", results); @@ -427,7 +427,7 @@ async fn defers_at_high_concurrency() { let c_in_flight = results.cstats.in_flight.stats().unwrap(); assert_within!(c_in_flight.max, 5, 6, "{:#?}", results); assert_within!(c_in_flight.mode, 2, 4, "{:#?}", results); - assert_within!(c_in_flight.mean, 2.0, 4.0, "{:#?}", results); + assert_within!(c_in_flight.mean, 2.0, 5.0, "{:#?}", results); } #[tokio::test]