From 558f59c39c5c83160b5d4392fbcedb8f15f5837d Mon Sep 17 00:00:00 2001 From: kthui <18255193+kthui@users.noreply.github.com> Date: Wed, 6 Mar 2024 10:47:45 -0800 Subject: [PATCH] Add wait timeout to cv --- src/dynamic_batch_scheduler.cc | 52 ++++++++++++++++++++-------------- 1 file changed, 31 insertions(+), 21 deletions(-) diff --git a/src/dynamic_batch_scheduler.cc b/src/dynamic_batch_scheduler.cc index 2a9815805..9a049fff5 100644 --- a/src/dynamic_batch_scheduler.cc +++ b/src/dynamic_batch_scheduler.cc @@ -460,30 +460,40 @@ DynamicBatchScheduler::WaitForPayloadSlotAvailable( // Enqueue threads above to make progress. lock->unlock(); - std::chrono::microseconds wait_timeout(wait_microseconds); + const auto reject_and_release_timeout_requests = [this]() { + std::vector>> + rejected_requests, cancelled_requests; + { + std::lock_guard lock(mu_); + queue_.RejectTimeoutRequests(); + queue_.ReleaseSkippedRequests(&rejected_requests, &cancelled_requests); + } + FinishRejectedCancelledRequests( + std::move(rejected_requests), std::move(cancelled_requests)); + }; + const std::chrono::microseconds wait_timeout(wait_microseconds); std::mutex slot_mu; std::unique_lock slot_lock(slot_mu); - - cv_.wait(slot_lock, [this, &wait_timeout]() { - auto slot_available_future = std::async(std::launch::async, [this]() { - return model_->Server()->GetRateLimiter()->PayloadSlotAvailable( - model_, model_instance_, queue_.SupportPrefetching()); - }); - while (slot_available_future.wait_for(wait_timeout) != - std::future_status::ready) { - // Reject and release timed-out requests while waiting. - std::vector>> - rejected_requests, cancelled_requests; - { - std::lock_guard lock(mu_); - queue_.RejectTimeoutRequests(); - queue_.ReleaseSkippedRequests(&rejected_requests, &cancelled_requests); - } - FinishRejectedCancelledRequests( - std::move(rejected_requests), std::move(cancelled_requests)); + bool slot_available = false; + + while (!slot_available) { + slot_available = cv_.wait_for( + slot_lock, wait_timeout, + [this, &wait_timeout, &reject_and_release_timeout_requests]() { + auto slot_available_future = std::async(std::launch::async, [this]() { + return model_->Server()->GetRateLimiter()->PayloadSlotAvailable( + model_, model_instance_, queue_.SupportPrefetching()); + }); + while (slot_available_future.wait_for(wait_timeout) != + std::future_status::ready) { + reject_and_release_timeout_requests(); + } + return slot_available_future.get(); + }); + if (!slot_available) { + reject_and_release_timeout_requests(); } - return slot_available_future.get(); - }); + } // Recapture the lock. lock->lock();