
Commit

removed useless code, cleanup code
zonyitoo committed Dec 27, 2015
1 parent 0ff6e3b commit f747ab8
Showing 2 changed files with 37 additions and 109 deletions.
14 changes: 4 additions & 10 deletions src/runtime/processor.rs
@@ -32,8 +32,6 @@ use std::ptr;
use std::any::Any;
use std::time::Duration;

// use mio::util::BoundedQueue;

use deque::{BufferPool, Stolen, Worker, Stealer};

use rand;
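
For context, the deque crate imported above supplies the work-stealing queue pair used throughout this file. Below is a minimal sketch of that API, assuming the old deque 0.3 interface (BufferPool, a Worker/Stealer pair, and the Stolen result enum); treat the exact names and signatures as unverified recollection, not the crate's documented API:

```rust
use deque::{BufferPool, Stolen};

fn main() {
    let pool = BufferPool::new();
    let (worker, stealer) = pool.deque();

    // Owner end: only the owning processor pushes and pops.
    worker.push(1u32);

    // Thief end: clonable and callable from other threads.
    match stealer.steal() {
        Stolen::Data(v) => println!("stole {}", v),
        Stolen::Empty => println!("queue was empty"),
        Stolen::Abort => println!("lost a race; the caller may retry"),
    }
}
```
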
@@ -169,11 +167,6 @@ impl Processor {
self.cur_running
}

// #[inline]
// pub fn set_neighbors(&mut self, neigh: Vec<Stealer<SendableCoroutinePtr>>) {
// self.neighbor_stealers = neigh;
// }

/// Get the thread local processor
#[inline]
pub fn current() -> &'static mut Processor {
@@ -269,11 +262,11 @@ impl Processor {
ProcMessage::NewNeighbor(nei) => self.neighbor_stealers.push(nei),
ProcMessage::Shutdown => {
self.destroy_all_coroutines();
},
}
ProcMessage::Ready(SendableCoroutinePtr(ptr)) => unsafe {
self.ready(ptr);
self.has_ready_tasks = true;
}
},
}
}
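
For reference, the match above drains a processor's control channel: NewNeighbor registers another worker's stealer, Ready enqueues a coroutine pointer, and Shutdown tears the processor down. A simplified, std-only mirror of that loop follows; the payload types are stand-ins, not the real Stealer and SendableCoroutinePtr:

```rust
use std::sync::mpsc::{channel, TryRecvError};

enum ProcMessage {
    NewNeighbor(usize), // stand-in for Stealer<SendableCoroutinePtr>
    Ready(usize),       // stand-in for SendableCoroutinePtr
    Shutdown,
}

fn main() {
    let (tx, rx) = channel();
    tx.send(ProcMessage::Ready(42)).unwrap();
    tx.send(ProcMessage::Shutdown).unwrap();

    loop {
        match rx.try_recv() {
            Ok(ProcMessage::NewNeighbor(id)) => println!("neighbor {} registered", id),
            Ok(ProcMessage::Ready(ptr)) => println!("coroutine {} is ready", ptr),
            Ok(ProcMessage::Shutdown) => break,
            Err(TryRecvError::Empty) => break, // the real processor would go steal work
            Err(TryRecvError::Disconnected) => break,
        }
    }
}
```
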

@@ -290,7 +283,8 @@ impl Processor {
let rand_idx = rand::random::<usize>();
let total_stealers = self.neighbor_stealers.len();
for idx in (0..self.neighbor_stealers.len()).map(|x| (x + rand_idx) % total_stealers) {
if let Stolen::Data(SendableCoroutinePtr(hdl)) = self.neighbor_stealers[idx].steal() {
if let Stolen::Data(SendableCoroutinePtr(hdl)) = self.neighbor_stealers[idx]
.steal() {
unsafe {
self.run_with_all_local_tasks(hdl);
}
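
The loop just above retries each neighbor's stealer starting from a random offset, so the processors do not all raid the same victim first. A std-only sketch of that rotation, with VecDeque standing in for Stealer and usize for the coroutine pointer:

```rust
use std::collections::VecDeque;

fn try_steal(neighbors: &mut [VecDeque<usize>], rand_idx: usize) -> Option<usize> {
    let total = neighbors.len();
    // Visit each neighbor exactly once, starting at rand_idx and wrapping around.
    // If there are no neighbors the range is empty, so the modulo never runs.
    for idx in (0..total).map(|x| (x + rand_idx) % total) {
        if let Some(task) = neighbors[idx].pop_front() {
            return Some(task);
        }
    }
    None
}

fn main() {
    let mut neighbors = vec![VecDeque::new(), VecDeque::from(vec![7usize])];
    assert_eq!(try_steal(&mut neighbors, 1), Some(7));
}
```
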
132 changes: 33 additions & 99 deletions src/scheduler.rs
@@ -22,11 +22,10 @@
//! Global coroutine scheduler
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, Condvar};
use std::sync::Arc;
use std::sync::mpsc::{Sender, TryRecvError};
use std::default::Default;
use std::any::Any;
use std::thread;
use std::io;
use std::cell::UnsafeCell;
use std::time::Duration;
@@ -97,7 +96,8 @@ impl Handler for IoHandler {
type Message = IoHandlerMessage;

fn ready(&mut self, event_loop: &mut EventLoop<Self>, token: Token, events: EventSet) {
debug!("Got {:?} for {:?}", events, token);
trace!("Got {:?} for {:?}", events, token);

if token == Token(0) {
error!("Received events from Token(0): {:?}", events);
return;
@@ -112,7 +112,8 @@ impl Handler for IoHandler {
}

fn timeout(&mut self, event_loop: &mut EventLoop<Self>, token: Token) {
debug!("Timer waked up {:?}", token);
trace!("Timer waked up {:?}", token);

if token == Token(0) {
error!("Received timeout event from Token(0)");
return;
@@ -139,9 +140,7 @@ impl Handler for IoHandler {

impl IoHandler {
fn new() -> IoHandler {
IoHandler {
slab: Slab::new_starting_at(Token(1), 102400),
}
IoHandler { slab: Slab::new_starting_at(Token(1), 102400) }
}
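
new() seeds the slab at Token(1), which is why both ready() and timeout() above treat Token(0) as an error: token 0 can never name a coroutine. Here is a self-contained sketch of a slab that hands out tokens from a base index; it is simplified, and the real Slab used here reuses vacant slots in O(1) rather than by scanning:

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
struct Token(usize);

struct Slab<T> {
    base: usize,
    entries: Vec<Option<T>>,
}

impl<T> Slab<T> {
    fn new_starting_at(base: Token, capacity: usize) -> Slab<T> {
        Slab { base: base.0, entries: Vec::with_capacity(capacity) }
    }

    fn insert(&mut self, value: T) -> Token {
        // Reuse the first vacant slot, or grow by one.
        if let Some(i) = self.entries.iter().position(|s| s.is_none()) {
            self.entries[i] = Some(value);
            Token(self.base + i)
        } else {
            self.entries.push(Some(value));
            Token(self.base + self.entries.len() - 1)
        }
    }

    fn remove(&mut self, token: Token) -> Option<T> {
        self.entries.get_mut(token.0 - self.base).and_then(|s| s.take())
    }
}

fn main() {
    let mut slab: Slab<&str> = Slab::new_starting_at(Token(1), 102400);
    let t = slab.insert("coroutine");
    assert_eq!(t, Token(1)); // the first token is 1; Token(0) stays reserved
    assert_eq!(slab.remove(t), Some("coroutine"));
}
```
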

fn wakeup_all(&mut self, event_loop: &mut EventLoop<Self>) {
@@ -157,13 +156,12 @@ impl IoHandler {
/// Coroutine scheduler
pub struct Scheduler {
work_counts: AtomicUsize,
// proc_handles: Mutex<Vec<(Sender<ProcMessage>, Stealer<SendableCoroutinePtr>)>>,
expected_worker_count: usize,
starving_lock: Arc<(Mutex<usize>, Condvar)>,

// Mio event loop and the handler
// It controls all I/O and timer waits
eventloop: UnsafeCell<EventLoop<IoHandler>>,
io_handler: UnsafeCell<IoHandler>,
processors: Mutex<Vec<(Sender<ProcMessage>, Stealer<SendableCoroutinePtr>)>>,
}

unsafe impl Send for Scheduler {}
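
The struct above keeps the event loop and I/O handler in UnsafeCell and asserts Send by hand (the real code presumably pairs it with Sync), so that &self methods can hand out &mut to the mio internals. A minimal sketch of that interior-mutability pattern, assuming, as the scheduler does, that only one thread mutates at a time:

```rust
use std::cell::UnsafeCell;

struct Loop {
    ticks: u64,
}

struct Sched {
    eventloop: UnsafeCell<Loop>,
}

// SAFETY: promised by convention, like the manual impl above;
// the scheduler loop is the only mutator.
unsafe impl Send for Sched {}
unsafe impl Sync for Sched {}

impl Sched {
    fn pump(&self) {
        // &self -> &mut Loop on purpose; sound only because access
        // is externally synchronized.
        let l: &mut Loop = unsafe { &mut *self.eventloop.get() };
        l.ticks += 1;
    }
}

fn main() {
    let s = Sched { eventloop: UnsafeCell::new(Loop { ticks: 0 }) };
    s.pump();
    println!("ticks = {}", unsafe { (*s.eventloop.get()).ticks });
}
```
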
@@ -174,13 +172,10 @@ impl Scheduler {
pub fn new() -> Scheduler {
Scheduler {
work_counts: AtomicUsize::new(0),
// proc_handles: Mutex::new(Vec::new()),
expected_worker_count: 1,
starving_lock: Arc::new((Mutex::new(0), Condvar::new())),

eventloop: UnsafeCell::new(EventLoop::new().unwrap()),
io_handler: UnsafeCell::new(IoHandler::new()),
processors: Mutex::new(vec![]),
}
}

@@ -205,19 +200,6 @@ impl Scheduler {
Processor::current().ready(coro);
}

#[doc(hidden)]
#[inline]
pub fn proc_wait(&self) {
let &(ref lock, ref cond) = &*self.starving_lock;
let mut guard = lock.lock().unwrap();
*guard -= 1;
if *guard != 0 {
debug!("Thread {:?} is starving and exile ...", thread::current());
guard = cond.wait(guard).unwrap();
}
*guard += 1;
}

/// A coroutine is finished
///
/// The coroutine will be destroyed; make sure the coroutine pointer is unique!
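
The deleted proc_wait above parked a starving worker on starving_lock's Condvar until a spawn bumped the counter and notified it; this commit drops that mechanism in favor of the event-loop channel. For reference, the Mutex plus Condvar parking idiom it was built on, in simplified form (the removed code's exact bookkeeping differed):

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

fn main() {
    // (runnable-worker count, wakeup signal), like the removed starving_lock.
    let pair = Arc::new((Mutex::new(1usize), Condvar::new()));

    let worker = {
        let pair = pair.clone();
        thread::spawn(move || {
            let (lock, cond) = (&pair.0, &pair.1);
            let mut guard = lock.lock().unwrap();
            *guard -= 1;
            while *guard == 0 {
                // Starving: sleep until spawn() signals new work.
                guard = cond.wait(guard).unwrap();
            }
            *guard += 1;
        })
    };

    // What spawn() used to do: bump the count and wake one sleeper.
    {
        let (lock, cond) = (&pair.0, &pair.1);
        let mut guard = lock.lock().unwrap();
        *guard += 1;
        cond.notify_one();
    }

    worker.join().unwrap();
}
```
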
@@ -261,10 +243,6 @@ impl Scheduler {
};
Processor::current().spawn_opts(Box::new(wrapper), opts);

let &(ref lock, ref cond) = &*Scheduler::instance().starving_lock;
let _ = lock.lock().unwrap();
cond.notify_one();

JoinHandle { result: rx }
}

@@ -276,71 +254,70 @@ impl Scheduler {
let the_sched = Arc::new(self);
let mut handles = Vec::new();

*the_sched.starving_lock.0.lock().unwrap() = 1;
let mut processor_handlers: Vec<Sender<ProcMessage>> = Vec::new();
let mut processor_stealers: Vec<Stealer<SendableCoroutinePtr>> = Vec::new();

// Run the main function
let main_coro_hdl = {
// The first worker
let mut processors = the_sched.processors.lock().unwrap();

let (hdl, msg, st, main_hdl) = Processor::run_main(0, the_sched.clone(), main_fn);
handles.push(hdl);
processors.push((msg, st));

processor_handlers.push(msg);
processor_stealers.push(st);

main_hdl
};

{
*the_sched.starving_lock.0.lock().unwrap() = the_sched.expected_worker_count;

// The others
for tid in 1..the_sched.expected_worker_count {
let mut processors = the_sched.processors.lock().unwrap();

let (hdl, msg, st) = Processor::run_with_neighbors(tid,
the_sched.clone(),
processors.iter()
.map(|x| x.1.clone())
.collect());
processor_stealers.clone());

for &(ref msg, _) in processors.iter() {
for msg in processor_handlers.iter() {
if let Err(err) = msg.send(ProcMessage::NewNeighbor(st.clone())) {
error!("Error while sending NewNeighbor {:?}", err);
}
}

handles.push(hdl);
processors.push((msg, st));

processor_handlers.push(msg);
processor_stealers.push(st);
}
}

// The scheduler loop
loop {
{
let io_handler: &mut IoHandler = unsafe { &mut *the_sched.io_handler.get() };
let event_loop: &mut EventLoop<IoHandler> = unsafe { &mut *the_sched.eventloop.get() };
let event_loop: &mut EventLoop<IoHandler> = unsafe {
&mut *the_sched.eventloop.get()
};

event_loop.run_once(io_handler, Some(100)).unwrap();
}

match main_coro_hdl.try_recv() {
Ok(main_ret) => {
{
let processors = the_sched.processors.lock().unwrap();

for &(ref msg, _) in processors.iter() {
for msg in processor_handlers.iter() {
let _ = msg.send(ProcMessage::Shutdown);
}
}

{
let event_loop: &mut EventLoop<IoHandler> = unsafe { &mut *the_sched.eventloop.get() };
let io_handler: &mut IoHandler = unsafe { &mut *the_sched.io_handler.get() };
let event_loop: &mut EventLoop<IoHandler> = unsafe {
&mut *the_sched.eventloop.get()
};
let io_handler: &mut IoHandler = unsafe {
&mut *the_sched.io_handler.get()
};
io_handler.wakeup_all(event_loop);
}

{
let &(ref lock, ref cond) = &*the_sched.starving_lock;
let _ = lock.lock().unwrap();
cond.notify_all();
}

for hdl in handles {
hdl.join().unwrap();
}
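
The loop above is the scheduler's new drive cycle: pump mio for at most 100 ms, then poll the main coroutine's result channel; on Ok it broadcasts Shutdown, wakes every blocked coroutine, and joins the workers. A condensed, std-only sketch of that shape, where thread::sleep stands in for event_loop.run_once(io_handler, Some(100)):

```rust
use std::sync::mpsc::{channel, TryRecvError};
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, main_coro_hdl) = channel::<i32>();

    // Stand-in for the main coroutine running on worker 0.
    thread::spawn(move || {
        thread::sleep(Duration::from_millis(250));
        tx.send(7).unwrap();
    });

    let main_ret = loop {
        // Stand-in for event_loop.run_once(io_handler, Some(100)):
        // block on I/O readiness for at most 100 ms per iteration.
        thread::sleep(Duration::from_millis(100));

        match main_coro_hdl.try_recv() {
            Ok(ret) => break ret, // then: send Shutdown, wakeup_all, join handles
            Err(TryRecvError::Empty) => continue,
            Err(TryRecvError::Disconnected) => panic!("Main coro is disconnected"),
        }
    };

    println!("main returned {:?}", main_ret);
}
```
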
@@ -352,7 +329,6 @@ impl Scheduler {
panic!("Main coro is disconnected");
}
}

}
}

@@ -381,33 +357,6 @@ impl Scheduler {

if let Some(ptr) = unsafe { processor.running() } {
let event_loop: &mut EventLoop<IoHandler> = unsafe { &mut *self.eventloop.get() };
// let token = {
// let mut io_handler = self.io_handler.lock().unwrap();
//
// io_handler.io_slab
// .insert((processor.id(), ptr))
// .unwrap()
// };
// try!(event_loop.register(fd, token, interest, PollOpt::edge() | PollOpt::oneshot()));
// debug!("wait_event: Blocked current Coroutine ...; token={:?}",
// token);
// processor.block();
// debug!("wait_event: Waked up; token={:?}", token);
//
// // For the latest MIO version, it requires to deregister every Evented object
// // But KQueue with EV_ONESHOT flag does not require explicit deregister
// if cfg!(not(any(target_os = "macos",
// target_os = "ios",
// target_os = "freebsd",
// target_os = "dragonfly",
// target_os = "netbsd"))) {
// try!(event_loop.deregister(fd));
// }
//
// {
// let mut io_handler = self.io_handler.lock().unwrap();
// io_handler.io_slab.remove(token);
// }
let proc_hdl = processor.handle();
let sendable_coro = SendableCoroutinePtr(ptr);
let channel = event_loop.channel();
@@ -455,27 +404,12 @@ impl Scheduler {
let processor = Processor::current();

if let Some(ptr) = unsafe { processor.running() } {
// let token = {
// let mut io_handler = self.io_handler.lock().unwrap();
//
// io_handler.timer_slab
// .insert((processor.id(), ptr))
// .unwrap()
// };
let event_loop: &mut EventLoop<IoHandler> = unsafe { &mut *self.eventloop.get() };
// event_loop.timeout_ms(token, delay).unwrap();
//
// processor.block();
//
// {
// let mut io_handler = self.io_handler.lock().unwrap();
// io_handler.timer_slab.remove(token);
// }
let proc_hdl = processor.handle();
let sendable_coro = SendableCoroutinePtr(ptr);
let channel = event_loop.channel();

let msg = IoHandlerMessage::new(|evloop: &mut EventLoop<IoHandler>, token /* : _ */| {
let msg = IoHandlerMessage::new(|evloop: &mut EventLoop<IoHandler>, token| {
evloop.timeout_ms(token, delay).unwrap();
},
move |_: &mut EventLoop<IoHandler>| {
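
Both wait_event and sleep now route their registrations through the event loop's notify channel: the processor wraps the registration in an IoHandlerMessage closure and the I/O thread runs it, so mio state is only ever touched from its own thread. Below is a std-only sketch of shipping closures to a dedicated thread this way; the Msg type is a stand-in for IoHandlerMessage, and a Vec stands in for the event loop:

```rust
use std::sync::mpsc::channel;
use std::thread;

// Stand-in for IoHandlerMessage: a closure the I/O thread applies
// to its own event-loop state.
type Msg = Box<dyn FnOnce(&mut Vec<u64>) + Send>;

fn main() {
    let (tx, rx) = channel::<Msg>();

    let io_thread = thread::spawn(move || {
        let mut timeouts: Vec<u64> = Vec::new(); // stand-in for EventLoop<IoHandler>
        for msg in rx {
            msg(&mut timeouts); // e.g. evloop.timeout_ms(token, delay)
        }
        timeouts
    });

    // Processor side: request "arm a 500 ms timeout" without touching mio directly.
    let delay = 500u64;
    tx.send(Box::new(move |evloop: &mut Vec<u64>| {
        evloop.push(delay);
    }))
    .unwrap();

    drop(tx); // hang up so the I/O thread drains its queue and exits
    assert_eq!(io_thread.join().unwrap(), vec![500]);
}
```
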
