Skip to content

Commit

Permalink
kernel: Remove dependency on CScheduler
Browse files Browse the repository at this point in the history
By defining a virtual interface class for the scheduler client, users of
the kernel can now define their own event consuming infrastructure,
without having to spawn threads or rely on the scheduler design.

Removing CScheduler also allows removing the thread and
exception modules from the kernel library.
  • Loading branch information
TheCharlatan committed Jan 18, 2024
1 parent 3a62410 commit 4780a10
Show file tree
Hide file tree
Showing 9 changed files with 76 additions and 31 deletions.
4 changes: 1 addition & 3 deletions src/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,7 @@ BITCOIN_CORE_H = \
util/spanparsing.h \
util/string.h \
util/syserror.h \
util/task_queue_interface.h \
util/thread.h \
util/threadinterrupt.h \
util/threadnames.h \
Expand Down Expand Up @@ -960,7 +961,6 @@ libbitcoinkernel_la_SOURCES = \
pubkey.cpp \
random.cpp \
randomenv.cpp \
scheduler.cpp \
script/interpreter.cpp \
script/script.cpp \
script/script_error.cpp \
Expand All @@ -977,7 +977,6 @@ libbitcoinkernel_la_SOURCES = \
util/batchpriority.cpp \
util/chaintype.cpp \
util/check.cpp \
util/exception.cpp \
util/fs.cpp \
util/fs_helpers.cpp \
util/hasher.cpp \
Expand All @@ -988,7 +987,6 @@ libbitcoinkernel_la_SOURCES = \
util/strencodings.cpp \
util/string.cpp \
util/syserror.cpp \
util/thread.cpp \
util/threadnames.cpp \
util/time.cpp \
util/tokenpipe.cpp \
Expand Down
29 changes: 19 additions & 10 deletions src/bitcoin-chainstate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,10 @@
#include <node/caches.h>
#include <node/chainstate.h>
#include <random.h>
#include <scheduler.h>
#include <script/sigcache.h>
#include <util/chaintype.h>
#include <util/fs.h>
#include <util/thread.h>
#include <util/task_queue_interface.h>
#include <validation.h>
#include <validationinterface.h>

Expand Down Expand Up @@ -68,16 +67,27 @@ int main(int argc, char* argv[])
Assert(InitSignatureCache(validation_cache_sizes.signature_cache_bytes));
Assert(InitScriptExecutionCache(validation_cache_sizes.script_execution_cache_bytes));

class ImmediateTaskQueue : public TaskQueueInterface
{
public:
ImmediateTaskQueue() {}
void AddToProcessQueue(std::function<void()> func) override
{
func();
}

// SETUP: Scheduling and Background Signals
CScheduler scheduler{};
// Start the lightweight task scheduler thread
scheduler.m_service_thread = std::thread(util::TraceThread, "scheduler", [&] { scheduler.serviceQueue(); });
void EmptyQueue() override
{
return;
}

ValidationSignals validation_signals{scheduler};
size_t CallbacksPending() override
{
return 0;
}
};

// Gather some entropy once per minute.
scheduler.scheduleEvery(RandAddPeriodic, std::chrono::minutes{1});
ValidationSignals validation_signals{std::make_unique<ImmediateTaskQueue>()};

class KernelNotifications : public kernel::Notifications
{
Expand Down Expand Up @@ -288,7 +298,6 @@ int main(int argc, char* argv[])
epilogue:
// Without this precise shutdown sequence, there will be a lot of nullptr
// dereferencing and UB.
scheduler.stop();
if (chainman.m_thread_load.joinable()) chainman.m_thread_load.join();

validation_signals.FlushBackgroundCallbacks();
Expand Down
3 changes: 2 additions & 1 deletion src/bitcoind.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <node/context.h>
#include <node/interface_ui.h>
#include <noui.h>
#include <scheduler.h>
#include <util/check.h>
#include <util/exception.h>
#include <util/strencodings.h>
Expand Down Expand Up @@ -185,7 +186,7 @@ static bool AppInit(NodeContext& node)
}

node.scheduler = std::make_unique<CScheduler>();
node.validation_signals = std::make_unique<ValidationSignals>(*node.scheduler);
node.validation_signals = std::make_unique<ValidationSignals>(std::make_unique<SingleThreadedSchedulerClient>(*node.scheduler));
node.kernel = std::make_unique<kernel::Context>();
if (!AppInitSanityChecks(*node.kernel))
{
Expand Down
3 changes: 2 additions & 1 deletion src/node/interfaces.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include <primitives/transaction.h>
#include <rpc/protocol.h>
#include <rpc/server.h>
#include <scheduler.h>
#include <support/allocators/secure.h>
#include <sync.h>
#include <txmempool.h>
Expand Down Expand Up @@ -100,7 +101,7 @@ class NodeImpl : public Node
if (!AppInitParameterInteraction(args())) return false;

m_context->scheduler = std::make_unique<CScheduler>();
m_context->validation_signals = std::make_unique<ValidationSignals>(*m_context->scheduler);
m_context->validation_signals = std::make_unique<ValidationSignals>(std::make_unique<SingleThreadedSchedulerClient>(*m_context->scheduler));
m_context->kernel = std::make_unique<kernel::Context>();
if (!AppInitSanityChecks(*m_context->kernel)) return false;

Expand Down
9 changes: 5 additions & 4 deletions src/scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <attributes.h>
#include <sync.h>
#include <threadsafety.h>
#include <util/task_queue_interface.h>

#include <chrono>
#include <condition_variable>
Expand Down Expand Up @@ -120,7 +121,7 @@ class CScheduler
* B() will be able to observe all of the effects of callback A() which executed
* before it.
*/
class SingleThreadedSchedulerClient
class SingleThreadedSchedulerClient : public TaskQueueInterface
{
private:
CScheduler& m_scheduler;
Expand All @@ -141,15 +142,15 @@ class SingleThreadedSchedulerClient
* Practically, this means that callbacks can behave as if they are executed
* in order by a single thread.
*/
void AddToProcessQueue(std::function<void()> func) EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex);
void AddToProcessQueue(std::function<void()> func) override EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex);

/**
* Processes all remaining queue members on the calling thread, blocking until queue is empty
* Must be called after the CScheduler has no remaining processing threads!
*/
void EmptyQueue() EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex);
void EmptyQueue() override EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex);

size_t CallbacksPending() EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex);
size_t CallbacksPending() override EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex);
};

#endif // BITCOIN_SCHEDULER_H
2 changes: 1 addition & 1 deletion src/test/util/setup_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ BasicTestingSetup::BasicTestingSetup(const ChainType chainType, const std::vecto
AppInitParameterInteraction(*m_node.args);
LogInstance().StartLogging();
m_node.scheduler = std::make_unique<CScheduler>();
m_node.validation_signals = std::make_unique<ValidationSignals>(*m_node.scheduler);
m_node.validation_signals = std::make_unique<ValidationSignals>(std::make_unique<SingleThreadedSchedulerClient>(*m_node.scheduler));
m_node.kernel = std::make_unique<kernel::Context>();
SetupEnvironment();

Expand Down
35 changes: 35 additions & 0 deletions src/util/task_queue_interface.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright (c) 2023-present The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.

#ifndef BITCOIN_TASK_QUEUE_INTERFACE_QUEUE_H
#define BITCOIN_TASK_QUEUE_INTERFACE_QUEUE_H

#include <cstddef>
#include <functional>

class TaskQueueInterface
{
public:
virtual ~TaskQueueInterface() {}

/**
* This is called for each subscriber on each validation interface event.
* The callback can either be queued for later/asynchronous/threaded
* processing, or be executed immediately for synchronous processing.
* Synchronous processing will block validation.
*/
virtual void AddToProcessQueue(std::function<void()> func) = 0;

/**
* This is called to force the processing of all queued events.
*/
virtual void EmptyQueue() = 0;

/**
* Returns the number of queued events.
*/
virtual size_t CallbacksPending() = 0;
};

#endif // BITCOIN_TASK_QUEUE_INTERFACE_QUEUE_H
15 changes: 7 additions & 8 deletions src/validationinterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,14 @@

#include <validationinterface.h>

#include <attributes.h>
#include <chain.h>
#include <consensus/validation.h>
#include <kernel/chain.h>
#include <kernel/mempool_entry.h>
#include <logging.h>
#include <primitives/block.h>
#include <primitives/transaction.h>
#include <scheduler.h>
#include <util/check.h>

#include <future>
#include <unordered_map>
Expand Down Expand Up @@ -61,17 +60,17 @@ template<typename F> void ValidationSignalsImpl::Iterate(F&& f) EXCLUSIVE_LOCKS_
}
}

ValidationSignals::ValidationSignals(CScheduler& scheduler)
: m_schedulerClient{scheduler} {}
ValidationSignals::ValidationSignals(std::unique_ptr<TaskQueueInterface> schedulerclient)
: m_schedulerClient{std::move(Assert(schedulerclient))} {}

void ValidationSignals::FlushBackgroundCallbacks()
{
m_schedulerClient.EmptyQueue();
m_schedulerClient->EmptyQueue();
}

size_t ValidationSignals::CallbacksPending()
{
return m_schedulerClient.CallbacksPending();
return m_schedulerClient->CallbacksPending();
}

void ValidationSignals::RegisterSharedValidationInterface(std::shared_ptr<CValidationInterface> callbacks)
Expand Down Expand Up @@ -105,7 +104,7 @@ void ValidationSignals::UnregisterAllValidationInterfaces()

void ValidationSignals::CallFunctionInValidationInterfaceQueue(std::function<void()> func)
{
m_schedulerClient.AddToProcessQueue(std::move(func));
m_schedulerClient->AddToProcessQueue(std::move(func));
}

void ValidationSignals::SyncWithValidationInterfaceQueue()
Expand All @@ -127,7 +126,7 @@ void ValidationSignals::SyncWithValidationInterfaceQueue()
do { \
auto local_name = (name); \
LOG_EVENT("Enqueuing " fmt, local_name, __VA_ARGS__); \
m_schedulerClient.AddToProcessQueue([=] { \
m_schedulerClient->AddToProcessQueue([=] { \
LOG_EVENT(fmt, local_name, __VA_ARGS__); \
event(); \
}); \
Expand Down
7 changes: 4 additions & 3 deletions src/validationinterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,16 @@
#include <kernel/chain.h>
#include <kernel/cs_main.h>
#include <primitives/transaction.h> // CTransaction(Ref)
#include <scheduler.h>
#include <sync.h>
#include <util/task_queue_interface.h>

#include <cstddef>
#include <cstdint>
#include <functional>
#include <list>
#include <memory>
#include <unordered_map>
#include <utility>

class BlockValidationState;
class CBlock;
Expand Down Expand Up @@ -198,10 +199,10 @@ class ValidationSignals {
// We are not allowed to assume the scheduler only runs in one thread,
// but must ensure all callbacks happen in-order, so we end up creating
// our own queue here :(
SingleThreadedSchedulerClient m_schedulerClient;
std::unique_ptr<TaskQueueInterface> m_schedulerClient;

public:
explicit ValidationSignals(CScheduler& scheduler LIFETIMEBOUND);
explicit ValidationSignals(std::unique_ptr<TaskQueueInterface> schedulerclient);

/** Call any remaining callbacks on the calling thread */
void FlushBackgroundCallbacks();
Expand Down

0 comments on commit 4780a10

Please sign in to comment.