Skip to content

Commit

Permalink
Share lock among callback registries
Browse files Browse the repository at this point in the history
  • Loading branch information
wch committed Apr 3, 2020
1 parent 2ab634d commit b6b93e0
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 46 deletions.
16 changes: 8 additions & 8 deletions src/callback_registry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -253,8 +253,8 @@ void testCallbackOrdering() {
}
}

CallbackRegistry::CallbackRegistry(int id)
: id(id), mutex(tct_mtx_recursive), condvar(mutex)
CallbackRegistry::CallbackRegistry(int id, Mutex* mutex, ConditionVariable* condvar)
: id(id), mutex(mutex), condvar(condvar)
{
ASSERT_MAIN_THREAD()
}
Expand All @@ -270,7 +270,7 @@ int CallbackRegistry::getId() const {

void CallbackRegistry::signal(bool recursive) {
Guard guard(mutex);
condvar.signal();
condvar->signal();

if (recursive && parent != nullptr) {
parent->signal();
Expand All @@ -284,7 +284,7 @@ uint64_t CallbackRegistry::add(Rcpp::Function func, double secs) {
Callback_sp cb = boost::make_shared<RcppFunctionCallback>(when, func);
Guard guard(mutex);
queue.insert(cb);
condvar.signal();
condvar->signal();

if (parent != nullptr) {
parent->signal(true);
Expand All @@ -298,7 +298,7 @@ uint64_t CallbackRegistry::add(void (*func)(void*), void* data, double secs) {
Callback_sp cb = boost::make_shared<BoostFunctionCallback>(when, boost::bind(func, data));
Guard guard(mutex);
queue.insert(cb);
condvar.signal();
condvar->signal();

if (parent != nullptr) {
parent->signal(true);
Expand Down Expand Up @@ -335,7 +335,7 @@ Optional<Timestamp> CallbackRegistry::nextTimestamp(bool recursive) const {

// Now check children
if (recursive) {
for (std::vector<boost::shared_ptr<CallbackRegistry>>::const_iterator it = children.begin();
for (std::vector<boost::shared_ptr<CallbackRegistry> >::const_iterator it = children.begin();
it != children.end();
++it)
{
Expand Down Expand Up @@ -372,7 +372,7 @@ bool CallbackRegistry::due(const Timestamp& time, bool recursive) const {

// Now check children
if (recursive) {
for (std::vector<boost::shared_ptr<CallbackRegistry>>::const_iterator it = children.begin();
for (std::vector<boost::shared_ptr<CallbackRegistry> >::const_iterator it = children.begin();
it != children.end();
++it)
{
Expand Down Expand Up @@ -421,7 +421,7 @@ bool CallbackRegistry::wait(double timeoutSecs, bool recursive) const {
if (waitFor > 2) {
waitFor = 2;
}
condvar.timedwait(waitFor);
condvar->timedwait(waitFor);
Rcpp::checkUserInterrupt();
}

Expand Down
12 changes: 8 additions & 4 deletions src/callback_registry.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,15 @@ class CallbackRegistry {
// objects to be copied on the wrong thread, and even trigger an R GC event
// on the wrong thread. https://github.com/r-lib/later/issues/39
cbSet queue;
mutable Mutex mutex;
mutable ConditionVariable condvar;
Mutex* mutex;
ConditionVariable* condvar;

public:
CallbackRegistry(int id);
// The CallbackRegistry must be given a Mutex and ConditionVariable when
// initialized, because they are shared among the CallbackRegistry objects
// and the CallbackRegistryTable; they serve as a global lock. Note that the
// lifetime of these objects must be longer than the CallbackRegistry.
CallbackRegistry(int id, Mutex* mutex, ConditionVariable* condvar);
~CallbackRegistry();

int getId() const;
Expand Down Expand Up @@ -150,7 +154,7 @@ class CallbackRegistry {
// automatically running child loops. They should only be accessed and
// modified from the main thread.
boost::shared_ptr<CallbackRegistry> parent;
std::vector<boost::shared_ptr<CallbackRegistry>> children;
std::vector<boost::shared_ptr<CallbackRegistry> > children;

// To let another object signal this one. It will also signal its parent.
void signal(bool recursive = true);
Expand Down
18 changes: 10 additions & 8 deletions src/callback_registry_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

#include <Rcpp.h>
#include <boost/shared_ptr.hpp>
#include <boost/make_shared.hpp>
#include "threadutils.h"
#include "debug.h"
#include "callback_registry.h"
Expand All @@ -21,19 +22,19 @@ using boost::make_shared;
//
class CallbackRegistryTable {
public:
CallbackRegistryTable() : mutex(tct_mtx_plain | tct_mtx_recursive) {
CallbackRegistryTable() : mutex(tct_mtx_plain | tct_mtx_recursive), condvar(mutex) {
}

bool exists(int id) {
Guard guard(mutex);
Guard guard(&mutex);
return (registries.find(id) != registries.end());
}

// Create a new CallbackRegistry. If parent_id is -1, then there is no parent.
void create(int id, int parent_id) {
ASSERT_MAIN_THREAD()
Guard guard(mutex);
shared_ptr<CallbackRegistry> registry = make_shared<CallbackRegistry>(id);
Guard guard(&mutex);
shared_ptr<CallbackRegistry> registry = make_shared<CallbackRegistry>(id, &mutex, &condvar);

if (exists(id)) {
Rcpp::stop("Can't create event loop %d because it already exists.", id);
Expand All @@ -55,7 +56,7 @@ class CallbackRegistryTable {
// the table, or if the target CallbackRegistry has already been deleted,
// then the shared_ptr is empty.
shared_ptr<CallbackRegistry> get(int id) {
Guard guard(mutex);
Guard guard(&mutex);
if (!exists(id)) {
return shared_ptr<CallbackRegistry>();
}
Expand All @@ -65,7 +66,7 @@ class CallbackRegistryTable {
}

bool remove(int id) {
Guard guard(mutex);
Guard guard(&mutex);

shared_ptr<CallbackRegistry> registry = get(id);
if (registry == nullptr) {
Expand All @@ -83,7 +84,7 @@ class CallbackRegistryTable {
shared_ptr<CallbackRegistry> parent = registry->parent;
if (parent != nullptr) {
// Remove this registry from the parent's list of children.
for (std::vector<shared_ptr<CallbackRegistry>>::iterator it = parent->children.begin();
for (std::vector<shared_ptr<CallbackRegistry> >::iterator it = parent->children.begin();
it != parent->children.end();
)
{
Expand All @@ -97,7 +98,7 @@ class CallbackRegistryTable {
}

// Tell the children that they no longer have a parent.
for (std::vector<boost::shared_ptr<CallbackRegistry>>::iterator it = registry->children.begin();
for (std::vector<boost::shared_ptr<CallbackRegistry> >::iterator it = registry->children.begin();
it != registry->children.end();
++it)
{
Expand All @@ -112,6 +113,7 @@ class CallbackRegistryTable {
private:
std::map<int, shared_ptr<CallbackRegistry> > registries;
Mutex mutex;
ConditionVariable condvar;
};


Expand Down
2 changes: 1 addition & 1 deletion src/later_posix.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ size_t BUF_SIZE = 256;
void *buf;

void set_fd(bool ready) {
Guard g(m);
Guard g(&m);

if (ready != hot) {
if (ready) {
Expand Down
32 changes: 16 additions & 16 deletions src/threadutils.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class ConditionVariable;
class Mutex : boost::noncopyable {
friend class ConditionVariable;
tct_mtx_t _m;

public:
// type must be one of:
//
Expand All @@ -32,17 +32,17 @@ class Mutex : boost::noncopyable {
throw std::runtime_error("Mutex creation failed");
}
}

virtual ~Mutex() {
tct_mtx_destroy(&_m);
}

void lock() {
if (tct_mtx_lock(&_m) != tct_thrd_success) {
throw std::runtime_error("Mutex failed to lock");
}
}

bool tryLock() {
int res = tct_mtx_trylock(&_m);
if (res == tct_thrd_success) {
Expand All @@ -53,7 +53,7 @@ class Mutex : boost::noncopyable {
throw std::runtime_error("Mutex failed to trylock");
}
}

void unlock() {
if (tct_mtx_unlock(&_m) != tct_thrd_success) {
throw std::runtime_error("Mutex failed to unlock");
Expand All @@ -63,12 +63,12 @@ class Mutex : boost::noncopyable {

class Guard : boost::noncopyable {
Mutex* _mutex;

public:
Guard(Mutex& mutex) : _mutex(&mutex) {
Guard(Mutex* mutex) : _mutex(mutex) {
_mutex->lock();
}

~Guard() {
_mutex->unlock();
}
Expand All @@ -77,7 +77,7 @@ class Guard : boost::noncopyable {
class ConditionVariable : boost::noncopyable {
tct_mtx_t* _m;
tct_cnd_t _c;

public:
ConditionVariable(Mutex& mutex) : _m(&mutex._m) {
// If time_t isn't integral, our addSeconds logic needs to change,
Expand All @@ -88,38 +88,38 @@ class ConditionVariable : boost::noncopyable {
// negative values for secs.
if (!boost::is_signed<time_t>::value)
throw std::runtime_error("Signed time_t type expected");

if (tct_cnd_init(&_c) != tct_thrd_success)
throw std::runtime_error("Condition variable failed to initialize");
}

virtual ~ConditionVariable() {
tct_cnd_destroy(&_c);
}

// Unblocks one thread (if any are waiting)
void signal() {
if (tct_cnd_signal(&_c) != tct_thrd_success)
throw std::runtime_error("Condition variable failed to signal");
}

// Unblocks all waiting threads
void broadcast() {
if (tct_cnd_broadcast(&_c) != tct_thrd_success)
throw std::runtime_error("Condition variable failed to broadcast");
}

void wait() {
if (tct_cnd_wait(&_c, _m) != tct_thrd_success)
throw std::runtime_error("Condition variable failed to wait");
}

bool timedwait(double timeoutSecs) {
timespec ts;
if (timespec_get(&ts, TIME_UTC) != TIME_UTC) {
throw std::runtime_error("timespec_get failed");
}

ts = addSeconds(ts, timeoutSecs);

int res = tct_cnd_timedwait(&_c, _m, &ts);
Expand Down
14 changes: 7 additions & 7 deletions src/timer_posix.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,20 @@ int Timer::bg_main_func(void* data) {
}

void Timer::bg_main() {
Guard guard(this->mutex);
Guard guard(&this->mutex);
while (true) {

// Guarded wait; we can't pass here until either the timer is stopped or we
// have a wait time.
while (!(this->stopped || this->wakeAt != boost::none)) {
this->cond.wait();
}

// We're stopped; return, which ends the thread.
if (this->stopped) {
return;
}

// The wake time has been set. There are three possibilities:
// 1. The wake time is in the past. Go ahead and execute now.
// 2. Wait for the wake time, but we're notified before time elapses.
Expand Down Expand Up @@ -58,7 +58,7 @@ Timer::~Timer() {
// on results in undefined behavior--on Fedora 25+ it hangs.
if (this->bgthread != boost::none) {
{
Guard guard(this->mutex);
Guard guard(&this->mutex);
this->stopped = true;
this->cond.signal();
}
Expand All @@ -68,15 +68,15 @@ Timer::~Timer() {
}

void Timer::set(const Timestamp& timestamp) {
Guard guard(this->mutex);
Guard guard(&this->mutex);

// If the thread has not yet been created, created it.
if (this->bgthread == boost::none) {
tct_thrd_t thread;
tct_thrd_create(&thread, &bg_main_func, this);
this->bgthread = thread;
}

this->wakeAt = timestamp;
this->cond.signal();
}
Expand Down
4 changes: 2 additions & 2 deletions src/timer_posix.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ class Timer {
boost::optional<tct_thrd_t> bgthread;
boost::optional<Timestamp> wakeAt;
bool stopped;

static int bg_main_func(void*);
void bg_main();
public:
Timer(const boost::function<void ()>& callback);
virtual ~Timer();

// Schedules the timer to fire next at the specified time.
// If the timer is currently scheduled to fire, that will
// be overwritten with this one (the timer only tracks one
Expand Down

0 comments on commit b6b93e0

Please sign in to comment.