diff --git a/utilities/rate_limiters/write_amp_based_rate_limiter.cc b/utilities/rate_limiters/write_amp_based_rate_limiter.cc
index 3d5cfcd647af..7c406479fdf9 100644
--- a/utilities/rate_limiters/write_amp_based_rate_limiter.cc
+++ b/utilities/rate_limiters/write_amp_based_rate_limiter.cc
@@ -192,7 +192,11 @@ void WriteAmpBasedRateLimiter::Request(int64_t bytes, const Env::IOPriority pri,
          (!queue_[Env::IO_LOW].empty() &&
           &r == queue_[Env::IO_LOW].front()))) {
       leader_ = &r;
       int64_t delta = next_refill_us_ - NowMicrosMonotonic(env_);
-      delta = delta > 0 ? delta : 0;
+      // Clamp delta between 0 and refill_period_us_:
+      // (1) set negative values to 0
+      // (2) cap the maximum wait time at refill_period_us_ to prevent
+      //     excessive delays caused by clock skew.
+      delta = delta > 0 ? std::min(delta, refill_period_us_) : 0;
       if (delta == 0) {
         timedout = true;
       } else {
@@ -328,19 +332,34 @@ Status WriteAmpBasedRateLimiter::Tune() {
   // lower bound for write amplification estimation
   const int kRatioLower = 10;
   const int kPercentDeltaMax = 6;
+  // Define the max tick duration limit to handle clock skew.
+  const auto max_tune_tick_duration_limit =
+      std::chrono::microseconds(kSecondsPerTune * 1000 * 1000) * 7 /
+      4;  // 1.75x multiplier
 
-  std::chrono::microseconds prev_tuned_time = tuned_time_;
+  const std::chrono::microseconds prev_tuned_time = tuned_time_;
   tuned_time_ = std::chrono::microseconds(NowMicrosMonotonic(env_));
-  auto duration = tuned_time_ - prev_tuned_time;
-  auto duration_ms =
-      std::chrono::duration_cast<std::chrono::milliseconds>(duration).count();
+  // Validate the tuning interval to detect system anomalies:
+  // (1) tuned_time_ <= prev_tuned_time: clock moved backwards (clock skew)
+  // (2) interval >= max_tune_tick_duration_limit: system stall or severe
+  //     clock skew
+  if (tuned_time_ <= prev_tuned_time ||
+      tuned_time_ >= prev_tuned_time + max_tune_tick_duration_limit) {
+    // Fall back to the max rate limit for safety if the duration is invalid
+    // or exceeds the max limit.
+    SetActualBytesPerSecond(max_bytes_per_sec_.load(std::memory_order_relaxed));
+    return Status::Aborted();
+  }
+  auto duration_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
+                         tuned_time_ - prev_tuned_time)
+                         .count();
 
   int64_t prev_bytes_per_sec = GetBytesPerSecond();
 
-  // This function can be called less frequent than we anticipate when
   // compaction rate is low. Loop through the actual time slice to correct
   // the estimation.
-  for (uint32_t i = 0; i < duration_ms / kMillisPerTune; i++) {
+  auto sampling_count = duration_ms / kMillisPerTune;
+  for (uint32_t i = 0; i < sampling_count; i++) {
     bytes_sampler_.AddSample(duration_bytes_through_ * 1000 / duration_ms);
     highpri_bytes_sampler_.AddSample(duration_highpri_bytes_through_ * 1000 /
                                      duration_ms);
@@ -409,8 +428,8 @@ void WriteAmpBasedRateLimiter::PaceUp(bool critical) {
 }
 
 RateLimiter* NewWriteAmpBasedRateLimiter(
-    int64_t rate_bytes_per_sec, int64_t refill_period_us /* = 100 * 1000 */,
-    int32_t fairness /* = 10 */,
+    int64_t rate_bytes_per_sec /* = 10GiB */,
+    int64_t refill_period_us /* = 100 * 1000 */, int32_t fairness /* = 10 */,
     RateLimiter::Mode mode /* = RateLimiter::Mode::kWritesOnly */,
     bool auto_tuned /* = false */) {
   assert(rate_bytes_per_sec > 0);
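
Note: for reference, below is a minimal standalone sketch of the two guards this patch introduces: clamping the leader's wait into [0, refill_period_us_], and rejecting implausible tuning intervals. It is illustrative only, not the patch itself: kRefillPeriodUs, the kSecondsPerTune value of 1, and the helper names ClampedWaitUs / ValidTuneInterval are hypothetical stand-ins for the limiter's actual members, and std::clamp substitutes for the patch's ternary-plus-std::min.

#include <algorithm>
#include <chrono>
#include <cstdint>
#include <optional>

// Stand-ins for the limiter's configuration (hypothetical values).
constexpr int64_t kRefillPeriodUs = 100 * 1000;  // refill_period_us_
constexpr int64_t kSecondsPerTune = 1;
// Same cap as the patch: convert to microseconds *before* applying the
// 7/4 factor, so integer duration division cannot round the 1.75x away.
constexpr auto kMaxTuneTickDuration =
    std::chrono::microseconds(kSecondsPerTune * 1000 * 1000) * 7 / 4;

// Wait computation: a negative delta (the clock jumped past the refill
// deadline) becomes 0, and a huge delta (the clock jumped backwards,
// pushing next_refill_us far into the apparent future) is capped at one
// refill period, so a waiter can never sleep unboundedly.
int64_t ClampedWaitUs(int64_t next_refill_us, int64_t now_us) {
  return std::clamp<int64_t>(next_refill_us - now_us, 0, kRefillPeriodUs);
}

// Tuning-interval validation: returns the elapsed interval in
// milliseconds, or std::nullopt when the clock moved backwards or the
// interval exceeds the cap (system stall / severe skew). On nullopt the
// caller falls back to the maximum rate, mirroring the patch's
// SetActualBytesPerSecond + Status::Aborted() path.
std::optional<std::chrono::milliseconds> ValidTuneInterval(
    std::chrono::microseconds prev, std::chrono::microseconds now) {
  if (now <= prev || now >= prev + kMaxTuneTickDuration) {
    return std::nullopt;
  }
  return std::chrono::duration_cast<std::chrono::milliseconds>(now - prev);
}

Falling back to max_bytes_per_sec_ rather than keeping a stale tuned rate errs on the side of not throttling compaction while timing data cannot be trusted; the next healthy tuning tick restores normal auto-tuning.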