Skip to content

Commit

Permalink
Add wait timeout to cv
Browse files — browse the repository at this point in the history
  • Loading branch information
kthui committed Mar 6, 2024
1 parent 5274544 commit 558f59c
Showing 1 changed file with 31 additions and 21 deletions.
52 changes: 31 additions & 21 deletions src/dynamic_batch_scheduler.cc
Original file line number | Diff line number | Diff line change
Expand Up
@@ -460,30 +460,40 @@ DynamicBatchScheduler::WaitForPayloadSlotAvailable(
// Enqueue threads above to make progress.
lock->unlock();

std::chrono::microseconds wait_timeout(wait_microseconds);
const auto reject_and_release_timeout_requests = [this]() {
std::vector<std::deque<std::unique_ptr<InferenceRequest>>>
rejected_requests, cancelled_requests;
{
std::lock_guard<std::mutex> lock(mu_);
queue_.RejectTimeoutRequests();
queue_.ReleaseSkippedRequests(&rejected_requests, &cancelled_requests);
}
FinishRejectedCancelledRequests(
std::move(rejected_requests), std::move(cancelled_requests));
};
const std::chrono::microseconds wait_timeout(wait_microseconds);
std::mutex slot_mu;
std::unique_lock<std::mutex> slot_lock(slot_mu);

cv_.wait(slot_lock, [this, &wait_timeout]() {
auto slot_available_future = std::async(std::launch::async, [this]() {
return model_->Server()->GetRateLimiter()->PayloadSlotAvailable(
model_, model_instance_, queue_.SupportPrefetching());
});
while (slot_available_future.wait_for(wait_timeout) !=
std::future_status::ready) {
// Reject and release timed-out requests while waiting.
std::vector<std::deque<std::unique_ptr<InferenceRequest>>>
rejected_requests, cancelled_requests;
{
std::lock_guard<std::mutex> lock(mu_);
queue_.RejectTimeoutRequests();
queue_.ReleaseSkippedRequests(&rejected_requests, &cancelled_requests);
}
FinishRejectedCancelledRequests(
std::move(rejected_requests), std::move(cancelled_requests));
bool slot_available = false;

while (!slot_available) {
slot_available = cv_.wait_for(
slot_lock, wait_timeout,
[this, &wait_timeout, &reject_and_release_timeout_requests]() {
auto slot_available_future = std::async(std::launch::async, [this]() {
return model_->Server()->GetRateLimiter()->PayloadSlotAvailable(
model_, model_instance_, queue_.SupportPrefetching());
});
while (slot_available_future.wait_for(wait_timeout) !=
std::future_status::ready) {
reject_and_release_timeout_requests();
}
return slot_available_future.get();
});
if (!slot_available) {
reject_and_release_timeout_requests();
}
return slot_available_future.get();
});
}

// Recapture the lock.
lock->lock();
Expand Down

0 comments on commit 558f59c

Please sign in to comment.