Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add TestScheduler to support testing time-based coroutines without waiting for timeouts #453

Merged
merged 13 commits into from
Mar 25, 2024
1 change: 1 addition & 0 deletions cpp/mrc/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ add_library(libmrc
src/public/coroutines/io_scheduler.cpp
src/public/coroutines/sync_wait.cpp
src/public/coroutines/task_container.cpp
src/public/coroutines/test_scheduler.cpp
src/public/coroutines/thread_local_context.cpp
src/public/coroutines/thread_pool.cpp
src/public/cuda/device_guard.cpp
Expand Down
105 changes: 105 additions & 0 deletions cpp/mrc/include/mrc/coroutines/test_scheduler.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "mrc/coroutines/scheduler.hpp"
#include "mrc/coroutines/task.hpp"

#include <chrono>
#include <coroutine>
#include <queue>
#include <utility>
#include <vector>

#pragma once

namespace mrc::coroutines {

class TestScheduler : public Scheduler
{
private:
struct Operation
{
public:
Operation(TestScheduler* self, std::chrono::time_point<std::chrono::steady_clock> time);

static constexpr bool await_ready()
{
return false;

Check warning on line 41 in cpp/mrc/include/mrc/coroutines/test_scheduler.hpp

View check run for this annotation

Codecov / codecov/patch

cpp/mrc/include/mrc/coroutines/test_scheduler.hpp#L41

Added line #L41 was not covered by tests
}

void await_suspend(std::coroutine_handle<> handle);

void await_resume() {}

Check warning on line 46 in cpp/mrc/include/mrc/coroutines/test_scheduler.hpp

View check run for this annotation

Codecov / codecov/patch

cpp/mrc/include/mrc/coroutines/test_scheduler.hpp#L46

Added line #L46 was not covered by tests

private:
TestScheduler* m_self;
std::chrono::time_point<std::chrono::steady_clock> m_time;
};

using item_t = std::pair<std::coroutine_handle<>, std::chrono::time_point<std::chrono::steady_clock>>;
struct ItemCompare
{
bool operator()(item_t& lhs, item_t& rhs);
};

std::priority_queue<item_t, std::vector<item_t>, ItemCompare> m_queue;
std::chrono::time_point<std::chrono::steady_clock> m_time = std::chrono::steady_clock::now();

public:
/**
* @brief Enqueue's the coroutine handle to be resumed at the current logical time.
*/
void resume(std::coroutine_handle<> handle) noexcept override;

/**
* Suspends the current function and enqueue's it to be resumed at the current logical time.
*/
mrc::coroutines::Task<> yield() override;

/**
* Suspends the current function and enqueue's it to be resumed at the current logica time + the given duration.
*/
mrc::coroutines::Task<> yield_for(std::chrono::milliseconds time) override;

/**
* Suspends the current function and enqueue's it to be resumed at the given logical time.
*/
mrc::coroutines::Task<> yield_until(std::chrono::time_point<std::chrono::steady_clock> time) override;

/**
* Immediately resumes the next-in-queue coroutine handle.
*
* @return true if more coroutines exist in the queue after resuming, false otherwise.
*/
bool resume_next();

/**
* Immediately resumes next-in-queue coroutines up to the current logical time + the given duration, in-order.
*
* @return true if more coroutines exist in the queue after resuming, false otherwise.
*/
bool resume_for(std::chrono::milliseconds time);

/**
* Immediately resumes next-in-queue coroutines up to the given logical time.
*
* @return true if more coroutines exist in the queue after resuming, false otherwise.
*/
bool resume_until(std::chrono::time_point<std::chrono::steady_clock> time);
};

} // namespace mrc::coroutines
102 changes: 102 additions & 0 deletions cpp/mrc/src/public/coroutines/test_scheduler.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "mrc/coroutines/test_scheduler.hpp"

#include <compare>

namespace mrc::coroutines {

TestScheduler::Operation::Operation(TestScheduler* self, std::chrono::time_point<std::chrono::steady_clock> time) :
m_self(self),
m_time(time)

Check warning on line 26 in cpp/mrc/src/public/coroutines/test_scheduler.cpp

View check run for this annotation

Codecov / codecov/patch

cpp/mrc/src/public/coroutines/test_scheduler.cpp#L24-L26

Added lines #L24 - L26 were not covered by tests
{}

bool TestScheduler::ItemCompare::operator()(item_t& lhs, item_t& rhs)

Check warning on line 29 in cpp/mrc/src/public/coroutines/test_scheduler.cpp

View check run for this annotation

Codecov / codecov/patch

cpp/mrc/src/public/coroutines/test_scheduler.cpp#L29

Added line #L29 was not covered by tests
{
return lhs.second > rhs.second;

Check warning on line 31 in cpp/mrc/src/public/coroutines/test_scheduler.cpp

View check run for this annotation

Codecov / codecov/patch

cpp/mrc/src/public/coroutines/test_scheduler.cpp#L31

Added line #L31 was not covered by tests
}

void TestScheduler::Operation::await_suspend(std::coroutine_handle<> handle)

Check warning on line 34 in cpp/mrc/src/public/coroutines/test_scheduler.cpp

View check run for this annotation

Codecov / codecov/patch

cpp/mrc/src/public/coroutines/test_scheduler.cpp#L34

Added line #L34 was not covered by tests
{
m_self->m_queue.emplace(std::move(handle), m_time);

Check warning on line 36 in cpp/mrc/src/public/coroutines/test_scheduler.cpp

View check run for this annotation

Codecov / codecov/patch

cpp/mrc/src/public/coroutines/test_scheduler.cpp#L36

Added line #L36 was not covered by tests
}

void TestScheduler::resume(std::coroutine_handle<> handle) noexcept

Check warning on line 39 in cpp/mrc/src/public/coroutines/test_scheduler.cpp

View check run for this annotation

Codecov / codecov/patch

cpp/mrc/src/public/coroutines/test_scheduler.cpp#L39

Added line #L39 was not covered by tests
{
m_queue.emplace(std::move(handle), std::chrono::steady_clock::now());

Check warning on line 41 in cpp/mrc/src/public/coroutines/test_scheduler.cpp

View check run for this annotation

Codecov / codecov/patch

cpp/mrc/src/public/coroutines/test_scheduler.cpp#L41

Added line #L41 was not covered by tests
}

mrc::coroutines::Task<> TestScheduler::yield()

Check warning on line 44 in cpp/mrc/src/public/coroutines/test_scheduler.cpp

View check run for this annotation

Codecov / codecov/patch

cpp/mrc/src/public/coroutines/test_scheduler.cpp#L44

Added line #L44 was not covered by tests
{
co_return co_await TestScheduler::Operation{this, m_time};
}

mrc::coroutines::Task<> TestScheduler::yield_for(std::chrono::milliseconds time)

Check warning on line 49 in cpp/mrc/src/public/coroutines/test_scheduler.cpp

View check run for this annotation

Codecov / codecov/patch

cpp/mrc/src/public/coroutines/test_scheduler.cpp#L49

Added line #L49 was not covered by tests
{
co_return co_await TestScheduler::Operation{this, m_time + time};
}

mrc::coroutines::Task<> TestScheduler::yield_until(std::chrono::time_point<std::chrono::steady_clock> time)

Check warning on line 54 in cpp/mrc/src/public/coroutines/test_scheduler.cpp

View check run for this annotation

Codecov / codecov/patch

cpp/mrc/src/public/coroutines/test_scheduler.cpp#L54

Added line #L54 was not covered by tests
{
co_return co_await TestScheduler::Operation{this, time};
}

bool TestScheduler::resume_next()

Check warning on line 59 in cpp/mrc/src/public/coroutines/test_scheduler.cpp

View check run for this annotation

Codecov / codecov/patch

cpp/mrc/src/public/coroutines/test_scheduler.cpp#L59

Added line #L59 was not covered by tests
{
if (m_queue.empty())

Check warning on line 61 in cpp/mrc/src/public/coroutines/test_scheduler.cpp

View check run for this annotation

Codecov / codecov/patch

cpp/mrc/src/public/coroutines/test_scheduler.cpp#L61

Added line #L61 was not covered by tests
{
return false;

Check warning on line 63 in cpp/mrc/src/public/coroutines/test_scheduler.cpp

View check run for this annotation

Codecov / codecov/patch

cpp/mrc/src/public/coroutines/test_scheduler.cpp#L63

Added line #L63 was not covered by tests
}

auto handle = m_queue.top();

Check warning on line 66 in cpp/mrc/src/public/coroutines/test_scheduler.cpp

View check run for this annotation

Codecov / codecov/patch

cpp/mrc/src/public/coroutines/test_scheduler.cpp#L66

Added line #L66 was not covered by tests

m_queue.pop();

Check warning on line 68 in cpp/mrc/src/public/coroutines/test_scheduler.cpp

View check run for this annotation

Codecov / codecov/patch

cpp/mrc/src/public/coroutines/test_scheduler.cpp#L68

Added line #L68 was not covered by tests

m_time = handle.second;

Check warning on line 70 in cpp/mrc/src/public/coroutines/test_scheduler.cpp

View check run for this annotation

Codecov / codecov/patch

cpp/mrc/src/public/coroutines/test_scheduler.cpp#L70

Added line #L70 was not covered by tests

handle.first.resume();

Check warning on line 72 in cpp/mrc/src/public/coroutines/test_scheduler.cpp

View check run for this annotation

Codecov / codecov/patch

cpp/mrc/src/public/coroutines/test_scheduler.cpp#L72

Added line #L72 was not covered by tests

return true;

Check warning on line 74 in cpp/mrc/src/public/coroutines/test_scheduler.cpp

View check run for this annotation

Codecov / codecov/patch

cpp/mrc/src/public/coroutines/test_scheduler.cpp#L74

Added line #L74 was not covered by tests
}

bool TestScheduler::resume_for(std::chrono::milliseconds time)

Check warning on line 77 in cpp/mrc/src/public/coroutines/test_scheduler.cpp

View check run for this annotation

Codecov / codecov/patch

cpp/mrc/src/public/coroutines/test_scheduler.cpp#L77

Added line #L77 was not covered by tests
{
return resume_until(m_time + time);

Check warning on line 79 in cpp/mrc/src/public/coroutines/test_scheduler.cpp

View check run for this annotation

Codecov / codecov/patch

cpp/mrc/src/public/coroutines/test_scheduler.cpp#L79

Added line #L79 was not covered by tests
}

bool TestScheduler::resume_until(std::chrono::time_point<std::chrono::steady_clock> time)

Check warning on line 82 in cpp/mrc/src/public/coroutines/test_scheduler.cpp

View check run for this annotation

Codecov / codecov/patch

cpp/mrc/src/public/coroutines/test_scheduler.cpp#L82

Added line #L82 was not covered by tests
{
m_time = time;

Check warning on line 84 in cpp/mrc/src/public/coroutines/test_scheduler.cpp

View check run for this annotation

Codecov / codecov/patch

cpp/mrc/src/public/coroutines/test_scheduler.cpp#L84

Added line #L84 was not covered by tests

while (not m_queue.empty())

Check warning on line 86 in cpp/mrc/src/public/coroutines/test_scheduler.cpp

View check run for this annotation

Codecov / codecov/patch

cpp/mrc/src/public/coroutines/test_scheduler.cpp#L86

Added line #L86 was not covered by tests
{
if (m_queue.top().second <= m_time)

Check warning on line 88 in cpp/mrc/src/public/coroutines/test_scheduler.cpp

View check run for this annotation

Codecov / codecov/patch

cpp/mrc/src/public/coroutines/test_scheduler.cpp#L88

Added line #L88 was not covered by tests
{
m_queue.top().first.resume();
m_queue.pop();

Check warning on line 91 in cpp/mrc/src/public/coroutines/test_scheduler.cpp

View check run for this annotation

Codecov / codecov/patch

cpp/mrc/src/public/coroutines/test_scheduler.cpp#L90-L91

Added lines #L90 - L91 were not covered by tests
}
else
{
return true;

Check warning on line 95 in cpp/mrc/src/public/coroutines/test_scheduler.cpp

View check run for this annotation

Codecov / codecov/patch

cpp/mrc/src/public/coroutines/test_scheduler.cpp#L95

Added line #L95 was not covered by tests
}
}

return false;

Check warning on line 99 in cpp/mrc/src/public/coroutines/test_scheduler.cpp

View check run for this annotation

Codecov / codecov/patch

cpp/mrc/src/public/coroutines/test_scheduler.cpp#L99

Added line #L99 was not covered by tests
}

} // namespace mrc::coroutines
Loading