Skip to content

Commit

Permalink
Add timeout jobs to tokio/smol event_loop examples and fix borrow issue
Browse files Browse the repository at this point in the history
  • Loading branch information
hansl committed Jan 23, 2025
1 parent b6a868f commit f2511c4
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 35 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion core/engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ sptr.workspace = true
thiserror.workspace = true
dashmap.workspace = true
num_enum.workspace = true
pollster.workspace = true
thin-vec.workspace = true
itertools = { workspace = true, default-features = false }
icu_normalizer = { workspace = true, features = ["compiled_data"] }
Expand Down
71 changes: 38 additions & 33 deletions core/engine/src/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ use crate::{
Context, JsResult, JsValue,
};
use boa_gc::{Finalize, Trace};
use futures_lite::FutureExt;
use std::{cell::RefCell, collections::VecDeque, fmt::Debug, future::Future, pin::Pin};

/// An ECMAScript [Job Abstract Closure].
Expand Down Expand Up @@ -195,6 +194,13 @@ impl TimeoutJob {
pub fn call(self, context: &mut Context) -> JsResult<JsValue> {
self.job.call(context)
}

/// Returns the timeout value in milliseconds since epoch.
#[inline]
#[must_use]
pub fn timeout(&self) -> i64 {
self.timeout
}
}

/// The [`Future`] job returned by a [`NativeAsyncJob`] operation.
Expand Down Expand Up @@ -563,18 +569,19 @@ impl JobExecutor for SimpleJobExecutor {
fn run_jobs(&self, context: &mut Context) -> JsResult<()> {
let now = context.host_hooks().utc_now();

// Execute timeout jobs first. We do not execute them in a loop.
self.timeout_jobs.borrow_mut().sort_by_key(|a| a.timeout);

let i = self
.timeout_jobs
.borrow()
.iter()
.position(|job| job.timeout <= now);
if let Some(i) = i {
let jobs_to_run: Vec<_> = self.timeout_jobs.borrow_mut().drain(..=i).collect();
for job in jobs_to_run {
job.call(context)?;
{
let mut timeouts_borrow = self.timeout_jobs.borrow_mut();
// Execute timeout jobs first. We do not execute them in a loop.
timeouts_borrow.sort_by_key(|a| a.timeout);

let i = timeouts_borrow.iter().position(|job| job.timeout <= now);
if let Some(i) = i {
let jobs_to_run: Vec<_> = timeouts_borrow.drain(..=i).collect();
drop(timeouts_borrow);

for job in jobs_to_run {
job.call(context)?;
}
}
}

Expand All @@ -584,26 +591,24 @@ impl JobExecutor for SimpleJobExecutor {
break;
}

// Block on ALL async jobs running in the queue. We don't block on a single
// job, but loop through them in context.
futures_lite::future::block_on(async {
// Fold all futures into a single future.
self.async_jobs
.borrow_mut()
.drain(..)
.fold(async { Ok(()) }.boxed_local(), |acc, job| {
let context = &context;
async move {
futures_lite::future::try_zip(acc, job.call(context)).await?;
Ok(())
}
.boxed_local()
})
.await
})?;

while let Some(job) = self.promise_jobs.borrow_mut().pop_front() {
job.call(&mut context.borrow_mut())?;
// Block on each async jobs running in the queue.
let mut next_job = self.async_jobs.borrow_mut().pop_front();
while let Some(job) = next_job {
if let Err(err) = futures_lite::future::block_on(job.call(&context)) {
self.async_jobs.borrow_mut().clear();
self.promise_jobs.borrow_mut().clear();
return Err(err);
};
next_job = self.async_jobs.borrow_mut().pop_front();
}
let mut next_job = self.promise_jobs.borrow_mut().pop_front();
while let Some(job) = next_job {
if let Err(err) = job.call(&mut context.borrow_mut()) {
self.async_jobs.borrow_mut().clear();
self.promise_jobs.borrow_mut().clear();
return Err(err);
};
next_job = self.promise_jobs.borrow_mut().pop_front();
}
}

Expand Down
24 changes: 24 additions & 0 deletions examples/src/bin/smol_event_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::{
time::{Duration, Instant},
};

use boa_engine::job::TimeoutJob;
use boa_engine::{
context::ContextBuilder,
job::{Job, JobExecutor, NativeAsyncJob, PromiseJob},
Expand Down Expand Up @@ -36,6 +37,7 @@ fn main() -> JsResult<()> {
struct Queue {
async_jobs: RefCell<VecDeque<NativeAsyncJob>>,
promise_jobs: RefCell<VecDeque<PromiseJob>>,
timeout_jobs: RefCell<Vec<TimeoutJob>>,
}

impl Queue {
Expand All @@ -46,7 +48,29 @@ impl Queue {
}
}

fn drain_timeout_jobs(&self, context: &mut Context) {
let now = context.host_hooks().utc_now();
let mut timeouts_borrow = self.timeout_jobs.borrow_mut();
// Execute timeout jobs first. We do not execute them in a loop.
timeouts_borrow.sort_by_key(|a| a.timeout());

let i = timeouts_borrow.iter().position(|job| job.timeout() <= now);
if let Some(i) = i {
let jobs_to_run: Vec<_> = timeouts_borrow.drain(..=i).collect();
drop(timeouts_borrow);

for job in jobs_to_run {
if let Err(e) = job.call(context) {
eprintln!("Uncaught {e}");
}
}
}
}

fn drain_jobs(&self, context: &mut Context) {
// Run the timeout jobs first.
self.drain_timeout_jobs(context);

let jobs = std::mem::take(&mut *self.promise_jobs.borrow_mut());
for job in jobs {
if let Err(e) = job.call(context) {
Expand Down
26 changes: 26 additions & 0 deletions examples/src/bin/tokio_event_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::{
time::{Duration, Instant},
};

use boa_engine::job::TimeoutJob;
use boa_engine::{
context::ContextBuilder,
job::{Job, JobExecutor, NativeAsyncJob, PromiseJob},
Expand Down Expand Up @@ -36,17 +37,41 @@ fn main() -> JsResult<()> {
struct Queue {
async_jobs: RefCell<VecDeque<NativeAsyncJob>>,
promise_jobs: RefCell<VecDeque<PromiseJob>>,
timeout_jobs: RefCell<Vec<TimeoutJob>>,
}

impl Queue {
fn new() -> Self {
Self {
async_jobs: RefCell::default(),
promise_jobs: RefCell::default(),
timeout_jobs: RefCell::default(),
}
}

fn drain_timeout_jobs(&self, context: &mut Context) {
let now = context.host_hooks().utc_now();
let mut timeouts_borrow = self.timeout_jobs.borrow_mut();
// Execute timeout jobs first. We do not execute them in a loop.
timeouts_borrow.sort_by_key(|a| a.timeout());

let i = timeouts_borrow.iter().position(|job| job.timeout() <= now);
if let Some(i) = i {
let jobs_to_run: Vec<_> = timeouts_borrow.drain(..=i).collect();
drop(timeouts_borrow);

for job in jobs_to_run {
if let Err(e) = job.call(context) {
eprintln!("Uncaught {e}");
}
}
}
}

fn drain_jobs(&self, context: &mut Context) {
// Run the timeout jobs first.
self.drain_timeout_jobs(context);

let jobs = std::mem::take(&mut *self.promise_jobs.borrow_mut());
for job in jobs {
if let Err(e) = job.call(context) {
Expand All @@ -61,6 +86,7 @@ impl JobExecutor for Queue {
match job {
Job::PromiseJob(job) => self.promise_jobs.borrow_mut().push_back(job),
Job::AsyncJob(job) => self.async_jobs.borrow_mut().push_back(job),
Job::TimeoutJob(job) => self.timeout_jobs.borrow_mut().push(job),
_ => panic!("unsupported job type"),
}
}
Expand Down

0 comments on commit f2511c4

Please sign in to comment.