Skip to content

Commit

Permalink
kernel: Remove dependency on CScheduler
Browse files Browse the repository at this point in the history
By defining a virtual interface class for the scheduler client, users of
the kernel can now define their own event consuming infrastructure,
without having to spawn threads or rely on the scheduler design.

Removing CScheduler also allows removing the thread and
exception modules from the kernel library.
  • Loading branch information
TheCharlatan committed Jan 18, 2024
1 parent 3a62410 commit f9758d7
Show file tree
Hide file tree
Showing 9 changed files with 67 additions and 34 deletions.
7 changes: 4 additions & 3 deletions src/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ BITCOIN_CORE_H = \
util/golombrice.h \
util/hash_type.h \
util/hasher.h \
util/immediate_task_queue.h \
util/insert.h \
util/macros.h \
util/message.h \
Expand All @@ -321,6 +322,7 @@ BITCOIN_CORE_H = \
util/spanparsing.h \
util/string.h \
util/syserror.h \
util/task_queue_interface.h \
util/thread.h \
util/threadinterrupt.h \
util/threadnames.h \
Expand Down Expand Up @@ -739,6 +741,7 @@ libbitcoin_util_a_SOURCES = \
util/fs.cpp \
util/fs_helpers.cpp \
util/hasher.cpp \
util/immediate_task_queue.cpp \
util/sock.cpp \
util/syserror.cpp \
util/message.cpp \
Expand Down Expand Up @@ -960,7 +963,6 @@ libbitcoinkernel_la_SOURCES = \
pubkey.cpp \
random.cpp \
randomenv.cpp \
scheduler.cpp \
script/interpreter.cpp \
script/script.cpp \
script/script_error.cpp \
Expand All @@ -977,18 +979,17 @@ libbitcoinkernel_la_SOURCES = \
util/batchpriority.cpp \
util/chaintype.cpp \
util/check.cpp \
util/exception.cpp \
util/fs.cpp \
util/fs_helpers.cpp \
util/hasher.cpp \
util/immediate_task_queue.cpp \
util/moneystr.cpp \
util/rbf.cpp \
util/serfloat.cpp \
util/signalinterrupt.cpp \
util/strencodings.cpp \
util/string.cpp \
util/syserror.cpp \
util/thread.cpp \
util/threadnames.cpp \
util/time.cpp \
util/tokenpipe.cpp \
Expand Down
16 changes: 3 additions & 13 deletions src/bitcoin-chainstate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@
#include <node/caches.h>
#include <node/chainstate.h>
#include <random.h>
#include <scheduler.h>
#include <script/sigcache.h>
#include <util/chaintype.h>
#include <util/fs.h>
#include <util/thread.h>
#include <util/immediate_task_queue.h>
#include <util/task_queue_interface.h>
#include <validation.h>
#include <validationinterface.h>

Expand Down Expand Up @@ -68,16 +68,7 @@ int main(int argc, char* argv[])
Assert(InitSignatureCache(validation_cache_sizes.signature_cache_bytes));
Assert(InitScriptExecutionCache(validation_cache_sizes.script_execution_cache_bytes));


// SETUP: Scheduling and Background Signals
CScheduler scheduler{};
// Start the lightweight task scheduler thread
scheduler.m_service_thread = std::thread(util::TraceThread, "scheduler", [&] { scheduler.serviceQueue(); });

ValidationSignals validation_signals{scheduler};

// Gather some entropy once per minute.
scheduler.scheduleEvery(RandAddPeriodic, std::chrono::minutes{1});
ValidationSignals validation_signals{std::make_unique<util::ImmediateTaskQueue>()};

class KernelNotifications : public kernel::Notifications
{
Expand Down Expand Up @@ -288,7 +279,6 @@ int main(int argc, char* argv[])
epilogue:
// Without this precise shutdown sequence, there will be a lot of nullptr
// dereferencing and UB.
scheduler.stop();
if (chainman.m_thread_load.joinable()) chainman.m_thread_load.join();

validation_signals.FlushBackgroundCallbacks();
Expand Down
3 changes: 2 additions & 1 deletion src/bitcoind.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <node/context.h>
#include <node/interface_ui.h>
#include <noui.h>
#include <scheduler.h>
#include <util/check.h>
#include <util/exception.h>
#include <util/strencodings.h>
Expand Down Expand Up @@ -185,7 +186,7 @@ static bool AppInit(NodeContext& node)
}

node.scheduler = std::make_unique<CScheduler>();
node.validation_signals = std::make_unique<ValidationSignals>(*node.scheduler);
node.validation_signals = std::make_unique<ValidationSignals>(std::make_unique<SingleThreadedSchedulerClient>(*node.scheduler));
node.kernel = std::make_unique<kernel::Context>();
if (!AppInitSanityChecks(*node.kernel))
{
Expand Down
3 changes: 2 additions & 1 deletion src/node/interfaces.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include <primitives/transaction.h>
#include <rpc/protocol.h>
#include <rpc/server.h>
#include <scheduler.h>
#include <support/allocators/secure.h>
#include <sync.h>
#include <txmempool.h>
Expand Down Expand Up @@ -100,7 +101,7 @@ class NodeImpl : public Node
if (!AppInitParameterInteraction(args())) return false;

m_context->scheduler = std::make_unique<CScheduler>();
m_context->validation_signals = std::make_unique<ValidationSignals>(*m_context->scheduler);
m_context->validation_signals = std::make_unique<ValidationSignals>(std::make_unique<SingleThreadedSchedulerClient>(*m_context->scheduler));
m_context->kernel = std::make_unique<kernel::Context>();
if (!AppInitSanityChecks(*m_context->kernel)) return false;

Expand Down
9 changes: 5 additions & 4 deletions src/scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <attributes.h>
#include <sync.h>
#include <threadsafety.h>
#include <util/task_queue_interface.h>

#include <chrono>
#include <condition_variable>
Expand Down Expand Up @@ -120,7 +121,7 @@ class CScheduler
* B() will be able to observe all of the effects of callback A() which executed
* before it.
*/
class SingleThreadedSchedulerClient
class SingleThreadedSchedulerClient : public util::TaskQueueInterface
{
private:
CScheduler& m_scheduler;
Expand All @@ -141,15 +142,15 @@ class SingleThreadedSchedulerClient
* Practically, this means that callbacks can behave as if they are executed
* in order by a single thread.
*/
void AddToProcessQueue(std::function<void()> func) EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex);
void AddToProcessQueue(std::function<void()> func) override EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex);

/**
* Processes all remaining queue members on the calling thread, blocking until queue is empty
* Must be called after the CScheduler has no remaining processing threads!
*/
void EmptyQueue() EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex);
void EmptyQueue() override EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex);

size_t CallbacksPending() EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex);
size_t CallbacksPending() override EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex);
};

#endif // BITCOIN_SCHEDULER_H
2 changes: 1 addition & 1 deletion src/test/util/setup_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ BasicTestingSetup::BasicTestingSetup(const ChainType chainType, const std::vecto
AppInitParameterInteraction(*m_node.args);
LogInstance().StartLogging();
m_node.scheduler = std::make_unique<CScheduler>();
m_node.validation_signals = std::make_unique<ValidationSignals>(*m_node.scheduler);
m_node.validation_signals = std::make_unique<ValidationSignals>(std::make_unique<SingleThreadedSchedulerClient>(*m_node.scheduler));
m_node.kernel = std::make_unique<kernel::Context>();
SetupEnvironment();

Expand Down
39 changes: 39 additions & 0 deletions src/util/task_queue_interface.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright (c) 2023-present The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.

#ifndef BITCOIN_UTIL_TASK_QUEUE_INTERFACE_H
#define BITCOIN_UTIL_TASK_QUEUE_INTERFACE_H

#include <cstddef>
#include <functional>

namespace util {

class TaskQueueInterface
{
public:
virtual ~TaskQueueInterface() {}

/**
* This is called for each subscriber on each validation interface event.
* The callback can either be queued for later/asynchronous/threaded
* processing, or be executed immediately for synchronous processing.
* Synchronous processing will block validation.
*/
virtual void AddToProcessQueue(std::function<void()> func) = 0;

/**
* This is called to force the processing of all queued events.
*/
virtual void EmptyQueue() = 0;

/**
* Returns the number of queued events.
*/
virtual size_t CallbacksPending() = 0;
};

} // namespace util

#endif // BITCOIN_UTIL_TASK_QUEUE_INTERFACE_H
15 changes: 7 additions & 8 deletions src/validationinterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,14 @@

#include <validationinterface.h>

#include <attributes.h>
#include <chain.h>
#include <consensus/validation.h>
#include <kernel/chain.h>
#include <kernel/mempool_entry.h>
#include <logging.h>
#include <primitives/block.h>
#include <primitives/transaction.h>
#include <scheduler.h>
#include <util/check.h>

#include <future>
#include <unordered_map>
Expand Down Expand Up @@ -61,17 +60,17 @@ template<typename F> void ValidationSignalsImpl::Iterate(F&& f) EXCLUSIVE_LOCKS_
}
}

ValidationSignals::ValidationSignals(CScheduler& scheduler)
: m_schedulerClient{scheduler} {}
ValidationSignals::ValidationSignals(std::unique_ptr<util::TaskQueueInterface> schedulerclient)
: m_schedulerClient{std::move(Assert(schedulerclient))} {}

void ValidationSignals::FlushBackgroundCallbacks()
{
m_schedulerClient.EmptyQueue();
m_schedulerClient->EmptyQueue();
}

size_t ValidationSignals::CallbacksPending()
{
return m_schedulerClient.CallbacksPending();
return m_schedulerClient->CallbacksPending();
}

void ValidationSignals::RegisterSharedValidationInterface(std::shared_ptr<CValidationInterface> callbacks)
Expand Down Expand Up @@ -105,7 +104,7 @@ void ValidationSignals::UnregisterAllValidationInterfaces()

void ValidationSignals::CallFunctionInValidationInterfaceQueue(std::function<void()> func)
{
m_schedulerClient.AddToProcessQueue(std::move(func));
m_schedulerClient->AddToProcessQueue(std::move(func));
}

void ValidationSignals::SyncWithValidationInterfaceQueue()
Expand All @@ -127,7 +126,7 @@ void ValidationSignals::SyncWithValidationInterfaceQueue()
do { \
auto local_name = (name); \
LOG_EVENT("Enqueuing " fmt, local_name, __VA_ARGS__); \
m_schedulerClient.AddToProcessQueue([=] { \
m_schedulerClient->AddToProcessQueue([=] { \
LOG_EVENT(fmt, local_name, __VA_ARGS__); \
event(); \
}); \
Expand Down
7 changes: 4 additions & 3 deletions src/validationinterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,16 @@
#include <kernel/chain.h>
#include <kernel/cs_main.h>
#include <primitives/transaction.h> // CTransaction(Ref)
#include <scheduler.h>
#include <sync.h>
#include <util/task_queue_interface.h>

#include <cstddef>
#include <cstdint>
#include <functional>
#include <list>
#include <memory>
#include <unordered_map>
#include <utility>

class BlockValidationState;
class CBlock;
Expand Down Expand Up @@ -198,10 +199,10 @@ class ValidationSignals {
// We are not allowed to assume the scheduler only runs in one thread,
// but must ensure all callbacks happen in-order, so we end up creating
// our own queue here :(
SingleThreadedSchedulerClient m_schedulerClient;
std::unique_ptr<util::TaskQueueInterface> m_schedulerClient;

public:
explicit ValidationSignals(CScheduler& scheduler LIFETIMEBOUND);
explicit ValidationSignals(std::unique_ptr<util::TaskQueueInterface> schedulerclient);

/** Call any remaining callbacks on the calling thread */
void FlushBackgroundCallbacks();
Expand Down

0 comments on commit f9758d7

Please sign in to comment.