Skip to content

Commit

Permalink
Guarantee FIFO ordering if timestamps are identical.
Browse files Browse the repository at this point in the history
On virtualbox (where system timer seems to have less resolution, perhaps?) we
were seeing native later calls being executed in a different order than they
were scheduled, due to the timestamps being identical and the Callback class
not knowing how to break ties. Now we're keeping count with an integer.
  • Loading branch information
jcheng5 committed Sep 14, 2018
1 parent 0a6980c commit 5a32279
Show file tree
Hide file tree
Showing 7 changed files with 70 additions and 13 deletions.
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ Imports:
rlang
LinkingTo: Rcpp, BH
Roxygen: list(markdown = TRUE)
RoxygenNote: 6.0.1
RoxygenNote: 6.1.0
Suggests:
knitr,
rmarkdown,
Expand Down
4 changes: 4 additions & 0 deletions R/RcppExports.R
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
# Generated by using Rcpp::compileAttributes() -> do not edit by hand
# Generator token: 10BE3573-1514-4C36-9D1C-5A225CD40393

testCallbackOrdering <- function() {
invisible(.Call('_later_testCallbackOrdering', PACKAGE = 'later'))
}

ensureInitialized <- function() {
invisible(.Call('_later_ensureInitialized', PACKAGE = 'later'))
}
Expand Down
9 changes: 9 additions & 0 deletions src/RcppExports.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,15 @@

using namespace Rcpp;

// testCallbackOrdering
void testCallbackOrdering();
RcppExport SEXP _later_testCallbackOrdering() {
BEGIN_RCPP
Rcpp::RNGScope rcpp_rngScope_gen;
testCallbackOrdering();
return R_NilValue;
END_RCPP
}
// ensureInitialized
void ensureInitialized();
RcppExport SEXP _later_ensureInitialized() {
Expand Down
37 changes: 37 additions & 0 deletions src/callback_registry.cpp
Original file line number Diff line number Diff line change
@@ -1,9 +1,46 @@
#include <boost/atomic.hpp>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/make_shared.hpp>
#include <vector>
#include "callback_registry.h"
#include "debug.h"

boost::atomic<unsigned long> nextCallbackNum(0);

Callback::Callback(Timestamp when, Task func) : when(when), func(func) {
this->callbackNum = nextCallbackNum++;
}

// [[Rcpp::export]]
void testCallbackOrdering() {
std::vector<Callback> callbacks;
Timestamp ts;
Task func;
for (size_t i = 0; i < 100; i++) {
callbacks.push_back(Callback(ts, func));
}
for (size_t i = 1; i < 100; i++) {
if (callbacks[i] < callbacks[i-1]) {
::Rf_error("Callback ordering is broken [1]");
}
if (!(callbacks[i] > callbacks[i-1])) {
::Rf_error("Callback ordering is broken [2]");
}
if (callbacks[i-1] > callbacks[i]) {
::Rf_error("Callback ordering is broken [3]");
}
if (!(callbacks[i-1] < callbacks[i])) {
::Rf_error("Callback ordering is broken [4]");
}
}
for (size_t i = 100; i > 1; i--) {
if (callbacks[i-1] < callbacks[i-2]) {
::Rf_error("Callback ordering is broken [2]");
}
}
}

CallbackRegistry::CallbackRegistry() : mutex(mtx_recursive), condvar(mutex) {
}

Expand Down
15 changes: 8 additions & 7 deletions src/callback_registry.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

#include <Rcpp.h>
#include <queue>
#include <boost/operators.hpp>
#include <boost/function.hpp>
#include <boost/shared_ptr.hpp>
#include "timestamp.h"
Expand All @@ -11,17 +12,14 @@

typedef boost::function0<void> Task;

class Callback {
class Callback : boost::operators<Callback> {

public:
Callback(Timestamp when, Task func) : when(when), func(func) {}
Callback(Timestamp when, Task func);

bool operator<(const Callback& other) const {
return this->when < other.when;
}

bool operator>(const Callback& other) const {
return this->when > other.when;
return this->when < other.when ||
(!(this->when > other.when) && this->callbackNum < other.callbackNum);
}

void operator()() const {
Expand All @@ -32,6 +30,9 @@ class Callback {

private:
Task func;
// Used to break ties when comparing to a callback that has precisely the same
// timestamp
long callbackNum;
};

typedef boost::shared_ptr<Callback> Callback_sp;
Expand Down
12 changes: 7 additions & 5 deletions src/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@ extern SEXP _later_execCallbacks(SEXP);
extern SEXP _later_idle();
extern SEXP _later_execLater(SEXP, SEXP);
extern SEXP _later_next_op_secs();
extern SEXP _later_testCallbackOrdering();

static const R_CallMethodDef CallEntries[] = {
{"_later_ensureInitialized", (DL_FUNC) &_later_ensureInitialized, 0},
{"_later_execCallbacks", (DL_FUNC) &_later_execCallbacks, 1},
{"_later_idle", (DL_FUNC) &_later_idle, 0},
{"_later_execLater", (DL_FUNC) &_later_execLater, 2},
{"_later_next_op_secs", (DL_FUNC) &_later_next_op_secs, 0},
{"_later_ensureInitialized", (DL_FUNC) &_later_ensureInitialized, 0},
{"_later_execCallbacks", (DL_FUNC) &_later_execCallbacks, 1},
{"_later_idle", (DL_FUNC) &_later_idle, 0},
{"_later_execLater", (DL_FUNC) &_later_execLater, 2},
{"_later_next_op_secs", (DL_FUNC) &_later_next_op_secs, 0},
{"_later_testCallbackOrdering", (DL_FUNC) &_later_testCallbackOrdering, 0},
{NULL, NULL, 0}
};

Expand Down
4 changes: 4 additions & 0 deletions tests/testthat/test-run_now.R
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,7 @@ test_that("run_now wakes up when a background thread calls later()", {
expect_lt(as.numeric(x[["elapsed"]]), 1.25)
expect_true(result)
})

test_that("When callbacks have tied timestamps, they respect order of creation", {
expect_error(testCallbackOrdering(), NA)
})

0 comments on commit 5a32279

Please sign in to comment.