Skip to content

Commit

Permalink
Create general interface for asynchronous processing
Browse files Browse the repository at this point in the history
Will migrate to this from AudioNode::Processor soon,
as this can work for more situations than just audio signal flow, which is useful for other "fire and forget" tasks (like those from the GUI for example).
  • Loading branch information
sakertooth committed Jan 23, 2024
1 parent cb59802 commit ae3b333
Show file tree
Hide file tree
Showing 3 changed files with 169 additions and 0 deletions.
81 changes: 81 additions & 0 deletions include/AsyncWorkerPool.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* AsyncWorkerPool.h
*
* Copyright (c) 2024 saker <[email protected]>
*
* This file is part of LMMS - https://lmms.io
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this program (see COPYING); if not, write to the
* Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
* Boston, MA 02110-1301 USA.
*
*/

#ifndef LMMS_ASYNC_WORKER_POOL_H
#define LMMS_ASYNC_WORKER_POOL_H

#include <condition_variable>
#include <emmintrin.h>
#include <future>
#include <memory>
#include <queue>
#include <thread>
#include <vector>

namespace lmms {
class AsyncWorkerPool
{
public:
AsyncWorkerPool(unsigned int numWorkers = std::thread::hardware_concurrency());
~AsyncWorkerPool();

//! Enqueue function `fn` with arguments `args` to be ran asynchronously.
//! Workers are not notified to do the work enqueued until `run` or `runAsync` are called.
template <typename Fn, typename... Args>
auto enqueue(Fn fn, Args&&... args) -> std::future<std::invoke_result_t<Fn, Args...>>
{
const auto lock = std::lock_guard{m_runMutex};
using Task = std::packaged_task<std::invoke_result_t<Fn, Args...>()>;

// TODO C++20: Use initialized lambda pack captures
auto work = [fn = std::move(fn), args = std::make_tuple(std::forward<Args>(args)...)] {
return std::apply(fn, std::move(args));
};

auto task = std::make_shared<Task>(work);
m_tasks.emplace([task] { return (*task)(); });

return task->get_future();
}

//! Starts the worker pool and blocks until no more tasks can be processed.
void run();

//! Starts the worker pool but returns immediately.
void runAsync();

//! Wait for the worker pool to finish processing any tasks.
void wait();

private:
void process();
std::queue<std::function<void()>> m_tasks;
std::vector<std::thread> m_workers;
std::condition_variable m_runCond, m_waitCond;
std::mutex m_runMutex;
bool m_done = false;
};
} // namespace lmms

#endif // LMMS_ASYNC_WORKER_POOL_H
87 changes: 87 additions & 0 deletions src/core/AsyncWorkerPool.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* AsyncWorkerPool.cpp
*
* Copyright (c) 2024 saker <[email protected]>
*
* This file is part of LMMS - https://lmms.io
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this program (see COPYING); if not, write to the
* Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
* Boston, MA 02110-1301 USA.
*
*/

#include "AsyncWorkerPool.h"

namespace lmms {

AsyncWorkerPool::AsyncWorkerPool(unsigned int numWorkers)
{
for (unsigned int i = 0; i < numWorkers; ++i)
{
m_tasks.emplace([this] { process(); });
}
}

AsyncWorkerPool::~AsyncWorkerPool()
{
{
const auto lock = std::lock_guard{m_runMutex};
m_done = true;
}

m_runCond.notify_all();
for (auto& worker : m_workers)
{
worker.join();
}
}

void AsyncWorkerPool::process()
{
while (!m_done)
{
auto task = std::function<void()>{};
{
auto lock = std::unique_lock{m_runMutex};
m_runCond.wait(lock, [this] { return !m_tasks.empty() || m_done; });
if (m_done) { break; }

task = std::move(m_tasks.front());
m_tasks.pop();
}

task();
m_waitCond.notify_one();
}
}

void AsyncWorkerPool::run()
{
m_runCond.notify_all();
wait();
}

void AsyncWorkerPool::runAsync()
{
m_runCond.notify_all();
}

void AsyncWorkerPool::wait()
{
auto lock = std::unique_lock{m_runMutex};
m_waitCond.wait(lock, [this] { return m_tasks.empty(); });
}

} // namespace lmms
1 change: 1 addition & 0 deletions src/core/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
set(LMMS_SRCS
${LMMS_SRCS}

core/AsyncWorkerPool.cpp
core/AudioEngine.cpp
core/AudioEngineProfiler.cpp
core/AudioResampler.cpp
Expand Down

0 comments on commit ae3b333

Please sign in to comment.