Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve executor ergonomics #27

Merged
merged 4 commits into from
Sep 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions scipio/examples/hello_world.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Unless explicitly stated otherwise all files in this repository are licensed under the
// MIT/Apache-2.0 License, at your convenience
//
// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2020 Datadog, Inc.
//
use futures::future::join_all;
use scipio::{Local, LocalExecutor};
use std::io::Result;

async fn hello() {
let mut tasks = vec![];
for t in 0..5 {
tasks.push(Local::local(async move {
println!("{}: Hello {} ...", Local::id(), t);
Local::later().await;
println!("{}: ... {} World!", Local::id(), t);
}));
}
join_all(tasks).await;
}

fn main() -> Result<()> {
// There are two ways to create an executor, demonstrated in this example.
//
// We can create it in the current thread, and run it separately later...
let ex = LocalExecutor::new(Some(0))?;

// Or we can spawn a new thread with an executor inside.
let handle = LocalExecutor::spawn_executor("hello", Some(1), || async move {
hello().await;
})?;

// If you create the executor manually, you have to run it like so.
//
// spawn_new() is the preferred way to create an executor!
ex.run(async move {
hello().await;
});

// The newly spawned executor runs on a thread, so we need to join on
// its handle so we can wait for it to finish
handle.join().unwrap();
Ok(())
}
133 changes: 118 additions & 15 deletions scipio/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ use std::future::Future;
use std::io;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::{Context, Poll};
use std::thread::{Builder, JoinHandle};
use std::time::{Duration, Instant};

use futures_lite::pin;
Expand All @@ -46,6 +48,8 @@ use crate::task::waker_fn::waker_fn;
use crate::Reactor;
use crate::{IoRequirements, Latency};

static EXECUTOR_ID: AtomicUsize = AtomicUsize::new(0);

#[derive(Debug, Clone)]
/// Error thrown when a Task Queue is not found.
pub struct QueueNotFoundError {
Expand Down Expand Up @@ -304,9 +308,29 @@ pub struct LocalExecutor {
queues: Rc<RefCell<ExecutorQueues>>,
parker: parking::Parker,
binding: Option<usize>,
id: usize,
}

impl LocalExecutor {
fn init(&mut self) -> io::Result<()> {
if let Some(cpu) = self.binding {
bind_to_cpu(cpu)?;
}

let queues = self.queues.clone();
let index = 0;

let io_requirements = IoRequirements::new(Latency::NotImportant, 0);
self.queues.borrow_mut().available_executors.insert(
0,
TaskQueue::new("default", 1000, io_requirements, move || {
let mut queues = queues.borrow_mut();
queues.maybe_activate(index);
}),
);
Ok(())
}

/// Creates a single-threaded executor, optionally bound to a specific CPU
///
/// # Examples
Expand All @@ -322,28 +346,79 @@ impl LocalExecutor {
/// ```
pub fn new(binding: Option<usize>) -> io::Result<LocalExecutor> {
let p = parking::Parker::new();
let le = LocalExecutor {
let mut le = LocalExecutor {
queues: ExecutorQueues::new(),
parker: p,
binding,
id: EXECUTOR_ID.fetch_add(1, Ordering::Relaxed),
};

if let Some(cpu) = binding {
bind_to_cpu(cpu)?;
}
le.init()?;
Ok(le)
}

let queues = le.queues.clone();
let index = 0;
/// Creates a single-threaded executor, optionally bound to a specific CPU, inside
/// a newly craeted thread. The parameter `name` specifies the name of the thread.
///
/// This is a more ergonomic way to create a thread and then run an executor inside it
/// This function panics if creating the thread or the executor fails. If you need more
/// fine-grained error handling consider initializing those entities manually.
///
///
/// # Examples
///
/// ```
/// use scipio::LocalExecutor;
///
/// // executor is a single thread, but not bound to any particular CPU.
/// let handle = LocalExecutor::spawn_executor("myname", None, || async move {
/// println!("hello");
/// }).unwrap();
///
/// handle.join().unwrap();
/// ```
#[must_use = "This spawns an executor on a thread, so you must acquire its handle and then join() to keep it alive"]
pub fn spawn_executor<G, F, T>(
name: &'static str,
binding: Option<usize>,
fut_gen: G,
) -> io::Result<JoinHandle<()>>
where
G: FnOnce() -> F + std::marker::Send + 'static,
F: Future<Output = T> + 'static,
{
let id = EXECUTOR_ID.fetch_add(1, Ordering::Relaxed);

Builder::new()
.name(format!("{}-{}", name, id).to_string())
.spawn(move || {
let mut le = LocalExecutor {
queues: ExecutorQueues::new(),
parker: parking::Parker::new(),
binding,
id,
};
le.init().unwrap();
le.run(async move {
let task = Task::local(async move {
fut_gen().await;
});
task.await;
})
})
}

let io_requirements = IoRequirements::new(Latency::NotImportant, 0);
le.queues.borrow_mut().available_executors.insert(
0,
TaskQueue::new("default", 1000, io_requirements, move || {
let mut queues = queues.borrow_mut();
queues.maybe_activate(index);
}),
);
Ok(le)
/// Returns a unique identifier for this Executor.
///
/// # Examples
/// ```
/// use scipio::LocalExecutor;
///
/// let local_ex = LocalExecutor::new(None).expect("failed to create local executor");
/// println!("My ID: {}", local_ex.id());
/// ```
pub fn id(&self) -> usize {
self.id
}

/// Creates a task queue in the executor.
Expand Down Expand Up @@ -717,6 +792,34 @@ impl<T> Task<T> {
}
}

/// Returns the id of the current executor
///
/// If called from a [`LocalExecutor`], returns the id of the executor.
///
/// Otherwise, this method panics.
///
/// # Examples
///
/// ```
/// use scipio::{LocalExecutor, Task};
///
/// let local_ex = LocalExecutor::new(None).expect("failed to create local executor");
///
/// local_ex.run(async {
/// println!("my ID: {}", Task::<()>::id());
/// });
/// ```
pub fn id() -> usize
where
T: 'static,
{
if LOCAL_EX.is_set() {
LOCAL_EX.with(|local_ex| local_ex.id())
} else {
panic!("`Task::id()` must be called from a `LocalExecutor`")
}
}

/// Detaches the task to let it keep running in the background.
///
/// # Examples
Expand Down
11 changes: 11 additions & 0 deletions scipio/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,17 @@ pub use crate::pollable::Async;
pub use crate::sys::DmaBuffer;
pub use crate::timer::Timer;

/// Local is an ergonomic way to access the local executor.
/// The local is executed through a Task type, but the Task type has a type
/// parameter consisting of the return type of the future encapsulated by this
/// task.
///
/// However for associated functions without a self parameter, like local() and
/// local_into(), the type is always () and Rust is not able to elide.
///
/// Writing Task::<()>::function() works, but it is not very ergonomic.
pub type Local = Task<()>;

/// An attribute of a TaskQueue, passed during its creation.
///
/// This tells the executor whether or not tasks in this class are latency
Expand Down