diff --git a/src/libgreen/sched.rs b/src/libgreen/sched.rs
index 38bb6e355a771..b9144047df5c0 100644
--- a/src/libgreen/sched.rs
+++ b/src/libgreen/sched.rs
@@ -219,7 +219,7 @@ impl Scheduler {
         let message = stask.sched.get_mut_ref().message_queue.pop();
         rtassert!(match message { msgq::Empty => true, _ => false });

-        stask.task.get_mut_ref().destroyed = true;
+        stask.task.take().unwrap().drop();
     }

     // This does not return a scheduler, as the scheduler is placed
diff --git a/src/libgreen/task.rs b/src/libgreen/task.rs
index 3d3b413384050..12d7b75569782 100644
--- a/src/libgreen/task.rs
+++ b/src/libgreen/task.rs
@@ -442,15 +442,30 @@ impl Runtime for GreenTask {
                      f: proc():Send) {
         self.put_task(cur_task);

+        // First, set up a bomb which when it goes off will restore the local
+        // task unless it's disarmed. This will allow us to gracefully fail from
+        // inside of `configure` which allocates a new task.
+        struct Bomb { inner: Option<Box<GreenTask>> }
+        impl Drop for Bomb {
+            fn drop(&mut self) {
+                let _ = self.inner.take().map(|task| task.put());
+            }
+        }
+        let mut bomb = Bomb { inner: Some(self) };
+
         // Spawns a task into the current scheduler. We allocate the new task's
         // stack from the scheduler's stack pool, and then configure it
         // accordingly to `opts`. Afterwards we bootstrap it immediately by
         // switching to it.
         //
         // Upon returning, our task is back in TLS and we're good to return.
-        let mut sched = self.sched.take_unwrap();
-        let sibling = GreenTask::configure(&mut sched.stack_pool, opts, f);
-        sched.run_task(self, sibling)
+        let sibling = {
+            let sched = bomb.inner.get_mut_ref().sched.get_mut_ref();
+            GreenTask::configure(&mut sched.stack_pool, opts, f)
+        };
+        let mut me = bomb.inner.take().unwrap();
+        let sched = me.sched.take().unwrap();
+        sched.run_task(me, sibling)
     }

     // Local I/O is provided by the scheduler's event loop
diff --git a/src/libnative/task.rs b/src/libnative/task.rs
index 35367ff2efab3..c72d6c24a7c16 100644
--- a/src/libnative/task.rs
+++ b/src/libnative/task.rs
@@ -71,7 +71,7 @@ pub fn spawn_opts(opts: TaskOpts, f: proc():Send) {
     // Note that this increment must happen *before* the spawn in order to
     // guarantee that if this task exits it will always end up waiting for the
     // spawned task to exit.
-    bookkeeping::increment();
+    let token = bookkeeping::increment();

     // Spawning a new OS thread guarantees that __morestack will never get
     // triggered, but we must manually set up the actual stack bounds once this
@@ -93,7 +93,7 @@ pub fn spawn_opts(opts: TaskOpts, f: proc():Send) {
         let mut task = task;
         task.put_runtime(ops);
         drop(task.run(|| { f.take_unwrap()() }).destroy());
-        bookkeeping::decrement();
+        drop(token);
     })
 }
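The `Bomb` added to `spawn_sibling` above is a drop guard: it owns the green task while `GreenTask::configure` runs, and its `Drop` impl puts the task back into TLS if that call fails, so a failed spawn no longer loses the caller's task. The following is not part of the patch, only a minimal, self-contained sketch of the same guard pattern in present-day Rust; `Bomb`, `LOCAL_TASK`, and `risky_setup` are illustrative names standing in for the runtime's real types.

```rust
use std::cell::RefCell;

thread_local! {
    // Stand-in for the runtime's thread-local "current task" slot.
    static LOCAL_TASK: RefCell<Option<String>> = RefCell::new(None);
}

// Drop guard: if it still owns the task when it is dropped (for example
// because a panic unwound past it), it puts the task back into the slot.
struct Bomb {
    inner: Option<String>,
}

impl Drop for Bomb {
    fn drop(&mut self) {
        if let Some(task) = self.inner.take() {
            LOCAL_TASK.with(|slot| *slot.borrow_mut() = Some(task));
        }
    }
}

fn risky_setup(task: String) -> String {
    let mut bomb = Bomb { inner: Some(task) };

    // ... allocation or other fallible work would happen here; a panic at
    // this point triggers `Bomb::drop`, which restores the task ...

    // Success path: disarm the bomb by taking the task back out.
    bomb.inner.take().unwrap()
}

fn main() {
    let task = risky_setup("main-task".to_string());
    assert_eq!(task, "main-task");
    // The bomb was disarmed, so nothing was written back into the slot.
    LOCAL_TASK.with(|slot| assert!(slot.borrow().is_none()));
}
```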
diff --git a/src/librustrt/bookkeeping.rs b/src/librustrt/bookkeeping.rs
index fd290491eaf1e..dc96aecff8017 100644
--- a/src/librustrt/bookkeeping.rs
+++ b/src/librustrt/bookkeeping.rs
@@ -19,14 +19,24 @@
 //! decrement() manually.

 use core::atomics;
+use core::ops::Drop;

 use mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT};

 static mut TASK_COUNT: atomics::AtomicUint = atomics::INIT_ATOMIC_UINT;
 static mut TASK_LOCK: StaticNativeMutex = NATIVE_MUTEX_INIT;

-pub fn increment() {
+pub struct Token { _private: () }
+
+impl Drop for Token {
+    fn drop(&mut self) { decrement() }
+}
+
+/// Increment the number of live tasks, returning a token which will decrement
+/// the count when dropped.
+pub fn increment() -> Token {
     let _ = unsafe { TASK_COUNT.fetch_add(1, atomics::SeqCst) };
+    Token { _private: () }
 }

 pub fn decrement() {
diff --git a/src/librustrt/local.rs b/src/librustrt/local.rs
index bdb1c60b6d6f8..e2a5eef0d99e1 100644
--- a/src/librustrt/local.rs
+++ b/src/librustrt/local.rs
@@ -125,8 +125,8 @@ mod test {
         }).join();
     }

-    fn cleanup_task(mut t: Box<Task>) {
-        t.destroyed = true;
+    fn cleanup_task(t: Box<Task>) {
+        t.drop();
     }
 }
diff --git a/src/librustrt/task.rs b/src/librustrt/task.rs
index d27a4f25b4e70..e3d9b7d136ec2 100644
--- a/src/librustrt/task.rs
+++ b/src/librustrt/task.rs
@@ -100,12 +100,21 @@ pub struct Task {
     pub storage: LocalStorage,
     pub unwinder: Unwinder,
     pub death: Death,
-    pub destroyed: bool,
     pub name: Option<SendStr>,
+    state: TaskState,

     imp: Option<Box<Runtime + Send>>,
 }

+// Once a task has entered the `Armed` state it must be destroyed via `drop`,
+// and no other method. This state is used to track this transition.
+#[deriving(PartialEq)]
+enum TaskState {
+    New,
+    Armed,
+    Destroyed,
+}
+
 pub struct TaskOpts {
     /// Invoke this procedure with the result of the task when it finishes.
     pub on_exit: Option<proc(Result): Send>,
@@ -159,7 +168,7 @@ impl Task {
             storage: LocalStorage(None),
             unwinder: Unwinder::new(),
             death: Death::new(),
-            destroyed: false,
+            state: New,
             name: None,
             imp: None,
         }
@@ -203,7 +212,7 @@ impl Task {
     /// }).destroy();
     /// # }
     /// ```
-    pub fn run(self: Box<Task>, f: ||) -> Box<Task> {
+    pub fn run(mut self: Box<Task>, f: ||) -> Box<Task> {
         assert!(!self.is_destroyed(), "cannot re-use a destroyed task");

         // First, make sure that no one else is in TLS. This does not allow
@@ -212,6 +221,7 @@ impl Task {
         if Local::exists(None::<Task>) {
             fail!("cannot run a task recursively inside another");
         }
+        self.state = Armed;
         Local::put(self);

         // There are two primary reasons that general try/catch is unsafe. The
@@ -333,12 +343,12 @@ impl Task {
         // Now that we're done, we remove the task from TLS and flag it for
         // destruction.
         let mut task: Box<Task> = Local::take();
-        task.destroyed = true;
+        task.state = Destroyed;
         return task;
     }

     /// Queries whether this can be destroyed or not.
-    pub fn is_destroyed(&self) -> bool { self.destroyed }
+    pub fn is_destroyed(&self) -> bool { self.state == Destroyed }

     /// Inserts a runtime object into this task, transferring ownership to the
     /// task. It is illegal to replace a previous runtime object in this task
@@ -453,12 +463,20 @@ impl Task {
     pub fn can_block(&self) -> bool {
         self.imp.get_ref().can_block()
     }
+
+    /// Consume this task, flagging it as a candidate for destruction.
+    ///
+    /// This function is required to be invoked to destroy a task. A task
+    /// destroyed through a normal drop will abort.
+    pub fn drop(mut self) {
+        self.state = Destroyed;
+    }
 }

 impl Drop for Task {
     fn drop(&mut self) {
         rtdebug!("called drop for a task: {}", self as *mut Task as uint);
-        rtassert!(self.destroyed);
+        rtassert!(self.state != Armed);
     }
 }

@@ -634,12 +652,17 @@ mod test {
         begin_unwind("cause", file!(), line!())
     }

+    #[test]
+    fn drop_new_task_ok() {
+        drop(Task::new());
+    }
+
     // Task blocking tests

     #[test]
     fn block_and_wake() {
         let task = box Task::new();
         let mut task = BlockedTask::block(task).wake().unwrap();
-        task.destroyed = true;
+        task.drop();
     }
 }
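The `destroyed: bool` flag on `Task` becomes a three-state machine: a task starts `New`, is flipped to `Armed` by `run()` just before it is stored in TLS, and must reach `Destroyed` through the run/destroy path or the new consuming `Task::drop(mut self)` before its `Drop` impl runs, otherwise `rtassert!` aborts. A freshly created task that was never armed can still be dropped freely, which is what the new `drop_new_task_ok` test checks. Below is a small, hedged sketch of that discipline in current Rust; `MiniTask`, `arm`, and `destroy` are made-up names for illustration, not APIs from the patch.

```rust
#[derive(PartialEq)]
enum State {
    New,
    Armed,
    Destroyed,
}

struct MiniTask {
    state: State,
}

impl MiniTask {
    fn new() -> MiniTask {
        MiniTask { state: State::New }
    }

    // Corresponds to `Task::run` flipping the state to `Armed` before the
    // task is stashed in thread-local storage.
    fn arm(&mut self) {
        self.state = State::Armed;
    }

    // Corresponds to `Task::drop(mut self)` in the patch: consuming the value
    // is the only way out of the `Armed` state.
    fn destroy(mut self) {
        self.state = State::Destroyed;
    }
}

impl Drop for MiniTask {
    fn drop(&mut self) {
        // Mirrors `rtassert!(self.state != Armed)`: an armed task that is
        // dropped implicitly is a runtime bug.
        assert!(self.state != State::Armed, "armed task dropped implicitly");
    }
}

fn main() {
    // A task that was never armed may be dropped freely (cf. drop_new_task_ok).
    drop(MiniTask::new());

    // An armed task must be retired explicitly.
    let mut task = MiniTask::new();
    task.arm();
    task.destroy();
}
```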
diff --git a/src/test/run-pass/spawn-stack-too-big.rs b/src/test/run-pass/spawn-stack-too-big.rs
new file mode 100644
index 0000000000000..e1c4a480d1cc1
--- /dev/null
+++ b/src/test/run-pass/spawn-stack-too-big.rs
@@ -0,0 +1,47 @@
+// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+// ignore-macos apparently gargantuan mmap requests are ok?
+
+#![feature(phase)]
+
+#[phase(plugin)]
+extern crate green;
+extern crate native;
+
+use std::task::TaskBuilder;
+use native::NativeTaskBuilder;
+
+green_start!(main)
+
+fn main() {
+    test();
+
+    let (tx, rx) = channel();
+    TaskBuilder::new().native().spawn(proc() {
+        tx.send(test());
+    });
+    rx.recv();
+}
+
+#[cfg(not(target_word_size = "64"))]
+fn test() {}
+
+#[cfg(target_word_size = "64")]
+fn test() {
+    let (tx, rx) = channel();
+    spawn(proc() {
+        TaskBuilder::new().stack_size(1024 * 1024 * 1024 * 64).spawn(proc() {
+        });
+        tx.send(());
+    });
+
+    assert!(rx.recv_opt().is_err());
+}
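The new run-pass test asks the spawn path for a 64 GiB stack and expects the failure to be graceful: the spawning task fails before it can send on the channel, so `rx.recv_opt()` returns an error instead of the whole process aborting. As a loose modern analogue (an assumption about today's std, not something this patch touches), `std::thread::Builder::spawn` reports spawn failure through its `io::Result`, so a comparable check needs no channels; whether a 64 GiB request actually fails is platform dependent, so this sketch only reports the outcome.

```rust
use std::thread;

fn main() {
    // 64 GiB, as in the test above; assumes a 64-bit target, matching the
    // test's target_word_size gate.
    let huge: usize = 64 * 1024 * 1024 * 1024;

    match thread::Builder::new().stack_size(huge).spawn(|| {}) {
        Ok(handle) => {
            // Some platforms overcommit and happily hand out the mapping.
            let _ = handle.join();
            println!("spawn with a 64 GiB stack succeeded");
        }
        Err(e) => println!("spawn with a 64 GiB stack failed: {}", e),
    }
}
```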