Promptly respond to timeout requests under reject policy #333

Merged 7 commits on Mar 8, 2024. Changes shown from all commits.
78 changes: 56 additions & 22 deletions src/dynamic_batch_scheduler.cc
@@ -1,4 +1,4 @@
// Copyright 2018-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// Copyright 2018-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions
@@ -67,6 +67,20 @@ FinishSkippedRequests(
}
}

void
FinishRejectedCancelledRequests(
std::vector<std::deque<std::unique_ptr<InferenceRequest>>>&&
rejected_requests,
std::vector<std::deque<std::unique_ptr<InferenceRequest>>>&&
cancelled_requests)
{
const static Status rejected_status =
Status(Status::Code::UNAVAILABLE, "Request timeout expired");
const static Status cancelled_status = Status(Status::Code::CANCELLED);
FinishSkippedRequests(std::move(rejected_requests), rejected_status);
FinishSkippedRequests(std::move(cancelled_requests), cancelled_status);
}

DynamicBatchScheduler::DynamicBatchScheduler(
TritonModel* model, TritonModelInstance* model_instance,
const bool dynamic_batching_enabled, const int32_t max_batch_size,
@@ -317,10 +331,6 @@ DynamicBatchScheduler::BatcherThread(const int nice)
}
}

auto wait_for_slots = [this]() {
return model_->Server()->GetRateLimiter()->PayloadSlotAvailable(
model_, model_instance_, queue_.SupportPrefetching());
};
const uint64_t default_wait_microseconds = 500 * 1000;

while (!scheduler_thread_exit_.load()) {
@@ -359,18 +369,7 @@ DynamicBatchScheduler::BatcherThread(const int nice)
continue;
}

{
// The wait_for_slots conditional can be blocking till the slots
// are available for execution. Need to explicitly release the
// outer lock to allow Enqueue threads above to make progress.
lock.unlock();
// Use slot lock to wait for the slot availability.
std::mutex slot_mu;
std::unique_lock<std::mutex> slot_lock(slot_mu);
cv_.wait(slot_lock, wait_for_slots);
// Recapture the outer most lock to keep making progress.
lock.lock();
}
WaitForPayloadSlotAvailable(&lock, default_wait_microseconds);

Contributor:

default_wait_microseconds is a fixed value; wouldn't there still be a delay of up to 0.5 seconds? Or is that acceptable for rejecting requests?

Contributor @oandreeva-nv, Mar 8, 2024:

I think 0.5 s should be fine for now. What they were complaining about is a 2 s timeout in a scenario where the model executes for 10 s, so timed-out requests wait an extra 8 seconds before being returned.

We can always adjust it based on their feedback. @kthui, what do you think?

Contributor Author:

Yes. I think we can have them try this version first; we can always reduce the delay interval if they ask. I have filed an enhancement ticket to remove the need for a manually set interval.

Contributor Author:

> wouldn't there still be a delay of up to 0.5 seconds? Or is that acceptable for rejecting requests?

I think it is acceptable, since they mentioned that "[some large model] could last for tens of seconds" is when it becomes a concern, and 0.5 seconds is significantly less than tens of seconds.

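To make the tradeoff discussed above concrete, below is a minimal, self-contained sketch (not code from this PR) of the same polling pattern: waiting on a condition variable in fixed-size slices so that expired requests are rejected at most one interval after their deadline, even if no payload slot frees up for much longer. The `Request` struct, the 4 s releaser thread, and the 500 ms interval standing in for `default_wait_microseconds` are illustrative assumptions.

```cpp
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

int main() {
  std::mutex mu;
  std::condition_variable cv;
  bool slot_available = false;  // stand-in for the rate limiter's payload slot check

  using Clock = std::chrono::steady_clock;
  struct Request { Clock::time_point deadline; bool rejected = false; };
  // One request with a 2 s timeout, mirroring the scenario in the discussion.
  std::vector<Request> queue{{Clock::now() + std::chrono::seconds(2)}};

  // Simulate a slot that only frees up after 4 s (e.g. a long-running execution).
  std::thread releaser([&] {
    std::this_thread::sleep_for(std::chrono::seconds(4));
    { std::lock_guard<std::mutex> lk(mu); slot_available = true; }
    cv.notify_all();
  });

  // Plays the role of default_wait_microseconds (500 ms).
  const auto interval = std::chrono::milliseconds(500);
  std::unique_lock<std::mutex> lock(mu);
  while (!cv.wait_for(lock, interval, [&] { return slot_available; })) {
    // Every time the wait slice expires without a slot, reject requests whose
    // timeout has passed; worst-case extra latency beyond the deadline is one
    // interval (~0.5 s) instead of the full wait for a free slot.
    for (auto& r : queue) {
      if (!r.rejected && Clock::now() > r.deadline) {
        r.rejected = true;
        std::cout << "request rejected: timeout expired\n";
      }
    }
  }
  std::cout << "slot available, dispatching remaining requests\n";
  lock.unlock();
  releaser.join();
  return 0;
}
```

With these numbers, the request is rejected roughly 2 to 2.5 seconds after being enqueued rather than waiting the full 4 seconds for a slot to free up, which is the bound the fixed 0.5 second interval is meant to provide.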
{
std::lock_guard<std::mutex> exec_lock(
@@ -444,17 +443,52 @@ DynamicBatchScheduler::BatcherThread(const int nice)
}

// Finish rejected and cancelled requests if any
const static Status rejected_status =
Status(Status::Code::UNAVAILABLE, "Request timeout expired");
const static Status cancelled_status = Status(Status::Code::CANCELLED);
FinishSkippedRequests(std::move(rejected_requests), rejected_status);
FinishSkippedRequests(std::move(cancelled_requests), cancelled_status);
FinishRejectedCancelledRequests(
std::move(rejected_requests), std::move(cancelled_requests));
} // end runner loop

LOG_VERBOSE(1) << "Stopping dynamic-batcher thread for " << model_name_
<< "...";
}

void
DynamicBatchScheduler::WaitForPayloadSlotAvailable(
std::unique_lock<std::mutex>* lock, uint64_t wait_microseconds)
{
// The wait_for_slots conditional can be blocking till the slots are available
// for execution. Need to explicitly release the 'mu_' lock to allow the
// Enqueue threads above to make progress.
lock->unlock();

const std::chrono::microseconds wait_timeout(wait_microseconds);
std::mutex slot_mu;
std::unique_lock<std::mutex> slot_lock(slot_mu);
bool slot_available = false;

while (!slot_available) {
slot_available = cv_.wait_for(slot_lock, wait_timeout, [this]() {
return model_->Server()->GetRateLimiter()->PayloadSlotAvailable(
model_, model_instance_, queue_.SupportPrefetching(),
true /* force_non_blocking */);
});
if (!slot_available) {
// Reject and release timeout requests from queue.
std::vector<std::deque<std::unique_ptr<InferenceRequest>>>
rejected_requests, cancelled_requests;
{
std::lock_guard<std::mutex> lock(mu_);
queue_.RejectTimeoutRequests();
queue_.ReleaseSkippedRequests(&rejected_requests, &cancelled_requests);
}
FinishRejectedCancelledRequests(
std::move(rejected_requests), std::move(cancelled_requests));
}
}

// Recapture the lock.
lock->lock();
}

uint64_t
DynamicBatchScheduler::GetDynamicBatch()
{
11 changes: 10 additions & 1 deletion src/dynamic_batch_scheduler.h
@@ -1,4 +1,4 @@
// Copyright 2018-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// Copyright 2018-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions
@@ -109,6 +109,15 @@ class DynamicBatchScheduler : public Scheduler {
std::unique_ptr<InferenceResponse>& cached_response);
void FinalizeResponses();

// Block until a payload slot is available on the rate limiter. The 'lock'
// should be acquired when calling this function. The 'lock' will be released
// when waiting for payload slot and re-acquired before this function returns.
// For queued requests under policy REJECT, they will be rejected if timed-out
// while waiting for a slot. The timeout will be checked every
// 'wait_microseconds'. The 'wait_microseconds' should be non-zero.
void WaitForPayloadSlotAvailable(
std::unique_lock<std::mutex>* lock, uint64_t wait_microseconds);

// Custom batching function calls
// Returns whether custom batching is enabled.
bool CustomBatchEnabled() const;
41 changes: 40 additions & 1 deletion src/scheduler_utils.cc
@@ -1,4 +1,4 @@
// Copyright 2020-2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// Copyright 2020-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions
@@ -237,6 +237,33 @@ PriorityQueue::PolicyQueue::ApplyPolicy(
return ((idx - queue_.size()) < delayed_queue_.size());
}

size_t
PriorityQueue::PolicyQueue::RejectTimeoutRequests()
{
if (timeout_action_ != inference::ModelQueuePolicy::REJECT) {
return 0;
}

size_t rejected_count = 0;
uint64_t now_nanoseconds =
std::chrono::duration_cast<std::chrono::nanoseconds>(
std::chrono::steady_clock::now().time_since_epoch())
.count();
size_t idx = 0;
while (idx < queue_.size()) {
if (timeout_timestamp_ns_[idx] != 0 &&
now_nanoseconds > timeout_timestamp_ns_[idx]) {
rejected_count++;
rejected_queue_.emplace_back(std::move(queue_[idx]));
queue_.erase(queue_.begin() + idx);
timeout_timestamp_ns_.erase(timeout_timestamp_ns_.begin() + idx);
} else {
idx++;
}
}
return rejected_count;
}

void
PriorityQueue::PolicyQueue::ReleaseRejectedQueue(
std::deque<std::unique_ptr<InferenceRequest>>* requests)
@@ -358,6 +385,18 @@ PriorityQueue::Dequeue(std::unique_ptr<InferenceRequest>* request)
return Status(Status::Code::UNAVAILABLE, "dequeue on empty queue");
}

void
PriorityQueue::RejectTimeoutRequests()
{
for (auto it = queues_.begin(); it != queues_.end(); it++) {
size_t rejected_count = it->second.RejectTimeoutRequests();
size_ -= rejected_count;
if (rejected_count > 0 && it->first == pending_cursor_.curr_it_->first) {
pending_cursor_.valid_ = false;
}
}
}

void
PriorityQueue::ReleaseSkippedRequests(
std::vector<std::deque<std::unique_ptr<InferenceRequest>>>*
11 changes: 10 additions & 1 deletion src/scheduler_utils.h
@@ -1,4 +1,4 @@
// Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
// Copyright 2020-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions
@@ -84,6 +84,11 @@ class PriorityQueue {
// Dequeue the request at the front of the queue.
Status Dequeue(std::unique_ptr<InferenceRequest>* request);

// Reject timed-out requests from 'queues_', if queue policy set to reject.
// The cursor will be marked as invalid, if a request from the queue pointed
// to by the cursor is rejected.
void RejectTimeoutRequests();

// Retrieve the requests that are either rejected or cancelled.
void ReleaseSkippedRequests(
std::vector<std::deque<std::unique_ptr<InferenceRequest>>>*
@@ -203,6 +208,10 @@
size_t idx, size_t* rejected_count, size_t* rejected_batch_size,
size_t* cancelled_count, size_t* cancelled_batch_size);

// Move timed-out requests from 'queue_' to 'rejected_queue_', if
// 'timeout_action_' is to reject. Return the number of requests rejected.
size_t RejectTimeoutRequests();

// Return the rejected requests held by the queue.
void ReleaseRejectedQueue(
std::deque<std::unique_ptr<InferenceRequest>>* requests);