Skip to content

Commit

Permalink
Remove alive tracking from the timeout worker
Browse files Browse the repository at this point in the history
This removes the "alive" flag from the timeout worker, instead relying
on dropping channels to signal the shutting down of this worker. This is
done in preparation for https://gitlab.com/inko-lang/inko/-/issues/275,
as it removes the need for explicitly shutting down a timeout worker,
while also simplifying the internals.

Changelog: changed
  • Loading branch information
yorickpeterse committed Sep 15, 2022
1 parent b0fd083 commit 1e0b003
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 156 deletions.
18 changes: 6 additions & 12 deletions vm/src/machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,15 +111,12 @@ impl<'a> Machine<'a> {
let entry_method =
unsafe { entry_class.get_method(image.entry_method) };

let timeout_guard = {
let thread_state = state.clone();

thread::Builder::new()
{
let state = state.clone();
let _ = thread::Builder::new()
.name("timeout worker".to_string())
.spawn(move || {
thread_state.timeout_worker.run(&thread_state.scheduler);
})
.unwrap()
.spawn(move || state.timeout_worker.run(&state.scheduler))
.unwrap();
};

let poller_guard = {
Expand Down Expand Up @@ -148,10 +145,7 @@ impl<'a> Machine<'a> {

// Joining the pools only fails in case of a panic. In this case we
// don't want to re-panic as this clutters the error output.
if primary_guard.join().is_err()
|| timeout_guard.join().is_err()
|| poller_guard.join().is_err()
{
if primary_guard.join().is_err() || poller_guard.join().is_err() {
state.set_exit_status(1);
}

Expand Down
184 changes: 41 additions & 143 deletions vm/src/scheduler/timeout_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ const FRAGMENTATION_THRESHOLD: f64 = 0.1;
/// The maximum number of messages to process in a single timeout iteration.
const MAX_MESSAGES_PER_ITERATION: usize = 64;

enum Message {
Suspend(ProcessPointer, ArcWithoutWeak<Timeout>),
Terminate,
struct Message {
process: ProcessPointer,
timeout: ArcWithoutWeak<Timeout>,
}

struct Inner {
Expand All @@ -26,9 +26,6 @@ struct Inner {

/// The receiving half of the channel used for suspending processes.
receiver: Receiver<Message>,

/// Indicates if the timeout worker should run or terminate.
alive: bool,
}

/// A TimeoutWorker is tasked with rescheduling processes when their timeouts
Expand Down Expand Up @@ -69,7 +66,7 @@ impl TimeoutWorker {
pub(crate) fn new() -> Self {
let (sender, receiver) = unbounded();

let inner = Inner { timeouts: Timeouts::new(), receiver, alive: true };
let inner = Inner { timeouts: Timeouts::new(), receiver };

TimeoutWorker {
inner: UnsafeCell::new(inner),
Expand All @@ -78,38 +75,26 @@ impl TimeoutWorker {
}
}

pub(crate) fn terminate(&self) {
self.sender
.send(Message::Terminate)
.expect("Failed to terminate because the channel was closed");
}

pub(crate) fn increase_expired_timeouts(&self) {
self.expired.fetch_add(1, Ordering::AcqRel);
}

pub(crate) fn run(&self, scheduler: &Scheduler) {
while self.is_alive() {
loop {
self.defragment_heap();
self.handle_pending_messages();

// This ensures we don't end up waiting for a message below if we
// were instructed to terminated when processing the pending
// messages.
if !self.is_alive() {
return;
}

let time_until_expiration =
self.reschedule_expired_processes(scheduler);

if let Some(duration) = time_until_expiration {
self.wait_for_message_with_timeout(duration);
} else {
// When there are no timeouts there's no point in periodically
// processing the list of timeouts, so instead we wait until the
// first one is added.
self.wait_for_message();
match self.inner().receiver.recv_timeout(duration) {
Ok(msg) => self.handle_message(msg),
Err(err) if err.is_disconnected() => break,
_ => {}
}
} else if !self.wait_for_message() {
break;
}
}
}
Expand All @@ -119,9 +104,10 @@ impl TimeoutWorker {
process: ProcessPointer,
timeout: ArcWithoutWeak<Timeout>,
) {
self.sender
.send(Message::Suspend(process, timeout))
.expect("Failed to suspend because the channel was closed");
// If a send fails that's OK, because this realistically only happens
// during shutdown of the program, at which point we don't care about
// what happens with the message.
let _ = self.sender.send(Message { process, timeout });
}

fn reschedule_expired_processes(
Expand Down Expand Up @@ -150,37 +136,17 @@ impl TimeoutWorker {
}
}

fn wait_for_message(&self) {
let message = self
.inner()
.receiver
.recv()
.expect("Attempt to receive from a closed channel");

self.handle_message(message);
}

fn wait_for_message_with_timeout(&self, wait_for: Duration) {
if let Ok(message) = self.inner().receiver.recv_timeout(wait_for) {
fn wait_for_message(&self) -> bool {
if let Ok(message) = self.inner().receiver.recv() {
self.handle_message(message);
true
} else {
false
}
}

fn handle_message(&self, message: Message) {
let inner = self.inner_mut();

match message {
Message::Suspend(process, timeout) => {
inner.timeouts.insert(process, timeout);
}
Message::Terminate => {
inner.alive = false;
}
}
}

fn is_alive(&self) -> bool {
self.inner().alive
self.inner_mut().timeouts.insert(message.process, message.timeout);
}

fn number_of_expired_timeouts(&self) -> f64 {
Expand Down Expand Up @@ -222,11 +188,26 @@ mod tests {
use std::thread;
use std::time::Instant;

fn terminate(worker: &TimeoutWorker) {
let (sender, _) = unbounded();

// This shuts down the worker by dropping the old Sender, without
// needing extra boolean "alive" flags.
//
// We use this hack because in tests we may use an Arc for TimeoutWorker
// so we can share it with threads.
unsafe {
let oh_dear_god =
worker as *const TimeoutWorker as *mut TimeoutWorker;

(&mut *oh_dear_god).sender = sender;
}
}

#[test]
fn test_new() {
let worker = TimeoutWorker::new();

assert!(worker.inner().alive);
assert_eq!(worker.inner().timeouts.len(), 0);
assert_eq!(worker.expired.load(Ordering::Acquire), 0);
}
Expand All @@ -242,15 +223,6 @@ mod tests {
assert!(worker.inner().receiver.recv().is_ok());
}

#[test]
fn test_terminate() {
let worker = TimeoutWorker::new();

worker.terminate();

assert!(worker.inner().receiver.recv().is_ok());
}

#[test]
fn test_increase_expired_timeouts() {
let worker = TimeoutWorker::new();
Expand Down Expand Up @@ -280,8 +252,7 @@ mod tests {
// loop.
worker.wait_for_message();
worker.wait_for_message();
worker.terminate();

terminate(&worker);
worker.run(&scheduler);

assert_eq!(worker.inner().timeouts.len(), 1);
Expand All @@ -298,7 +269,7 @@ mod tests {

process.state().waiting_for_future(Some(timeout.clone()));
worker.suspend(process, timeout);
worker.terminate();
terminate(&worker);
worker.run(&scheduler);

assert_eq!(worker.inner().timeouts.len(), 1);
Expand Down Expand Up @@ -338,7 +309,7 @@ mod tests {
thread::sleep(Duration::from_millis(5));
}

worker.terminate();
terminate(&worker);

let duration =
handle.join().expect("Failed to join the timeout worker");
Expand Down Expand Up @@ -380,79 +351,6 @@ mod tests {
assert!(scheduler.pool.state.pop_global().is_none());
}

#[test]
fn test_handle_pending_messages() {
let worker = TimeoutWorker::new();

worker.terminate();
worker.handle_pending_messages();

assert!(!worker.is_alive());
}

#[test]
fn test_handle_pending_messages_with_many_messages() {
let worker = TimeoutWorker::new();

for _ in 0..(MAX_MESSAGES_PER_ITERATION + 1) {
worker.terminate();
}

worker.handle_pending_messages();

assert!(worker.inner().receiver.recv().is_ok());
}

#[test]
fn test_wait_for_message() {
let worker = TimeoutWorker::new();

worker.terminate();
worker.wait_for_message();

assert!(!worker.is_alive());
}

#[test]
fn test_wait_for_message_with_timeout_with_message() {
let worker = TimeoutWorker::new();

worker.terminate();
worker.wait_for_message_with_timeout(Duration::from_millis(5));

assert!(!worker.is_alive());
}

#[test]
fn test_wait_for_message_with_timeout_without_message() {
let worker = TimeoutWorker::new();
let start = Instant::now();

worker.wait_for_message_with_timeout(Duration::from_millis(10));

assert!(start.elapsed() >= Duration::from_millis(9));
}

#[test]
fn test_handle_message() {
let worker = TimeoutWorker::new();

worker.handle_message(Message::Terminate);

assert!(!worker.is_alive());
}

#[test]
fn test_is_alive() {
let worker = TimeoutWorker::new();

assert!(worker.is_alive());

worker.handle_message(Message::Terminate);

assert!(!worker.is_alive());
}

#[test]
fn test_number_of_expired_timeouts() {
let worker = TimeoutWorker::new();
Expand Down
1 change: 0 additions & 1 deletion vm/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@ impl State {

pub(crate) fn terminate(&self) {
self.scheduler.terminate();
self.timeout_worker.terminate();
self.network_poller.terminate();
}

Expand Down

0 comments on commit 1e0b003

Please sign in to comment.