From 77bd0b1203a88fb58a191fd0c80b9ee9e0365ad2 Mon Sep 17 00:00:00 2001 From: Christopher Harris Date: Tue, 27 Feb 2024 23:34:06 +0000 Subject: [PATCH 01/12] add IoScheduler from libcoro --- cpp/mrc/CMakeLists.txt | 1 + .../mrc/coroutines/detail/poll_info.hpp | 118 ++++ cpp/mrc/include/mrc/coroutines/fd.hpp | 44 ++ .../include/mrc/coroutines/io_scheduler.hpp | 413 +++++++++++++ cpp/mrc/include/mrc/coroutines/poll.hpp | 82 +++ cpp/mrc/include/mrc/coroutines/scheduler.hpp | 5 - cpp/mrc/include/mrc/coroutines/time.hpp | 46 ++ .../src/public/coroutines/io_scheduler.cpp | 560 ++++++++++++++++++ .../src/public/coroutines/task_container.cpp | 2 +- .../_pymrc/include/pymrc/asyncio_runnable.hpp | 6 +- .../include/pymrc/asyncio_scheduler.hpp | 8 - .../_pymrc/tests/test_asyncio_runnable.cpp | 2 +- 12 files changed, 1270 insertions(+), 17 deletions(-) create mode 100644 cpp/mrc/include/mrc/coroutines/detail/poll_info.hpp create mode 100644 cpp/mrc/include/mrc/coroutines/fd.hpp create mode 100644 cpp/mrc/include/mrc/coroutines/io_scheduler.hpp create mode 100644 cpp/mrc/include/mrc/coroutines/poll.hpp create mode 100644 cpp/mrc/include/mrc/coroutines/time.hpp create mode 100644 cpp/mrc/src/public/coroutines/io_scheduler.cpp diff --git a/cpp/mrc/CMakeLists.txt b/cpp/mrc/CMakeLists.txt index 4b7138edd..4c72eb6f7 100644 --- a/cpp/mrc/CMakeLists.txt +++ b/cpp/mrc/CMakeLists.txt @@ -115,6 +115,7 @@ add_library(libmrc src/public/core/logging.cpp src/public/core/thread.cpp src/public/coroutines/event.cpp + src/public/coroutines/io_scheduler.cpp src/public/coroutines/sync_wait.cpp src/public/coroutines/task_container.cpp src/public/coroutines/thread_local_context.cpp diff --git a/cpp/mrc/include/mrc/coroutines/detail/poll_info.hpp b/cpp/mrc/include/mrc/coroutines/detail/poll_info.hpp new file mode 100644 index 000000000..7fd6a2aef --- /dev/null +++ b/cpp/mrc/include/mrc/coroutines/detail/poll_info.hpp @@ -0,0 +1,118 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Original Source: https://github.com/jbaldwin/libcoro + * Original License: Apache License, Version 2.0; included below + */ + +/** + * Copyright 2021 Josh Baldwin + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "mrc/coroutines/fd.hpp" +#include "mrc/coroutines/poll.hpp" +#include "mrc/coroutines/time.hpp" + +#include +#include +#include +#include + +namespace mrc::coroutines::detail { +/** + * Poll Info encapsulates everything about a poll operation for the event as well as its paired + * timeout. This is important since coroutines that are waiting on an event or timeout do not + * immediately execute, they are re-scheduled onto the thread pool, so its possible its pair + * event or timeout also triggers while the coroutine is still waiting to resume. This means that + * the first one to happen, the event itself or its timeout, needs to disable the other pair item + * prior to resuming the coroutine. + * + * Finally, its also important to note that the event and its paired timeout could happen during + * the same epoll_wait and possibly trigger the coroutine to start twice. Only one can win, so the + * first one processed sets m_processed to true and any subsequent events in the same epoll batch + * are effectively discarded. + */ +struct PollInfo +{ + using timed_events_t = std::multimap; + + PollInfo() = default; + ~PollInfo() = default; + + PollInfo(const PollInfo&) = delete; + PollInfo(PollInfo&&) = delete; + auto operator=(const PollInfo&) -> PollInfo& = delete; + auto operator=(PollInfo&&) -> PollInfo& = delete; + + struct PollAwaiter + { + explicit PollAwaiter(PollInfo& pi) noexcept : m_pi(pi) {} + + static auto await_ready() noexcept -> bool + { + return false; + } + auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> void + { + m_pi.m_awaiting_coroutine = awaiting_coroutine; + std::atomic_thread_fence(std::memory_order::release); + } + auto await_resume() const noexcept -> mrc::coroutines::PollStatus + { + return m_pi.m_poll_status; + } + + PollInfo& m_pi; + }; + + auto operator co_await() noexcept -> PollAwaiter + { + return PollAwaiter{*this}; + } + + /// The file descriptor being polled on. This is needed so that if the timeout occurs first then + /// the event loop can immediately disable the event within epoll. + fd_t m_fd{-1}; + /// The timeout's position in the timeout map. A poll() with no timeout or yield() this is empty. + /// This is needed so that if the event occurs first then the event loop can immediately disable + /// the timeout within epoll. + std::optional m_timer_pos{std::nullopt}; + /// The awaiting coroutine for this poll info to resume upon event or timeout. + std::coroutine_handle<> m_awaiting_coroutine; + /// The status of the poll operation. + mrc::coroutines::PollStatus m_poll_status{mrc::coroutines::PollStatus::error}; + /// Did the timeout and event trigger at the same time on the same epoll_wait call? + /// Once this is set to true all future events on this poll info are null and void. + bool m_processed{false}; +}; + +} // namespace mrc::coroutines::detail \ No newline at end of file diff --git a/cpp/mrc/include/mrc/coroutines/fd.hpp b/cpp/mrc/include/mrc/coroutines/fd.hpp new file mode 100644 index 000000000..0ec42e06c --- /dev/null +++ b/cpp/mrc/include/mrc/coroutines/fd.hpp @@ -0,0 +1,44 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Original Source: https://github.com/jbaldwin/libcoro + * Original License: Apache License, Version 2.0; included below + */ + +/** + * Copyright 2021 Josh Baldwin + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + #pragma once + +namespace mrc::coroutines { +using fd_t = int; + +} // namespace mrc::coroutines diff --git a/cpp/mrc/include/mrc/coroutines/io_scheduler.hpp b/cpp/mrc/include/mrc/coroutines/io_scheduler.hpp new file mode 100644 index 000000000..9b077455b --- /dev/null +++ b/cpp/mrc/include/mrc/coroutines/io_scheduler.hpp @@ -0,0 +1,413 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Original Source: https://github.com/jbaldwin/libcoro + * Original License: Apache License, Version 2.0; included below + */ + +/** + * Copyright 2021 Josh Baldwin + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + #pragma once + +#include "mrc/coroutines/detail/poll_info.hpp" +#include "mrc/coroutines/fd.hpp" +#include "mrc/coroutines/poll.hpp" +#include "mrc/coroutines/scheduler.hpp" +#include "mrc/coroutines/task_container.hpp" +#include "mrc/coroutines/thread_pool.hpp" + +#ifdef LIBCORO_FEATURE_NETWORKING + #include "coro/net/socket.hpp" +#endif + +#include + +#include +#include +#include +#include +#include +#include +#include + +namespace mrc::coroutines { +class IoScheduler : public Scheduler +{ + using timed_events_t = detail::PollInfo::timed_events_t; + + public: + class schedule_operation; + friend schedule_operation; + + enum class ThreadStrategy + { + /// Spawns a dedicated background thread for the scheduler to run on. + spawn, + /// Requires the user to call process_events() to drive the scheduler. + manual + }; + + enum class ExecutionStrategy + { + /// Tasks will be FIFO queued to be executed on a thread pool. This is better for tasks that + /// are long lived and will use lots of CPU because long lived tasks will block other i/o + /// operations while they complete. This strategy is generally better for lower latency + /// requirements at the cost of throughput. + process_tasks_on_thread_pool, + /// Tasks will be executed inline on the io scheduler thread. This is better for short tasks + /// that can be quickly processed and not block other i/o operations for very long. This + /// strategy is generally better for higher throughput at the cost of latency. + process_tasks_inline + }; + + struct Options + { + /// Should the io scheduler spawn a dedicated event processor? + ThreadStrategy thread_strategy{ThreadStrategy::spawn}; + /// If spawning a dedicated event processor a functor to call upon that thread starting. + std::function on_io_thread_start_functor{nullptr}; + /// If spawning a dedicated event processor a functor to call upon that thread stopping. + std::function on_io_thread_stop_functor{nullptr}; + /// Thread pool options for the task processor threads. See thread pool for more details. + ThreadPool::Options pool{ + .thread_count = ((std::thread::hardware_concurrency() > 1) ? (std::thread::hardware_concurrency() - 1) : 1), + .on_thread_start_functor = nullptr, + .on_thread_stop_functor = nullptr}; + + /// If inline task processing is enabled then the io worker will resume tasks on its thread + /// rather than scheduling them to be picked up by the thread pool. + const ExecutionStrategy execution_strategy{ExecutionStrategy::process_tasks_on_thread_pool}; + }; + + explicit IoScheduler(Options opts = Options{ + .thread_strategy = ThreadStrategy::spawn, + .on_io_thread_start_functor = nullptr, + .on_io_thread_stop_functor = nullptr, + .pool = {.thread_count = ((std::thread::hardware_concurrency() > 1) + ? (std::thread::hardware_concurrency() - 1) + : 1), + .on_thread_start_functor = nullptr, + .on_thread_stop_functor = nullptr}, + .execution_strategy = ExecutionStrategy::process_tasks_on_thread_pool}); + + IoScheduler(const IoScheduler&) = delete; + IoScheduler(IoScheduler&&) = delete; + auto operator=(const IoScheduler&) -> IoScheduler& = delete; + auto operator=(IoScheduler&&) -> IoScheduler& = delete; + + ~IoScheduler(); + + /** + * Given a ThreadStrategy::manual this function should be called at regular intervals to + * process events that are ready. If a using ThreadStrategy::spawn this is run continously + * on a dedicated background thread and does not need to be manually invoked. + * @param timeout If no events are ready how long should the function wait for events to be ready? + * Passing zero (default) for the timeout will check for any events that are + * ready now, and then return. This could be zero events. Passing -1 means block + * indefinitely until an event happens. + * @param return The number of tasks currently executing or waiting to execute. + */ + auto process_events(std::chrono::milliseconds timeout = std::chrono::milliseconds{0}) -> std::size_t; + + class schedule_operation + { + friend class IoScheduler; + explicit schedule_operation(IoScheduler& scheduler) noexcept : m_scheduler(scheduler) {} + + public: + /** + * Operations always pause so the executing thread can be switched. + */ + static constexpr auto await_ready() noexcept -> bool + { + return false; + } + + /** + * Suspending always returns to the caller (using void return of await_suspend()) and + * stores the coroutine internally for the executing thread to resume from. + */ + auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> void + { + if (m_scheduler.m_opts.execution_strategy == ExecutionStrategy::process_tasks_inline) + { + m_scheduler.m_size.fetch_add(1, std::memory_order::release); + { + std::scoped_lock lk{m_scheduler.m_scheduled_tasks_mutex}; + m_scheduler.m_scheduled_tasks.emplace_back(awaiting_coroutine); + } + + // Trigger the event to wake-up the scheduler if this event isn't currently triggered. + bool expected{false}; + if (m_scheduler.m_schedule_fd_triggered.compare_exchange_strong(expected, + true, + std::memory_order::release, + std::memory_order::relaxed)) + { + eventfd_t value{1}; + eventfd_write(m_scheduler.m_schedule_fd, value); + } + } + else + { + m_scheduler.m_thread_pool->resume(awaiting_coroutine); + } + } + + /** + * no-op as this is the function called first by the thread pool's executing thread. + */ + auto await_resume() noexcept -> void {} + + private: + /// The thread pool that this operation will execute on. + IoScheduler& m_scheduler; + }; + + /** + * Schedules the current task onto this IoScheduler for execution. + */ + auto schedule() -> schedule_operation + { + return schedule_operation{*this}; + } + + /** + * Schedules a task onto the IoScheduler and moves ownership of the task to the IoScheduler. + * Only void return type tasks can be scheduled in this manner since the task submitter will no + * longer have control over the scheduled task. + * @param task The task to execute on this IoScheduler. It's lifetime ownership will be transferred + * to this IoScheduler. + */ + auto schedule(mrc::coroutines::Task&& task) -> void; + + /** + * Schedules the current task to run after the given amount of time has elapsed. + * @param amount The amount of time to wait before resuming execution of this task. + * Given zero or negative amount of time this behaves identical to schedule(). + */ + [[nodiscard]] auto schedule_after(std::chrono::milliseconds amount) -> mrc::coroutines::Task; + + /** + * Schedules the current task to run at a given time point in the future. + * @param time The time point to resume execution of this task. Given 'now' or a time point + * in the past this behaves identical to schedule(). + */ + [[nodiscard]] auto schedule_at(time_point_t time) -> mrc::coroutines::Task; + + /** + * Yields the current task to the end of the queue of waiting tasks. + */ + [[nodiscard]] mrc::coroutines::Task yield() override + { + co_await schedule_operation{*this}; + }; + + /** + * Yields the current task for the given amount of time. + * @param amount The amount of time to yield for before resuming executino of this task. + * Given zero or negative amount of time this behaves identical to yield(). + */ + [[nodiscard]] auto yield_for(std::chrono::milliseconds amount) -> mrc::coroutines::Task; + + /** + * Yields the current task until the given time point in the future. + * @param time The time point to resume execution of this task. Given 'now' or a time point in the + * in the past this behaves identical to yield(). + */ + [[nodiscard]] auto yield_until(time_point_t time) -> mrc::coroutines::Task; + + /** + * Polls the given file descriptor for the given operations. + * @param fd The file descriptor to poll for events. + * @param op The operations to poll for. + * @param timeout The amount of time to wait for the events to trigger. A timeout of zero will + * block indefinitely until the event triggers. + * @return The result of the poll operation. + */ + [[nodiscard]] auto poll(fd_t fd, + mrc::coroutines::PollOperation op, + std::chrono::milliseconds timeout = std::chrono::milliseconds{0}) + -> mrc::coroutines::Task; + +#ifdef LIBCORO_FEATURE_NETWORKING + /** + * Polls the given mrc::coroutines::net::socket for the given operations. + * @param sock The socket to poll for events on. + * @param op The operations to poll for. + * @param timeout The amount of time to wait for the events to trigger. A timeout of zero will + * block indefinitely until the event triggers. + * @return THe result of the poll operation. + */ + [[nodiscard]] auto poll(const net::socket& sock, + mrc::coroutines::poll_op op, + std::chrono::milliseconds timeout = std::chrono::milliseconds{0}) + -> mrc::coroutines::Task + { + return poll(sock.native_handle(), op, timeout); + } +#endif + + /** + * Resumes execution of a direct coroutine handle on this io scheduler. + * @param handle The coroutine handle to resume execution. + */ + void resume(std::coroutine_handle<> handle) noexcept override + { + if (m_opts.execution_strategy == ExecutionStrategy::process_tasks_inline) + { + { + std::scoped_lock lk{m_scheduled_tasks_mutex}; + m_scheduled_tasks.emplace_back(handle); + } + + bool expected{false}; + if (m_schedule_fd_triggered.compare_exchange_strong(expected, + true, + std::memory_order::release, + std::memory_order::relaxed)) + { + eventfd_t value{1}; + eventfd_write(m_schedule_fd, value); + } + } + else + { + m_thread_pool->resume(handle); + } + } + + /** + * @return The number of tasks waiting in the task queue + the executing tasks. + */ + auto size() const noexcept -> std::size_t + { + if (m_opts.execution_strategy == ExecutionStrategy::process_tasks_inline) + { + return m_size.load(std::memory_order::acquire); + } + + return m_size.load(std::memory_order::acquire) + m_thread_pool->size(); + } + + /** + * @return True if the task queue is empty and zero tasks are currently executing. + */ + auto empty() const noexcept -> bool + { + return size() == 0; + } + + /** + * Starts the shutdown of the io scheduler. All currently executing and pending tasks will complete + * prior to shutting down. This call is blocking and will not return until all tasks complete. + */ + auto shutdown() noexcept -> void; + + /** + * Scans for completed coroutines and destroys them freeing up resources. This is also done on starting + * new tasks but this allows the user to cleanup resources manually. One usage might be making sure fds + * are cleaned up as soon as possible. + */ + auto garbage_collect() noexcept -> void; + + private: + /// The configuration options. + Options m_opts; + + /// The event loop epoll file descriptor. + fd_t m_epoll_fd{-1}; + /// The event loop fd to trigger a shutdown. + fd_t m_shutdown_fd{-1}; + /// The event loop timer fd for timed events, e.g. yield_for() or scheduler_after(). + fd_t m_timer_fd{-1}; + /// The schedule file descriptor if the scheduler is in inline processing mode. + fd_t m_schedule_fd{-1}; + std::atomic m_schedule_fd_triggered{false}; + + /// The number of tasks executing or awaiting events in this io scheduler. + std::atomic m_size{0}; + + /// The background io worker threads. + std::thread m_io_thread; + /// Thread pool for executing tasks when not in inline mode. + std::unique_ptr m_thread_pool{nullptr}; + + std::mutex m_timed_events_mutex{}; + /// The map of time point's to poll infos for tasks that are yielding for a period of time + /// or for tasks that are polling with timeouts. + timed_events_t m_timed_events{}; + + /// Has the IoScheduler been requested to shut down? + std::atomic m_shutdown_requested{false}; + + std::atomic m_io_processing{false}; + auto process_events_manual(std::chrono::milliseconds timeout) -> void; + auto process_events_dedicated_thread() -> void; + auto process_events_execute(std::chrono::milliseconds timeout) -> void; + static auto event_to_poll_status(uint32_t events) -> PollStatus; + + auto process_scheduled_execute_inline() -> void; + std::mutex m_scheduled_tasks_mutex{}; + std::vector> m_scheduled_tasks{}; + + /// Tasks that have their ownership passed into the scheduler. This is a bit strange for now + /// but the concept doesn't pass since IoScheduler isn't fully defined yet. + /// The type is mrc::coroutines::Task_container* + /// Do not inline any functions that use this in the IoScheduler header, it can cause the linker + /// to complain about "defined in discarded section" because it gets defined multiple times + void* m_owned_tasks{nullptr}; + + static constexpr const int MShutdownObject{0}; + static constexpr const void* MShutdownPtr = &MShutdownObject; + + static constexpr const int MTimerObject{0}; + static constexpr const void* MTimerPtr = &MTimerObject; + + static constexpr const int MScheduleObject{0}; + static constexpr const void* MSchedulePtr = &MScheduleObject; + + static const constexpr std::chrono::milliseconds MDefaultTimeout{1000}; + static const constexpr std::chrono::milliseconds MNoTimeout{0}; + static const constexpr std::size_t MMaxEvents = 16; + std::array m_events{}; + std::vector> m_handles_to_resume{}; + + auto process_event_execute(detail::PollInfo* pi, PollStatus status) -> void; + auto process_timeout_execute() -> void; + + auto add_timer_token(time_point_t tp, detail::PollInfo& pi) -> timed_events_t::iterator; + auto remove_timer_token(timed_events_t::iterator pos) -> void; + auto update_timeout(time_point_t now) -> void; +}; + +} // namespace mrc::coroutines diff --git a/cpp/mrc/include/mrc/coroutines/poll.hpp b/cpp/mrc/include/mrc/coroutines/poll.hpp new file mode 100644 index 000000000..b7383bef4 --- /dev/null +++ b/cpp/mrc/include/mrc/coroutines/poll.hpp @@ -0,0 +1,82 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Original Source: https://github.com/jbaldwin/libcoro + * Original License: Apache License, Version 2.0; included below + */ + +/** + * Copyright 2021 Josh Baldwin + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include + +namespace mrc::coroutines { +enum class PollOperation : uint64_t +{ + /// Poll for read operations. + read = EPOLLIN, + /// Poll for write operations. + write = EPOLLOUT, + /// Poll for read and write operations. + read_write = EPOLLIN | EPOLLOUT +}; + +inline auto poll_op_readable(PollOperation op) -> bool +{ + return (static_cast(op) & EPOLLIN) != 0; +} + +inline auto poll_op_writeable(PollOperation op) -> bool +{ + return (static_cast(op) & EPOLLOUT) != 0; +} + +auto to_string(PollOperation op) -> const std::string&; + +enum class PollStatus +{ + /// The poll operation was was successful. + event, + /// The poll operation timed out. + timeout, + /// The file descriptor had an error while polling. + error, + /// The file descriptor has been closed by the remote or an internal error/close. + closed +}; + +auto to_string(PollStatus status) -> const std::string&; + +} // namespace mrc::coroutines diff --git a/cpp/mrc/include/mrc/coroutines/scheduler.hpp b/cpp/mrc/include/mrc/coroutines/scheduler.hpp index 0e296924a..302fd6608 100644 --- a/cpp/mrc/include/mrc/coroutines/scheduler.hpp +++ b/cpp/mrc/include/mrc/coroutines/scheduler.hpp @@ -40,11 +40,6 @@ class Scheduler : public std::enable_shared_from_this */ virtual void resume(std::coroutine_handle<> handle) noexcept = 0; - /** - * @brief Suspends the current function and resumes it according to the scheduler's implementation. - */ - [[nodiscard]] virtual Task<> schedule() = 0; - /** * @brief Suspends the current function and resumes it according to the scheduler's implementation. */ diff --git a/cpp/mrc/include/mrc/coroutines/time.hpp b/cpp/mrc/include/mrc/coroutines/time.hpp new file mode 100644 index 000000000..825b01ec8 --- /dev/null +++ b/cpp/mrc/include/mrc/coroutines/time.hpp @@ -0,0 +1,46 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Original Source: https://github.com/jbaldwin/libcoro + * Original License: Apache License, Version 2.0; included below + */ + +/** + * Copyright 2021 Josh Baldwin + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +namespace mrc::coroutines { +using clock_t = std::chrono::steady_clock; +using time_point_t = clock_t::time_point; +} // namespace mrc::coroutines diff --git a/cpp/mrc/src/public/coroutines/io_scheduler.cpp b/cpp/mrc/src/public/coroutines/io_scheduler.cpp new file mode 100644 index 000000000..c67688290 --- /dev/null +++ b/cpp/mrc/src/public/coroutines/io_scheduler.cpp @@ -0,0 +1,560 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Original Source: https://github.com/jbaldwin/libcoro + * Original License: Apache License, Version 2.0; included below + */ + +/** + * Copyright 2021 Josh Baldwin + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "mrc/coroutines/io_scheduler.hpp" + +#include "mrc/coroutines/poll.hpp" +#include "mrc/coroutines/task.hpp" +#include "mrc/coroutines/task_container.hpp" +#include "mrc/coroutines/time.hpp" + +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +using namespace std::chrono_literals; + +namespace mrc::coroutines { +IoScheduler::IoScheduler(Options opts) : + m_opts(std::move(opts)), + m_epoll_fd(epoll_create1(EPOLL_CLOEXEC)), + m_shutdown_fd(eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK)), + m_timer_fd(timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC)), + m_schedule_fd(eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK)), + m_owned_tasks(new mrc::coroutines::TaskContainer(this->shared_from_this())) +{ + if (opts.execution_strategy == ExecutionStrategy::process_tasks_on_thread_pool) + { + m_thread_pool = std::make_unique(std::move(m_opts.pool)); + } + + epoll_event e{}; + e.events = EPOLLIN; + + e.data.ptr = const_cast(MShutdownPtr); + epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, m_shutdown_fd, &e); + + e.data.ptr = const_cast(MTimerPtr); + epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, m_timer_fd, &e); + + e.data.ptr = const_cast(MSchedulePtr); + epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, m_schedule_fd, &e); + + if (m_opts.thread_strategy == ThreadStrategy::spawn) + { + m_io_thread = std::thread([this]() { + process_events_dedicated_thread(); + }); + } + // else manual mode, the user must call process_events. +} + +IoScheduler::~IoScheduler() +{ + shutdown(); + + if (m_io_thread.joinable()) + { + m_io_thread.join(); + } + + if (m_epoll_fd != -1) + { + close(m_epoll_fd); + m_epoll_fd = -1; + } + if (m_timer_fd != -1) + { + close(m_timer_fd); + m_timer_fd = -1; + } + if (m_schedule_fd != -1) + { + close(m_schedule_fd); + m_schedule_fd = -1; + } + + if (m_owned_tasks != nullptr) + { + delete static_cast(m_owned_tasks); + m_owned_tasks = nullptr; + } +} + +auto IoScheduler::process_events(std::chrono::milliseconds timeout) -> std::size_t +{ + process_events_manual(timeout); + return size(); +} + +auto IoScheduler::schedule(mrc::coroutines::Task&& task) -> void +{ + auto* ptr = static_cast(m_owned_tasks); + ptr->start(std::move(task)); +} + +auto IoScheduler::schedule_after(std::chrono::milliseconds amount) -> mrc::coroutines::Task +{ + return yield_for(amount); +} + +auto IoScheduler::schedule_at(time_point_t time) -> mrc::coroutines::Task +{ + return yield_until(time); +} + +auto IoScheduler::yield_for(std::chrono::milliseconds amount) -> mrc::coroutines::Task +{ + if (amount <= 0ms) + { + co_await schedule(); + } + else + { + // Yield/timeout tasks are considered live in the scheduler and must be accounted for. Note + // that if the user gives an invalid amount and schedule() is directly called it will account + // for the scheduled task there. + m_size.fetch_add(1, std::memory_order::release); + + // Yielding does not requiring setting the timer position on the poll info since + // it doesn't have a corresponding 'event' that can trigger, it always waits for + // the timeout to occur before resuming. + + detail::PollInfo pi{}; + add_timer_token(clock_t::now() + amount, pi); + co_await pi; + + m_size.fetch_sub(1, std::memory_order::release); + } + co_return; +} + +auto IoScheduler::yield_until(time_point_t time) -> mrc::coroutines::Task +{ + auto now = clock_t::now(); + + // If the requested time is in the past (or now!) bail out! + if (time <= now) + { + co_await schedule(); + } + else + { + m_size.fetch_add(1, std::memory_order::release); + + auto amount = std::chrono::duration_cast(time - now); + + detail::PollInfo pi{}; + add_timer_token(now + amount, pi); + co_await pi; + + m_size.fetch_sub(1, std::memory_order::release); + } + co_return; +} + +auto IoScheduler::poll(fd_t fd, mrc::coroutines::PollOperation op, std::chrono::milliseconds timeout) + -> mrc::coroutines::Task +{ + // Because the size will drop when this coroutine suspends every poll needs to undo the subtraction + // on the number of active tasks in the scheduler. When this task is resumed by the event loop. + m_size.fetch_add(1, std::memory_order::release); + + // Setup two events, a timeout event and the actual poll for op event. + // Whichever triggers first will delete the other to guarantee only one wins. + // The resume token will be set by the scheduler to what the event turned out to be. + + bool timeout_requested = (timeout > 0ms); + + detail::PollInfo pi{}; + pi.m_fd = fd; + + if (timeout_requested) + { + pi.m_timer_pos = add_timer_token(clock_t::now() + timeout, pi); + } + + epoll_event e{}; + e.events = static_cast(op) | EPOLLONESHOT | EPOLLRDHUP; + e.data.ptr = π + if (epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, fd, &e) == -1) + { + std::cerr << "epoll ctl error on fd " << fd << "\n"; + } + + // The event loop will 'clean-up' whichever event didn't win since the coroutine is scheduled + // onto the thread poll its possible the other type of event could trigger while its waiting + // to execute again, thus restarting the coroutine twice, that would be quite bad. + auto result = co_await pi; + m_size.fetch_sub(1, std::memory_order::release); + co_return result; +} + +auto IoScheduler::shutdown() noexcept -> void +{ + // Only allow shutdown to occur once. + if (not m_shutdown_requested.exchange(true, std::memory_order::acq_rel)) + { + if (m_thread_pool != nullptr) + { + m_thread_pool->shutdown(); + } + + // Signal the event loop to stop asap, triggering the event fd is safe. + uint64_t value{1}; + auto written = ::write(m_shutdown_fd, &value, sizeof(value)); + (void)written; + + if (m_io_thread.joinable()) + { + m_io_thread.join(); + } + } +} + +auto IoScheduler::garbage_collect() noexcept -> void +{ + auto* ptr = static_cast(m_owned_tasks); + ptr->garbage_collect(); +} + +auto IoScheduler::process_events_manual(std::chrono::milliseconds timeout) -> void +{ + bool expected{false}; + if (m_io_processing.compare_exchange_strong(expected, true, std::memory_order::release, std::memory_order::relaxed)) + { + process_events_execute(timeout); + m_io_processing.exchange(false, std::memory_order::release); + } +} + +auto IoScheduler::process_events_dedicated_thread() -> void +{ + if (m_opts.on_io_thread_start_functor != nullptr) + { + m_opts.on_io_thread_start_functor(); + } + + m_io_processing.exchange(true, std::memory_order::release); + // Execute tasks until stopped or there are no more tasks to complete. + while (!m_shutdown_requested.load(std::memory_order::acquire) || size() > 0) + { + process_events_execute(MDefaultTimeout); + } + m_io_processing.exchange(false, std::memory_order::release); + + if (m_opts.on_io_thread_stop_functor != nullptr) + { + m_opts.on_io_thread_stop_functor(); + } +} + +auto IoScheduler::process_events_execute(std::chrono::milliseconds timeout) -> void +{ + auto event_count = epoll_wait(m_epoll_fd, m_events.data(), MMaxEvents, timeout.count()); + if (event_count > 0) + { + for (std::size_t i = 0; i < static_cast(event_count); ++i) + { + epoll_event& event = m_events[i]; + void* handle_ptr = event.data.ptr; + + if (handle_ptr == MTimerPtr) + { + // Process all events that have timed out. + process_timeout_execute(); + } + else if (handle_ptr == MSchedulePtr) + { + // Process scheduled coroutines. + process_scheduled_execute_inline(); + } + else if (handle_ptr == MShutdownPtr) [[unlikely]] + { + // Nothing to do , just needed to wake-up and smell the flowers + } + else + { + // Individual poll task wake-up. + process_event_execute(static_cast(handle_ptr), event_to_poll_status(event.events)); + } + } + } + + // Its important to not resume any handles until the full set is accounted for. If a timeout + // and an event for the same handle happen in the same epoll_wait() call then inline processing + // will destruct the poll_info object before the second event is handled. This is also possible + // with thread pool processing, but probably has an extremely low chance of occuring due to + // the thread switch required. If m_max_events == 1 this would be unnecessary. + + if (!m_handles_to_resume.empty()) + { + if (m_opts.execution_strategy == ExecutionStrategy::process_tasks_inline) + { + for (auto& handle : m_handles_to_resume) + { + handle.resume(); + } + } + else + { + m_thread_pool->resume(m_handles_to_resume); + } + + m_handles_to_resume.clear(); + } +} + +auto IoScheduler::event_to_poll_status(uint32_t events) -> PollStatus +{ + if (((events & EPOLLIN) != 0) || ((events & EPOLLOUT) != 0)) + { + return PollStatus::event; + } + + if ((events & EPOLLERR) != 0) + { + return PollStatus::error; + } + + if (((events & EPOLLRDHUP) != 0) || ((events & EPOLLHUP) != 0)) + { + return PollStatus::closed; + } + + throw std::runtime_error{"invalid epoll state"}; +} + +auto IoScheduler::process_scheduled_execute_inline() -> void +{ + std::vector> tasks{}; + { + // Acquire the entire list, and then reset it. + std::scoped_lock lk{m_scheduled_tasks_mutex}; + tasks.swap(m_scheduled_tasks); + + // Clear the schedule eventfd if this is a scheduled task. + eventfd_t value{0}; + eventfd_read(m_schedule_fd, &value); + + // Clear the in memory flag to reduce eventfd_* calls on scheduling. + m_schedule_fd_triggered.exchange(false, std::memory_order::release); + } + + // This set of handles can be safely resumed now since they do not have a corresponding timeout event. + for (auto& task : tasks) + { + task.resume(); + } + m_size.fetch_sub(tasks.size(), std::memory_order::release); +} + +auto IoScheduler::process_event_execute(detail::PollInfo* pi, PollStatus status) -> void +{ + if (!pi->m_processed) + { + std::atomic_thread_fence(std::memory_order::acquire); + // Its possible the event and the timeout occurred in the same epoll, make sure only one + // is ever processed, the other is discarded. + pi->m_processed = true; + + // Given a valid fd always remove it from epoll so the next poll can blindly EPOLL_CTL_ADD. + if (pi->m_fd != -1) + { + epoll_ctl(m_epoll_fd, EPOLL_CTL_DEL, pi->m_fd, nullptr); + } + + // Since this event triggered, remove its corresponding timeout if it has one. + if (pi->m_timer_pos.has_value()) + { + remove_timer_token(pi->m_timer_pos.value()); + } + + pi->m_poll_status = status; + + while (pi->m_awaiting_coroutine == nullptr) + { + std::atomic_thread_fence(std::memory_order::acquire); + } + + m_handles_to_resume.emplace_back(pi->m_awaiting_coroutine); + } +} + +auto IoScheduler::process_timeout_execute() -> void +{ + std::vector poll_infos{}; + auto now = clock_t::now(); + + { + std::scoped_lock lk{m_timed_events_mutex}; + while (!m_timed_events.empty()) + { + auto first = m_timed_events.begin(); + auto [tp, pi] = *first; + + if (tp <= now) + { + m_timed_events.erase(first); + poll_infos.emplace_back(pi); + } + else + { + break; + } + } + } + + for (auto* pi : poll_infos) + { + if (!pi->m_processed) + { + // Its possible the event and the timeout occurred in the same epoll, make sure only one + // is ever processed, the other is discarded. + pi->m_processed = true; + + // Since this timed out, remove its corresponding event if it has one. + if (pi->m_fd != -1) + { + epoll_ctl(m_epoll_fd, EPOLL_CTL_DEL, pi->m_fd, nullptr); + } + + while (pi->m_awaiting_coroutine == nullptr) + { + std::atomic_thread_fence(std::memory_order::acquire); + // std::cerr << "process_event_execute() has a nullptr event\n"; + } + + m_handles_to_resume.emplace_back(pi->m_awaiting_coroutine); + pi->m_poll_status = mrc::coroutines::PollStatus::timeout; + } + } + + // Update the time to the next smallest time point, re-take the current now time + // since updating and resuming tasks could shift the time. + update_timeout(clock_t::now()); +} + +auto IoScheduler::add_timer_token(time_point_t tp, detail::PollInfo& pi) -> timed_events_t::iterator +{ + std::scoped_lock lk{m_timed_events_mutex}; + auto pos = m_timed_events.emplace(tp, &pi); + + // If this item was inserted as the smallest time point, update the timeout. + if (pos == m_timed_events.begin()) + { + update_timeout(clock_t::now()); + } + + return pos; +} + +auto IoScheduler::remove_timer_token(timed_events_t::iterator pos) -> void +{ + { + std::scoped_lock lk{m_timed_events_mutex}; + auto is_first = (m_timed_events.begin() == pos); + + m_timed_events.erase(pos); + + // If this was the first item, update the timeout. It would be acceptable to just let it + // also fire the timeout as the event loop will ignore it since nothing will have timed + // out but it feels like the right thing to do to update it to the correct timeout value. + if (is_first) + { + update_timeout(clock_t::now()); + } + } +} + +auto IoScheduler::update_timeout(time_point_t now) -> void +{ + if (!m_timed_events.empty()) + { + auto& [tp, pi] = *m_timed_events.begin(); + + auto amount = tp - now; + + auto seconds = std::chrono::duration_cast(amount); + amount -= seconds; + auto nanoseconds = std::chrono::duration_cast(amount); + + // As a safeguard if both values end up as zero (or negative) then trigger the timeout + // immediately as zero disarms timerfd according to the man pages and negative values + // will result in an error return value. + if (seconds <= 0s) + { + seconds = 0s; + if (nanoseconds <= 0ns) + { + // just trigger "immediately"! + nanoseconds = 1ns; + } + } + + itimerspec ts{}; + ts.it_value.tv_sec = seconds.count(); + ts.it_value.tv_nsec = nanoseconds.count(); + + if (timerfd_settime(m_timer_fd, 0, &ts, nullptr) == -1) + { + std::cerr << "Failed to set timerfd errorno=[" << std::string{strerror(errno)} << "]."; + } + } + else + { + // Setting these values to zero disables the timer. + itimerspec ts{}; + ts.it_value.tv_sec = 0; + ts.it_value.tv_nsec = 0; + if (timerfd_settime(m_timer_fd, 0, &ts, nullptr) == -1) + { + std::cerr << "Failed to set timerfd errorno=[" << std::string{strerror(errno)} << "]."; + } + } +} + +} // namespace mrc::coroutines \ No newline at end of file diff --git a/cpp/mrc/src/public/coroutines/task_container.cpp b/cpp/mrc/src/public/coroutines/task_container.cpp index e29b50fc2..874c73e03 100644 --- a/cpp/mrc/src/public/coroutines/task_container.cpp +++ b/cpp/mrc/src/public/coroutines/task_container.cpp @@ -136,7 +136,7 @@ auto TaskContainer::gc_internal() -> std::size_t auto TaskContainer::make_cleanup_task(Task user_task, task_position_t pos) -> Task { // Immediately move the task onto the executor. - co_await m_scheduler->schedule(); + co_await m_scheduler->yield(); try { diff --git a/python/mrc/_pymrc/include/pymrc/asyncio_runnable.hpp b/python/mrc/_pymrc/include/pymrc/asyncio_runnable.hpp index 36dad7208..0add732c1 100644 --- a/python/mrc/_pymrc/include/pymrc/asyncio_runnable.hpp +++ b/python/mrc/_pymrc/include/pymrc/asyncio_runnable.hpp @@ -205,7 +205,7 @@ class AsyncioRunnable : public AsyncSink, * @brief Value's read from the sink's channel are fed to this function and yields from the * resulting generator are written to the source's channel. */ - virtual mrc::coroutines::AsyncGenerator on_data(InputT&& value) = 0; + virtual mrc::coroutines::AsyncGenerator on_data(InputT&& value, std::shared_ptr on) = 0; std::stop_source m_stop_source; @@ -316,9 +316,10 @@ coroutines::Task<> AsyncioRunnable::process_one(InputT value, try { // Call the on_data function - auto on_data_gen = this->on_data(std::move(value)); + auto on_data_gen = this->on_data(std::move(value), on); auto iter = co_await on_data_gen.begin(); + // co_await on->yield(); while (iter != on_data_gen.end()) { @@ -326,6 +327,7 @@ coroutines::Task<> AsyncioRunnable::process_one(InputT value, auto data = std::move(*iter); co_await this->write_async(std::move(data)); + // co_await on->yield(); // Advance the iterator co_await ++iter; diff --git a/python/mrc/_pymrc/include/pymrc/asyncio_scheduler.hpp b/python/mrc/_pymrc/include/pymrc/asyncio_scheduler.hpp index 3d9e563b9..e9c507856 100644 --- a/python/mrc/_pymrc/include/pymrc/asyncio_scheduler.hpp +++ b/python/mrc/_pymrc/include/pymrc/asyncio_scheduler.hpp @@ -82,14 +82,6 @@ class AsyncioScheduler : public mrc::coroutines::Scheduler AsyncioScheduler::resume(m_loop, handle); } - /** - * @brief Suspends the current function and resumes it on the scheduler's Asyncio event loop - */ - [[nodiscard]] coroutines::Task<> schedule() override - { - co_await ContinueOnLoopOperation(m_loop); - } - /** * @brief Suspends the current function and resumes it on the scheduler's Asyncio event loop */ diff --git a/python/mrc/_pymrc/tests/test_asyncio_runnable.cpp b/python/mrc/_pymrc/tests/test_asyncio_runnable.cpp index a46bea824..0c51c64a9 100644 --- a/python/mrc/_pymrc/tests/test_asyncio_runnable.cpp +++ b/python/mrc/_pymrc/tests/test_asyncio_runnable.cpp @@ -85,7 +85,7 @@ class __attribute__((visibility("default"))) PythonCallbackAsyncioRunnable : pub public: PythonCallbackAsyncioRunnable(pymrc::PyObjectHolder operation) : m_operation(std::move(operation)) {} - mrc::coroutines::AsyncGenerator on_data(int&& value) override + mrc::coroutines::AsyncGenerator on_data(int&& value, std::shared_ptr on) override { py::gil_scoped_acquire acquire; From 9f7ffc87f660d3e481efb64a4151f5ac6f630514 Mon Sep 17 00:00:00 2001 From: Christopher Harris Date: Wed, 28 Feb 2024 17:08:04 +0000 Subject: [PATCH 02/12] fix io_scheduler/task_container circular reference --- .../include/mrc/coroutines/io_scheduler.hpp | 10 +++-- cpp/mrc/include/mrc/coroutines/scheduler.hpp | 5 +++ .../src/public/coroutines/io_scheduler.cpp | 21 ++++++++- cpp/mrc/tests/CMakeLists.txt | 1 + .../tests/coroutines/test_io_scheduler.cpp | 43 +++++++++++++++++++ .../include/pymrc/asyncio_scheduler.hpp | 14 ++++++ 6 files changed, 90 insertions(+), 4 deletions(-) create mode 100644 cpp/mrc/tests/coroutines/test_io_scheduler.cpp diff --git a/cpp/mrc/include/mrc/coroutines/io_scheduler.hpp b/cpp/mrc/include/mrc/coroutines/io_scheduler.hpp index 9b077455b..e79fa545b 100644 --- a/cpp/mrc/include/mrc/coroutines/io_scheduler.hpp +++ b/cpp/mrc/include/mrc/coroutines/io_scheduler.hpp @@ -35,8 +35,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - - #pragma once + +#pragma once #include "mrc/coroutines/detail/poll_info.hpp" #include "mrc/coroutines/fd.hpp" @@ -60,11 +60,15 @@ #include namespace mrc::coroutines { + class IoScheduler : public Scheduler { + private: using timed_events_t = detail::PollInfo::timed_events_t; public: + static std::shared_ptr get_instance(); + class schedule_operation; friend schedule_operation; @@ -124,7 +128,7 @@ class IoScheduler : public Scheduler auto operator=(const IoScheduler&) -> IoScheduler& = delete; auto operator=(IoScheduler&&) -> IoScheduler& = delete; - ~IoScheduler(); + ~IoScheduler() override; /** * Given a ThreadStrategy::manual this function should be called at regular intervals to diff --git a/cpp/mrc/include/mrc/coroutines/scheduler.hpp b/cpp/mrc/include/mrc/coroutines/scheduler.hpp index 302fd6608..b085b5801 100644 --- a/cpp/mrc/include/mrc/coroutines/scheduler.hpp +++ b/cpp/mrc/include/mrc/coroutines/scheduler.hpp @@ -18,6 +18,7 @@ #pragma once #include "mrc/coroutines/task.hpp" +#include "mrc/coroutines/time.hpp" #include #include @@ -44,6 +45,10 @@ class Scheduler : public std::enable_shared_from_this * @brief Suspends the current function and resumes it according to the scheduler's implementation. */ [[nodiscard]] virtual Task<> yield() = 0; + + [[nodiscard]] virtual Task<> yield_for(std::chrono::milliseconds amount) = 0; + + [[nodiscard]] virtual Task<> yield_until(time_point_t time) = 0; }; } // namespace mrc::coroutines diff --git a/cpp/mrc/src/public/coroutines/io_scheduler.cpp b/cpp/mrc/src/public/coroutines/io_scheduler.cpp index c67688290..31ca56ee2 100644 --- a/cpp/mrc/src/public/coroutines/io_scheduler.cpp +++ b/cpp/mrc/src/public/coroutines/io_scheduler.cpp @@ -58,13 +58,32 @@ using namespace std::chrono_literals; namespace mrc::coroutines { + +std::shared_ptr IoScheduler::get_instance() +{ + static std::shared_ptr instance; + static std::mutex instance_mutex{}; + + if (instance == nullptr) + { + auto lock = std::lock_guard(instance_mutex); + + if (instance == nullptr) + { + instance = std::make_shared(); + } + } + + return instance; +} + IoScheduler::IoScheduler(Options opts) : m_opts(std::move(opts)), m_epoll_fd(epoll_create1(EPOLL_CLOEXEC)), m_shutdown_fd(eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK)), m_timer_fd(timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC)), m_schedule_fd(eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK)), - m_owned_tasks(new mrc::coroutines::TaskContainer(this->shared_from_this())) + m_owned_tasks(new mrc::coroutines::TaskContainer(std::shared_ptr(this, [](auto _) {}))) { if (opts.execution_strategy == ExecutionStrategy::process_tasks_on_thread_pool) { diff --git a/cpp/mrc/tests/CMakeLists.txt b/cpp/mrc/tests/CMakeLists.txt index db193b455..224f26bf7 100644 --- a/cpp/mrc/tests/CMakeLists.txt +++ b/cpp/mrc/tests/CMakeLists.txt @@ -17,6 +17,7 @@ add_executable(test_mrc coroutines/test_async_generator.cpp coroutines/test_event.cpp + coroutines/test_io_scheduler.cpp coroutines/test_latch.cpp coroutines/test_ring_buffer.cpp coroutines/test_task_container.cpp diff --git a/cpp/mrc/tests/coroutines/test_io_scheduler.cpp b/cpp/mrc/tests/coroutines/test_io_scheduler.cpp new file mode 100644 index 000000000..0be414881 --- /dev/null +++ b/cpp/mrc/tests/coroutines/test_io_scheduler.cpp @@ -0,0 +1,43 @@ +/** + * SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "mrc/coroutines/async_generator.hpp" +#include "mrc/coroutines/io_scheduler.hpp" +#include "mrc/coroutines/sync_wait.hpp" +#include "mrc/coroutines/task.hpp" + +#include + +#include + +using namespace mrc; +using namespace std::chrono_literals; + +class TestCoroIoScheduler : public ::testing::Test +{}; + +TEST_F(TestCoroIoScheduler, YieldFor) +{ + auto scheduler = coroutines::IoScheduler::get_instance(); + + auto task = [scheduler]() -> coroutines::Task<> { + // co_await scheduler->yield_for(1000ms); + co_return; + }; + + coroutines::sync_wait(task()); +} \ No newline at end of file diff --git a/python/mrc/_pymrc/include/pymrc/asyncio_scheduler.hpp b/python/mrc/_pymrc/include/pymrc/asyncio_scheduler.hpp index e9c507856..8240a88d0 100644 --- a/python/mrc/_pymrc/include/pymrc/asyncio_scheduler.hpp +++ b/python/mrc/_pymrc/include/pymrc/asyncio_scheduler.hpp @@ -22,9 +22,11 @@ #include "pymrc/utilities/object_wrappers.hpp" #include +#include #include #include #include +#include #include #include @@ -90,6 +92,18 @@ class AsyncioScheduler : public mrc::coroutines::Scheduler co_await ContinueOnLoopOperation(m_loop); } + [[nodiscard]] coroutines::Task<> yield_for(std::chrono::milliseconds amount) override + { + co_await coroutines::IoScheduler::get_instance()->yield_for(amount); + co_await ContinueOnLoopOperation(m_loop); + }; + + [[nodiscard]] coroutines::Task<> yield_until(mrc::coroutines::time_point_t time) override + { + co_await coroutines::IoScheduler::get_instance()->yield_until(time); + co_await ContinueOnLoopOperation(m_loop); + }; + private: mrc::pymrc::PyHolder m_loop; }; From e04ff6b106cfce0833e2b76020855930df07c00d Mon Sep 17 00:00:00 2001 From: Christopher Harris Date: Wed, 28 Feb 2024 20:14:54 +0000 Subject: [PATCH 03/12] test yield_until and yield_for concurrency --- cpp/mrc/include/mrc/coroutines/scheduler.hpp | 2 +- .../tests/coroutines/test_io_scheduler.cpp | 40 +++++++++++++++++-- .../_pymrc/include/pymrc/asyncio_runnable.hpp | 2 - 3 files changed, 38 insertions(+), 6 deletions(-) diff --git a/cpp/mrc/include/mrc/coroutines/scheduler.hpp b/cpp/mrc/include/mrc/coroutines/scheduler.hpp index b085b5801..0872502f2 100644 --- a/cpp/mrc/include/mrc/coroutines/scheduler.hpp +++ b/cpp/mrc/include/mrc/coroutines/scheduler.hpp @@ -1,5 +1,5 @@ /** - * SPDX-FileCopyrightText: Copyright (c) 2022-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-FileCopyrightText: Copyright (c) 2022-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. * SPDX-License-Identifier: Apache-2.0 * * Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/cpp/mrc/tests/coroutines/test_io_scheduler.cpp b/cpp/mrc/tests/coroutines/test_io_scheduler.cpp index 0be414881..f41b41e61 100644 --- a/cpp/mrc/tests/coroutines/test_io_scheduler.cpp +++ b/cpp/mrc/tests/coroutines/test_io_scheduler.cpp @@ -19,6 +19,8 @@ #include "mrc/coroutines/io_scheduler.hpp" #include "mrc/coroutines/sync_wait.hpp" #include "mrc/coroutines/task.hpp" +#include "mrc/coroutines/time.hpp" +#include "mrc/coroutines/when_all.hpp" #include @@ -35,9 +37,41 @@ TEST_F(TestCoroIoScheduler, YieldFor) auto scheduler = coroutines::IoScheduler::get_instance(); auto task = [scheduler]() -> coroutines::Task<> { - // co_await scheduler->yield_for(1000ms); - co_return; + co_await scheduler->yield_for(10ms); }; coroutines::sync_wait(task()); -} \ No newline at end of file +} + +TEST_F(TestCoroIoScheduler, YieldUntil) +{ + auto scheduler = coroutines::IoScheduler::get_instance(); + + auto task = [scheduler]() -> coroutines::Task<> { + co_await scheduler->yield_until(coroutines::clock_t::now() + 10ms); + }; + + coroutines::sync_wait(task()); +} + +TEST_F(TestCoroIoScheduler, Concurrent) +{ + auto scheduler = coroutines::IoScheduler::get_instance(); + + auto task = [scheduler]() -> coroutines::Task<> { + co_await scheduler->yield_for(10ms); + }; + + auto start = coroutines::clock_t::now(); + + std::vector> tasks; + + for (uint32_t i = 0; i < 1000; i++) + { + tasks.push_back(task()); + } + + coroutines::sync_wait(coroutines::when_all(std::move(tasks))); + + ASSERT_LT(coroutines::clock_t::now() - start, 20ms); +} diff --git a/python/mrc/_pymrc/include/pymrc/asyncio_runnable.hpp b/python/mrc/_pymrc/include/pymrc/asyncio_runnable.hpp index 0add732c1..a595580a0 100644 --- a/python/mrc/_pymrc/include/pymrc/asyncio_runnable.hpp +++ b/python/mrc/_pymrc/include/pymrc/asyncio_runnable.hpp @@ -319,7 +319,6 @@ coroutines::Task<> AsyncioRunnable::process_one(InputT value, auto on_data_gen = this->on_data(std::move(value), on); auto iter = co_await on_data_gen.begin(); - // co_await on->yield(); while (iter != on_data_gen.end()) { @@ -327,7 +326,6 @@ coroutines::Task<> AsyncioRunnable::process_one(InputT value, auto data = std::move(*iter); co_await this->write_async(std::move(data)); - // co_await on->yield(); // Advance the iterator co_await ++iter; From 1b5f672fc9451ae175ba3d021321903707266005 Mon Sep 17 00:00:00 2001 From: Christopher Harris Date: Wed, 28 Feb 2024 20:17:22 +0000 Subject: [PATCH 04/12] add doc strings for yield_for and yield_until --- cpp/mrc/include/mrc/coroutines/scheduler.hpp | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/cpp/mrc/include/mrc/coroutines/scheduler.hpp b/cpp/mrc/include/mrc/coroutines/scheduler.hpp index 0872502f2..90b490930 100644 --- a/cpp/mrc/include/mrc/coroutines/scheduler.hpp +++ b/cpp/mrc/include/mrc/coroutines/scheduler.hpp @@ -46,9 +46,16 @@ class Scheduler : public std::enable_shared_from_this */ [[nodiscard]] virtual Task<> yield() = 0; + /** + * @brief Suspends the current function for a given duration and resumes it according to the schedulers's implementation. + */ [[nodiscard]] virtual Task<> yield_for(std::chrono::milliseconds amount) = 0; + /** + * @brief Suspends the current function until a given time point and resumes it according to the schedulers's implementation. + */ [[nodiscard]] virtual Task<> yield_until(time_point_t time) = 0; + }; } // namespace mrc::coroutines From 87c624bacc7b5a104e19ce19c002d0e903a199c5 Mon Sep 17 00:00:00 2001 From: Christopher Harris Date: Wed, 28 Feb 2024 20:19:15 +0000 Subject: [PATCH 05/12] add newlines --- cpp/mrc/src/public/coroutines/io_scheduler.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/mrc/src/public/coroutines/io_scheduler.cpp b/cpp/mrc/src/public/coroutines/io_scheduler.cpp index 31ca56ee2..e763e8604 100644 --- a/cpp/mrc/src/public/coroutines/io_scheduler.cpp +++ b/cpp/mrc/src/public/coroutines/io_scheduler.cpp @@ -576,4 +576,4 @@ auto IoScheduler::update_timeout(time_point_t now) -> void } } -} // namespace mrc::coroutines \ No newline at end of file +} // namespace mrc::coroutines From 8d3ae25628685161d1b35b75e207880bde81366a Mon Sep 17 00:00:00 2001 From: Christopher Harris Date: Wed, 28 Feb 2024 20:19:39 +0000 Subject: [PATCH 06/12] add newlines --- cpp/mrc/include/mrc/coroutines/detail/poll_info.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/mrc/include/mrc/coroutines/detail/poll_info.hpp b/cpp/mrc/include/mrc/coroutines/detail/poll_info.hpp index 7fd6a2aef..d1173fe90 100644 --- a/cpp/mrc/include/mrc/coroutines/detail/poll_info.hpp +++ b/cpp/mrc/include/mrc/coroutines/detail/poll_info.hpp @@ -115,4 +115,4 @@ struct PollInfo bool m_processed{false}; }; -} // namespace mrc::coroutines::detail \ No newline at end of file +} // namespace mrc::coroutines::detail From 0ee4ea4c6242412bd15abe25f2bcde8f364a9f21 Mon Sep 17 00:00:00 2001 From: Christopher Harris Date: Wed, 28 Feb 2024 20:31:41 +0000 Subject: [PATCH 07/12] fix styles --- cpp/mrc/include/mrc/coroutines/fd.hpp | 6 +++--- cpp/mrc/include/mrc/coroutines/io_scheduler.hpp | 15 +++++++++++---- cpp/mrc/include/mrc/coroutines/poll.hpp | 2 +- cpp/mrc/include/mrc/coroutines/scheduler.hpp | 7 ++++--- cpp/mrc/include/mrc/coroutines/time.hpp | 2 +- cpp/mrc/src/public/coroutines/io_scheduler.cpp | 10 ++++++++-- cpp/mrc/src/public/coroutines/task_container.cpp | 2 +- cpp/mrc/tests/CMakeLists.txt | 2 +- cpp/mrc/tests/coroutines/test_io_scheduler.cpp | 5 +++++ .../mrc/_pymrc/include/pymrc/asyncio_runnable.hpp | 5 +++-- .../_pymrc/include/pymrc/asyncio_scheduler.hpp | 2 +- python/mrc/_pymrc/tests/test_asyncio_runnable.cpp | 6 +++++- 12 files changed, 44 insertions(+), 20 deletions(-) diff --git a/cpp/mrc/include/mrc/coroutines/fd.hpp b/cpp/mrc/include/mrc/coroutines/fd.hpp index 0ec42e06c..86a5e1563 100644 --- a/cpp/mrc/include/mrc/coroutines/fd.hpp +++ b/cpp/mrc/include/mrc/coroutines/fd.hpp @@ -1,5 +1,5 @@ /* - * SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-FileCopyrightText: Copyright (c) 2022-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. * SPDX-License-Identifier: Apache-2.0 * * Licensed under the Apache License, Version 2.0 (the "License"); @@ -35,8 +35,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - - #pragma once + +#pragma once namespace mrc::coroutines { using fd_t = int; diff --git a/cpp/mrc/include/mrc/coroutines/io_scheduler.hpp b/cpp/mrc/include/mrc/coroutines/io_scheduler.hpp index e79fa545b..d061a94df 100644 --- a/cpp/mrc/include/mrc/coroutines/io_scheduler.hpp +++ b/cpp/mrc/include/mrc/coroutines/io_scheduler.hpp @@ -40,26 +40,32 @@ #include "mrc/coroutines/detail/poll_info.hpp" #include "mrc/coroutines/fd.hpp" -#include "mrc/coroutines/poll.hpp" #include "mrc/coroutines/scheduler.hpp" -#include "mrc/coroutines/task_container.hpp" +#include "mrc/coroutines/task.hpp" #include "mrc/coroutines/thread_pool.hpp" +#include "mrc/coroutines/time.hpp" #ifdef LIBCORO_FEATURE_NETWORKING #include "coro/net/socket.hpp" #endif +#include #include +#include +#include #include +#include +#include #include -#include #include -#include +#include #include #include namespace mrc::coroutines { +enum class PollOperation : uint64_t; +enum class PollStatus; class IoScheduler : public Scheduler { @@ -70,6 +76,7 @@ class IoScheduler : public Scheduler static std::shared_ptr get_instance(); class schedule_operation; + friend schedule_operation; enum class ThreadStrategy diff --git a/cpp/mrc/include/mrc/coroutines/poll.hpp b/cpp/mrc/include/mrc/coroutines/poll.hpp index b7383bef4..86bb28867 100644 --- a/cpp/mrc/include/mrc/coroutines/poll.hpp +++ b/cpp/mrc/include/mrc/coroutines/poll.hpp @@ -1,5 +1,5 @@ /* - * SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-FileCopyrightText: Copyright (c) 2022-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. * SPDX-License-Identifier: Apache-2.0 * * Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/cpp/mrc/include/mrc/coroutines/scheduler.hpp b/cpp/mrc/include/mrc/coroutines/scheduler.hpp index 90b490930..d8efff83b 100644 --- a/cpp/mrc/include/mrc/coroutines/scheduler.hpp +++ b/cpp/mrc/include/mrc/coroutines/scheduler.hpp @@ -47,15 +47,16 @@ class Scheduler : public std::enable_shared_from_this [[nodiscard]] virtual Task<> yield() = 0; /** - * @brief Suspends the current function for a given duration and resumes it according to the schedulers's implementation. + * @brief Suspends the current function for a given duration and resumes it according to the schedulers's + * implementation. */ [[nodiscard]] virtual Task<> yield_for(std::chrono::milliseconds amount) = 0; /** - * @brief Suspends the current function until a given time point and resumes it according to the schedulers's implementation. + * @brief Suspends the current function until a given time point and resumes it according to the schedulers's + * implementation. */ [[nodiscard]] virtual Task<> yield_until(time_point_t time) = 0; - }; } // namespace mrc::coroutines diff --git a/cpp/mrc/include/mrc/coroutines/time.hpp b/cpp/mrc/include/mrc/coroutines/time.hpp index 825b01ec8..f7844b5b7 100644 --- a/cpp/mrc/include/mrc/coroutines/time.hpp +++ b/cpp/mrc/include/mrc/coroutines/time.hpp @@ -1,5 +1,5 @@ /* - * SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-FileCopyrightText: Copyright (c) 2022-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. * SPDX-License-Identifier: Apache-2.0 * * Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/cpp/mrc/src/public/coroutines/io_scheduler.cpp b/cpp/mrc/src/public/coroutines/io_scheduler.cpp index e763e8604..801eacd0f 100644 --- a/cpp/mrc/src/public/coroutines/io_scheduler.cpp +++ b/cpp/mrc/src/public/coroutines/io_scheduler.cpp @@ -43,17 +43,23 @@ #include "mrc/coroutines/task_container.hpp" #include "mrc/coroutines/time.hpp" +#include #include #include -#include #include -#include +#include #include #include +#include #include #include +#include #include +#include +#include +#include +#include using namespace std::chrono_literals; diff --git a/cpp/mrc/src/public/coroutines/task_container.cpp b/cpp/mrc/src/public/coroutines/task_container.cpp index 874c73e03..317f489f9 100644 --- a/cpp/mrc/src/public/coroutines/task_container.cpp +++ b/cpp/mrc/src/public/coroutines/task_container.cpp @@ -1,5 +1,5 @@ /** - * SPDX-FileCopyrightText: Copyright (c) 2022-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-FileCopyrightText: Copyright (c) 2022-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. * SPDX-License-Identifier: Apache-2.0 * * Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/cpp/mrc/tests/CMakeLists.txt b/cpp/mrc/tests/CMakeLists.txt index 224f26bf7..0e7eef64e 100644 --- a/cpp/mrc/tests/CMakeLists.txt +++ b/cpp/mrc/tests/CMakeLists.txt @@ -1,4 +1,4 @@ -# SPDX-FileCopyrightText: Copyright (c) 2018-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-FileCopyrightText: Copyright (c) 2018-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-License-Identifier: Apache-2.0 # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/cpp/mrc/tests/coroutines/test_io_scheduler.cpp b/cpp/mrc/tests/coroutines/test_io_scheduler.cpp index f41b41e61..87571032b 100644 --- a/cpp/mrc/tests/coroutines/test_io_scheduler.cpp +++ b/cpp/mrc/tests/coroutines/test_io_scheduler.cpp @@ -23,8 +23,13 @@ #include "mrc/coroutines/when_all.hpp" #include +#include +#include #include +#include +#include +#include using namespace mrc; using namespace std::chrono_literals; diff --git a/python/mrc/_pymrc/include/pymrc/asyncio_runnable.hpp b/python/mrc/_pymrc/include/pymrc/asyncio_runnable.hpp index a595580a0..5d8d9fd32 100644 --- a/python/mrc/_pymrc/include/pymrc/asyncio_runnable.hpp +++ b/python/mrc/_pymrc/include/pymrc/asyncio_runnable.hpp @@ -1,5 +1,5 @@ /* - * SPDX-FileCopyrightText: Copyright (c) 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-FileCopyrightText: Copyright (c) 2023-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. * SPDX-License-Identifier: Apache-2.0 * * Licensed under the Apache License, Version 2.0 (the "License"); @@ -205,7 +205,8 @@ class AsyncioRunnable : public AsyncSink, * @brief Value's read from the sink's channel are fed to this function and yields from the * resulting generator are written to the source's channel. */ - virtual mrc::coroutines::AsyncGenerator on_data(InputT&& value, std::shared_ptr on) = 0; + virtual mrc::coroutines::AsyncGenerator on_data(InputT&& value, + std::shared_ptr on) = 0; std::stop_source m_stop_source; diff --git a/python/mrc/_pymrc/include/pymrc/asyncio_scheduler.hpp b/python/mrc/_pymrc/include/pymrc/asyncio_scheduler.hpp index 8240a88d0..47246cad7 100644 --- a/python/mrc/_pymrc/include/pymrc/asyncio_scheduler.hpp +++ b/python/mrc/_pymrc/include/pymrc/asyncio_scheduler.hpp @@ -1,5 +1,5 @@ /* - * SPDX-FileCopyrightText: Copyright (c) 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-FileCopyrightText: Copyright (c) 2023-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. * SPDX-License-Identifier: Apache-2.0 * * Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/python/mrc/_pymrc/tests/test_asyncio_runnable.cpp b/python/mrc/_pymrc/tests/test_asyncio_runnable.cpp index 0c51c64a9..997ae978e 100644 --- a/python/mrc/_pymrc/tests/test_asyncio_runnable.cpp +++ b/python/mrc/_pymrc/tests/test_asyncio_runnable.cpp @@ -1,5 +1,5 @@ /* - * SPDX-FileCopyrightText: Copyright (c) 2021-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-FileCopyrightText: Copyright (c) 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. * SPDX-License-Identifier: Apache-2.0 * * Licensed under the Apache License, Version 2.0 (the "License"); @@ -54,6 +54,10 @@ #include #include +namespace mrc::coroutines { +class Scheduler; +} // namespace mrc::coroutines + namespace py = pybind11; namespace pymrc = mrc::pymrc; using namespace std::string_literals; From 2d1d9ac33ba750a0c81fa516d73b47f40f6d1c01 Mon Sep 17 00:00:00 2001 From: Christopher Harris Date: Wed, 28 Feb 2024 21:22:49 +0000 Subject: [PATCH 08/12] fix styles --- cpp/mrc/include/mrc/coroutines/io_scheduler.hpp | 6 +++--- cpp/mrc/src/public/coroutines/io_scheduler.cpp | 4 ++-- cpp/mrc/tests/coroutines/test_io_scheduler.cpp | 2 +- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/cpp/mrc/include/mrc/coroutines/io_scheduler.hpp b/cpp/mrc/include/mrc/coroutines/io_scheduler.hpp index d061a94df..0345a6c0c 100644 --- a/cpp/mrc/include/mrc/coroutines/io_scheduler.hpp +++ b/cpp/mrc/include/mrc/coroutines/io_scheduler.hpp @@ -49,7 +49,6 @@ #include "coro/net/socket.hpp" #endif -#include #include #include @@ -57,6 +56,7 @@ #include #include #include +#include #include #include #include @@ -248,14 +248,14 @@ class IoScheduler : public Scheduler * @param amount The amount of time to yield for before resuming executino of this task. * Given zero or negative amount of time this behaves identical to yield(). */ - [[nodiscard]] auto yield_for(std::chrono::milliseconds amount) -> mrc::coroutines::Task; + [[nodiscard]] mrc::coroutines::Task yield_for(std::chrono::milliseconds amount) override; /** * Yields the current task until the given time point in the future. * @param time The time point to resume execution of this task. Given 'now' or a time point in the * in the past this behaves identical to yield(). */ - [[nodiscard]] auto yield_until(time_point_t time) -> mrc::coroutines::Task; + [[nodiscard]] mrc::coroutines::Task yield_until(time_point_t time) override; /** * Polls the given file descriptor for the given operations. diff --git a/cpp/mrc/src/public/coroutines/io_scheduler.cpp b/cpp/mrc/src/public/coroutines/io_scheduler.cpp index 801eacd0f..aec14be75 100644 --- a/cpp/mrc/src/public/coroutines/io_scheduler.cpp +++ b/cpp/mrc/src/public/coroutines/io_scheduler.cpp @@ -43,11 +43,11 @@ #include "mrc/coroutines/task_container.hpp" #include "mrc/coroutines/time.hpp" -#include +#include #include #include #include -#include +#include #include #include diff --git a/cpp/mrc/tests/coroutines/test_io_scheduler.cpp b/cpp/mrc/tests/coroutines/test_io_scheduler.cpp index 87571032b..e722e4c43 100644 --- a/cpp/mrc/tests/coroutines/test_io_scheduler.cpp +++ b/cpp/mrc/tests/coroutines/test_io_scheduler.cpp @@ -23,7 +23,7 @@ #include "mrc/coroutines/when_all.hpp" #include -#include +#include #include #include From b810ee3fb02a50ae6630d4c0ec9bc9c768fed72e Mon Sep 17 00:00:00 2001 From: Christopher Harris Date: Wed, 28 Feb 2024 21:30:51 +0000 Subject: [PATCH 09/12] fix styles --- cpp/mrc/src/public/coroutines/io_scheduler.cpp | 4 ++-- cpp/mrc/tests/coroutines/test_io_scheduler.cpp | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/cpp/mrc/src/public/coroutines/io_scheduler.cpp b/cpp/mrc/src/public/coroutines/io_scheduler.cpp index aec14be75..a52f7a756 100644 --- a/cpp/mrc/src/public/coroutines/io_scheduler.cpp +++ b/cpp/mrc/src/public/coroutines/io_scheduler.cpp @@ -43,16 +43,16 @@ #include "mrc/coroutines/task_container.hpp" #include "mrc/coroutines/time.hpp" -#include #include #include #include -#include #include #include +#include #include #include +#include #include #include #include diff --git a/cpp/mrc/tests/coroutines/test_io_scheduler.cpp b/cpp/mrc/tests/coroutines/test_io_scheduler.cpp index e722e4c43..26efb93c1 100644 --- a/cpp/mrc/tests/coroutines/test_io_scheduler.cpp +++ b/cpp/mrc/tests/coroutines/test_io_scheduler.cpp @@ -23,10 +23,10 @@ #include "mrc/coroutines/when_all.hpp" #include -#include #include #include +#include #include #include #include From 88fdbd2b4f426809f25ba670715e481b14f06e94 Mon Sep 17 00:00:00 2001 From: Christopher Harris Date: Tue, 12 Mar 2024 18:08:04 +0000 Subject: [PATCH 10/12] test scheduler --- cpp/mrc/CMakeLists.txt | 1 + .../include/mrc/coroutines/test_scheduler.hpp | 58 +++++++++++++ .../src/public/coroutines/test_scheduler.cpp | 83 +++++++++++++++++++ 3 files changed, 142 insertions(+) create mode 100644 cpp/mrc/include/mrc/coroutines/test_scheduler.hpp create mode 100644 cpp/mrc/src/public/coroutines/test_scheduler.cpp diff --git a/cpp/mrc/CMakeLists.txt b/cpp/mrc/CMakeLists.txt index 4c72eb6f7..5e570182c 100644 --- a/cpp/mrc/CMakeLists.txt +++ b/cpp/mrc/CMakeLists.txt @@ -118,6 +118,7 @@ add_library(libmrc src/public/coroutines/io_scheduler.cpp src/public/coroutines/sync_wait.cpp src/public/coroutines/task_container.cpp + src/public/coroutines/test_scheduler.cpp src/public/coroutines/thread_local_context.cpp src/public/coroutines/thread_pool.cpp src/public/cuda/device_guard.cpp diff --git a/cpp/mrc/include/mrc/coroutines/test_scheduler.hpp b/cpp/mrc/include/mrc/coroutines/test_scheduler.hpp new file mode 100644 index 000000000..dc65bf628 --- /dev/null +++ b/cpp/mrc/include/mrc/coroutines/test_scheduler.hpp @@ -0,0 +1,58 @@ +#include "mrc/coroutines/scheduler.hpp" + +#include +#include +#include + +#pragma once + +namespace mrc::coroutines { + +class TestScheduler : public Scheduler +{ + private: + struct Operation + { + public: + Operation(TestScheduler* self, std::chrono::time_point time); + + static constexpr bool await_ready() + { + return false; + } + + void await_suspend(std::coroutine_handle<> handle); + + void await_resume() {} + + private: + TestScheduler* m_self; + std::chrono::time_point m_time; + }; + + using item_t = std::pair, std::chrono::time_point>; + struct ItemCompare + { + bool operator()(item_t& lhs, item_t& rhs); + }; + + std::priority_queue, ItemCompare> m_queue; + std::chrono::time_point m_time = std::chrono::steady_clock::now(); + + public: + void resume(std::coroutine_handle<> handle) noexcept override; + + mrc::coroutines::Task<> yield() override; + + mrc::coroutines::Task<> yield_for(std::chrono::milliseconds time) override; + + mrc::coroutines::Task<> yield_until(std::chrono::time_point time) override; + + bool resume_next(); + + bool resume_for(std::chrono::milliseconds time); + + bool resume_until(std::chrono::time_point time); +}; + +} // namespace mrc::coroutines diff --git a/cpp/mrc/src/public/coroutines/test_scheduler.cpp b/cpp/mrc/src/public/coroutines/test_scheduler.cpp new file mode 100644 index 000000000..b4c3dcfb4 --- /dev/null +++ b/cpp/mrc/src/public/coroutines/test_scheduler.cpp @@ -0,0 +1,83 @@ +#include "mrc/coroutines/test_scheduler.hpp" + +namespace mrc::coroutines { + +TestScheduler::Operation::Operation(TestScheduler* self, std::chrono::time_point time) : + m_self(self), + m_time(time) +{} + +bool TestScheduler::ItemCompare::operator()(item_t& lhs, item_t& rhs) +{ + return lhs.second > rhs.second; +} + +void TestScheduler::Operation::await_suspend(std::coroutine_handle<> handle) +{ + m_self->m_queue.emplace(std::move(handle), m_time); +} + +void TestScheduler::resume(std::coroutine_handle<> handle) noexcept +{ + m_queue.emplace(std::move(handle), std::chrono::steady_clock::now()); +} + +mrc::coroutines::Task<> TestScheduler::yield() +{ + co_return co_await TestScheduler::Operation{this, m_time}; +} + +mrc::coroutines::Task<> TestScheduler::yield_for(std::chrono::milliseconds time) +{ + co_return co_await TestScheduler::Operation{this, m_time + time}; +} + +mrc::coroutines::Task<> TestScheduler::yield_until(std::chrono::time_point time) +{ + co_return co_await TestScheduler::Operation{this, time}; +} + +bool TestScheduler::resume_next() +{ + if (m_queue.empty()) + { + return false; + } + + auto handle = m_queue.top(); + + m_queue.pop(); + + m_time = handle.second; + + handle.first.resume(); + + return true; +} + +bool TestScheduler::resume_for(std::chrono::milliseconds time) +{ + return resume_until(m_time + time); +} + +bool TestScheduler::resume_until(std::chrono::time_point time) +{ + m_time = time; + + while (not m_queue.empty()) + { + if (m_queue.top().second <= m_time) + { + m_queue.top().first.resume(); + m_queue.pop(); + } + else + { + return true; + } + } + + return false; +} + +} // namespace mrc::coroutines From 6b1bc990635a74029e51f816ee32033b70ef2662 Mon Sep 17 00:00:00 2001 From: Christopher Harris Date: Fri, 15 Mar 2024 19:25:19 +0000 Subject: [PATCH 11/12] add comments to TestScheduler --- .../include/mrc/coroutines/test_scheduler.hpp | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/cpp/mrc/include/mrc/coroutines/test_scheduler.hpp b/cpp/mrc/include/mrc/coroutines/test_scheduler.hpp index dc65bf628..ad7e1315a 100644 --- a/cpp/mrc/include/mrc/coroutines/test_scheduler.hpp +++ b/cpp/mrc/include/mrc/coroutines/test_scheduler.hpp @@ -40,18 +40,46 @@ class TestScheduler : public Scheduler std::chrono::time_point m_time = std::chrono::steady_clock::now(); public: + + /** + * @brief Enqueue's the coroutine handle to be resumed at the current logical time. + */ void resume(std::coroutine_handle<> handle) noexcept override; + /** + * Suspends the current function and enqueue's it to be resumed at the current logical time. + */ mrc::coroutines::Task<> yield() override; + /** + * Suspends the current function and enqueue's it to be resumed at the current logica time + the given duration. + */ mrc::coroutines::Task<> yield_for(std::chrono::milliseconds time) override; + /** + * Suspends the current function and enqueue's it to be resumed at the given logical time. + */ mrc::coroutines::Task<> yield_until(std::chrono::time_point time) override; + /** + * Immediately resumes the next-in-queue coroutine handle. + * + * @return true if more coroutines exist in the queue after resuming, false otherwise. + */ bool resume_next(); + /** + * Immediately resumes next-in-queue coroutines up to the current logical time + the given duration, in-order. + * + * @return true if more coroutines exist in the queue after resuming, false otherwise. + */ bool resume_for(std::chrono::milliseconds time); + /** + * Immediately resumes next-in-queue coroutines up to the given logical time. + * + * @return true if more coroutines exist in the queue after resuming, false otherwise. + */ bool resume_until(std::chrono::time_point time); }; From 89c299c24e6a3887ac1ef4198d0e31a699b6e270 Mon Sep 17 00:00:00 2001 From: Christopher Harris Date: Fri, 15 Mar 2024 19:30:20 +0000 Subject: [PATCH 12/12] iwyu + copyright header --- .../include/mrc/coroutines/test_scheduler.hpp | 23 +++++++++++++++++-- .../src/public/coroutines/test_scheduler.cpp | 19 +++++++++++++++ 2 files changed, 40 insertions(+), 2 deletions(-) diff --git a/cpp/mrc/include/mrc/coroutines/test_scheduler.hpp b/cpp/mrc/include/mrc/coroutines/test_scheduler.hpp index ad7e1315a..ba2847415 100644 --- a/cpp/mrc/include/mrc/coroutines/test_scheduler.hpp +++ b/cpp/mrc/include/mrc/coroutines/test_scheduler.hpp @@ -1,8 +1,28 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + #include "mrc/coroutines/scheduler.hpp" +#include "mrc/coroutines/task.hpp" #include #include #include +#include +#include #pragma once @@ -40,7 +60,6 @@ class TestScheduler : public Scheduler std::chrono::time_point m_time = std::chrono::steady_clock::now(); public: - /** * @brief Enqueue's the coroutine handle to be resumed at the current logical time. */ @@ -79,7 +98,7 @@ class TestScheduler : public Scheduler * Immediately resumes next-in-queue coroutines up to the given logical time. * * @return true if more coroutines exist in the queue after resuming, false otherwise. - */ + */ bool resume_until(std::chrono::time_point time); }; diff --git a/cpp/mrc/src/public/coroutines/test_scheduler.cpp b/cpp/mrc/src/public/coroutines/test_scheduler.cpp index b4c3dcfb4..0cc3ef130 100644 --- a/cpp/mrc/src/public/coroutines/test_scheduler.cpp +++ b/cpp/mrc/src/public/coroutines/test_scheduler.cpp @@ -1,5 +1,24 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + #include "mrc/coroutines/test_scheduler.hpp" +#include + namespace mrc::coroutines { TestScheduler::Operation::Operation(TestScheduler* self, std::chrono::time_point time) :