Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Guarantee FIFO ordering if timestamps are identical. #69

Merged
merged 4 commits into from
Sep 14, 2018
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ Imports:
rlang
LinkingTo: Rcpp, BH
Roxygen: list(markdown = TRUE)
RoxygenNote: 6.0.1
RoxygenNote: 6.1.0
Suggests:
knitr,
rmarkdown,
Expand Down
4 changes: 4 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## later 0.7.5

* Fixed issue where the order of callbacks scheduled by native later::later could be nondeterministic if they are scheduled too quickly. This was because callbacks were sorted by the time at which they come due, which could be identical. Later now uses the order of insertion as a tiebreaker. [PR #69](https://github.com/r-lib/later/pull/69)

## later 0.7.4

* Fixed issue [#45](https://github.com/r-lib/later/issues/45) and [#63](https://github.com/r-lib/later/issues/63): glibc 2.28 and musl (used on Arch and Alpine Linux) added support for C11-style threads.h, which masked functions from the tinycthread library used by later. Later now detects support for threads.h and uses it if available; otherwise it uses tinycthread. [PR #64](https://github.com/r-lib/later/pull/64)
Expand Down
4 changes: 4 additions & 0 deletions R/RcppExports.R
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
# Generated by using Rcpp::compileAttributes() -> do not edit by hand
# Generator token: 10BE3573-1514-4C36-9D1C-5A225CD40393

testCallbackOrdering <- function() {
invisible(.Call('_later_testCallbackOrdering', PACKAGE = 'later'))
}

ensureInitialized <- function() {
invisible(.Call('_later_ensureInitialized', PACKAGE = 'later'))
}
Expand Down
9 changes: 9 additions & 0 deletions src/RcppExports.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,15 @@

using namespace Rcpp;

// testCallbackOrdering
void testCallbackOrdering();
RcppExport SEXP _later_testCallbackOrdering() {
BEGIN_RCPP
Rcpp::RNGScope rcpp_rngScope_gen;
testCallbackOrdering();
return R_NilValue;
END_RCPP
}
// ensureInitialized
void ensureInitialized();
RcppExport SEXP _later_ensureInitialized() {
Expand Down
37 changes: 37 additions & 0 deletions src/callback_registry.cpp
Original file line number Diff line number Diff line change
@@ -1,9 +1,46 @@
#include <boost/atomic.hpp>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/make_shared.hpp>
#include <vector>
#include "callback_registry.h"
#include "debug.h"

boost::atomic<unsigned long> nextCallbackNum(0);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

long long


Callback::Callback(Timestamp when, Task func) : when(when), func(func) {
this->callbackNum = nextCallbackNum++;
}

// [[Rcpp::export]]
void testCallbackOrdering() {
std::vector<Callback> callbacks;
Timestamp ts;
Task func;
for (size_t i = 0; i < 100; i++) {
callbacks.push_back(Callback(ts, func));
}
for (size_t i = 1; i < 100; i++) {
if (callbacks[i] < callbacks[i-1]) {
::Rf_error("Callback ordering is broken [1]");
}
if (!(callbacks[i] > callbacks[i-1])) {
::Rf_error("Callback ordering is broken [2]");
}
if (callbacks[i-1] > callbacks[i]) {
::Rf_error("Callback ordering is broken [3]");
}
if (!(callbacks[i-1] < callbacks[i])) {
::Rf_error("Callback ordering is broken [4]");
}
}
for (size_t i = 100; i > 1; i--) {
if (callbacks[i-1] < callbacks[i-2]) {
::Rf_error("Callback ordering is broken [2]");
}
}
}

CallbackRegistry::CallbackRegistry() : mutex(mtx_recursive), condvar(mutex) {
}

Expand Down
15 changes: 8 additions & 7 deletions src/callback_registry.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

#include <Rcpp.h>
#include <queue>
#include <boost/operators.hpp>
#include <boost/function.hpp>
#include <boost/shared_ptr.hpp>
#include "timestamp.h"
Expand All @@ -11,17 +12,14 @@

typedef boost::function0<void> Task;

class Callback {
class Callback : boost::operators<Callback> {

public:
Callback(Timestamp when, Task func) : when(when), func(func) {}
Callback(Timestamp when, Task func);

bool operator<(const Callback& other) const {
return this->when < other.when;
}

bool operator>(const Callback& other) const {
return this->when > other.when;
return this->when < other.when ||
(!(this->when > other.when) && this->callbackNum < other.callbackNum);
}

void operator()() const {
Expand All @@ -32,6 +30,9 @@ class Callback {

private:
Task func;
// Used to break ties when comparing to a callback that has precisely the same
// timestamp
long callbackNum;
};

typedef boost::shared_ptr<Callback> Callback_sp;
Expand Down
12 changes: 7 additions & 5 deletions src/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@ extern SEXP _later_execCallbacks(SEXP);
extern SEXP _later_idle();
extern SEXP _later_execLater(SEXP, SEXP);
extern SEXP _later_next_op_secs();
extern SEXP _later_testCallbackOrdering();

static const R_CallMethodDef CallEntries[] = {
{"_later_ensureInitialized", (DL_FUNC) &_later_ensureInitialized, 0},
{"_later_execCallbacks", (DL_FUNC) &_later_execCallbacks, 1},
{"_later_idle", (DL_FUNC) &_later_idle, 0},
{"_later_execLater", (DL_FUNC) &_later_execLater, 2},
{"_later_next_op_secs", (DL_FUNC) &_later_next_op_secs, 0},
{"_later_ensureInitialized", (DL_FUNC) &_later_ensureInitialized, 0},
{"_later_execCallbacks", (DL_FUNC) &_later_execCallbacks, 1},
{"_later_idle", (DL_FUNC) &_later_idle, 0},
{"_later_execLater", (DL_FUNC) &_later_execLater, 2},
{"_later_next_op_secs", (DL_FUNC) &_later_next_op_secs, 0},
{"_later_testCallbackOrdering", (DL_FUNC) &_later_testCallbackOrdering, 0},
{NULL, NULL, 0}
};

Expand Down
4 changes: 4 additions & 0 deletions tests/testthat/test-run_now.R
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,7 @@ test_that("run_now wakes up when a background thread calls later()", {
expect_lt(as.numeric(x[["elapsed"]]), 1.25)
expect_true(result)
})

test_that("When callbacks have tied timestamps, they respect order of creation", {
expect_error(testCallbackOrdering(), NA)
})