From 616496c3eefd7d2a55d7480c46607e5a6fe018de Mon Sep 17 00:00:00 2001 From: zuon Date: Sun, 8 Apr 2018 16:57:04 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E4=BA=86=E4=BB=BB=E5=8A=A1?= =?UTF-8?q?=E5=92=8C=E5=B7=A5=E4=BD=9C=E8=80=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Cargo.lock | 7 ++ Cargo.toml | 3 +- src/adapter.rs | 34 +++++++++- src/lib.rs | 3 + src/task.rs | 87 +++++-------------------- src/task_pool.rs | 45 ++++--------- src/worker.rs | 156 +++++++++++++++++++++------------------------ src/worker_pool.rs | 93 ++++++++------------------- tests/test.rs | 12 ++-- 9 files changed, 176 insertions(+), 264 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2450dee..8b6bbca 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,6 +17,11 @@ name = "fuchsia-zircon-sys" version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "lazy_static" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "libc" version = "0.2.39" @@ -34,6 +39,7 @@ dependencies = [ name = "pi_vm" version = "0.1.0" dependencies = [ + "lazy_static 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "libc 0.2.39 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", "threadpool 1.7.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -96,6 +102,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum bitflags 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b3c30d3802dfb7281680d6285f2ccdaa8c2d8fee41f93805dba5c4cf50dc23cf" "checksum fuchsia-zircon 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "2e9763c69ebaae630ba35f74888db465e49e259ba1bc0eda7d06f4a067615d82" "checksum fuchsia-zircon-sys 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7" +"checksum lazy_static 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "c8f31047daa365f19be14b47c29df4f7c3b581832407daabe6ae77397619237d" "checksum libc 0.2.39 (registry+https://github.com/rust-lang/crates.io-index)" = "f54263ad99207254cf58b5f701ecb432c717445ea2ee8af387334bdd1a03fdff" "checksum num_cpus 1.8.0 (registry+https://github.com/rust-lang/crates.io-index)" = "c51a3322e4bca9d212ad9a158a02abc6934d005490c054a2778df73a70aa0a30" "checksum rand 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "eba5f8cb59cc50ed56be8880a5c7b496bfd9bd26394e176bc67884094145c2c5" diff --git a/Cargo.toml b/Cargo.toml index b781fbf..28590ea 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,4 +7,5 @@ authors = ["zuon "] time = "*" libc = "*" rand = "*" -threadpool = "*" \ No newline at end of file +threadpool = "*" +lazy_static = "*" \ No newline at end of file diff --git a/src/adapter.rs b/src/adapter.rs index 349a275..80f5a24 100644 --- a/src/adapter.rs +++ b/src/adapter.rs @@ -1,11 +1,13 @@ use libc::{c_void, c_char, int8_t, uint8_t, c_int, uint32_t, uint64_t, c_double, memcpy}; -use std::ffi::{CStr, CString}; use std::slice::from_raw_parts; +use std::ffi::{CStr, CString}; +use std::sync::{Arc, Mutex}; use std::os::raw::c_uchar; use std::mem::transmute; use std::ops::Drop; use std::ptr::null; +use bonmgr::BonMgr; use data_view_impl::*; #[link(name = "njsc")] @@ -83,9 +85,35 @@ pub fn register_data_view() { } } +lazy_static! { + static ref BON_MGR: Arc> = Arc::new(Mutex::new(BonMgr::new())); +} + //调用NativeObject函数 -extern "C" fn native_object_function_call(hash: uint32_t, args_size: uint32_t, args: *const c_void) -> *const c_void { - null() +#[no_mangle] +pub extern "C" fn native_object_function_call( + vm: *const c_void, + hash: uint32_t, + args_size: uint32_t, + args: *const c_void) -> *const c_void { + if args_size == 0 { + return null(); + } + + let reply: Option; + let mut js = JS {vm: vm as usize}; + //同步块 + { + let mut refer = BON_MGR.clone(); + reply = (&mut *refer. + lock(). + unwrap()). + call(&mut js, hash, Vec::new()).ok(); + } + match reply { + Some(val) => val.value as *const c_void, + None => null(), + } } //执行njsc测试代码 diff --git a/src/lib.rs b/src/lib.rs index 1db8067..a2027f8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -9,6 +9,9 @@ extern crate libc; extern crate rand; extern crate threadpool; +#[macro_use] +extern crate lazy_static; + pub mod data_view_impl; pub mod adapter; pub mod util; diff --git a/src/task.rs b/src/task.rs index be65d81..f3fb91e 100644 --- a/src/task.rs +++ b/src/task.rs @@ -3,8 +3,6 @@ use std::mem::transmute; use std::collections::VecDeque; use std::fmt::{Display, Formatter, Result}; -use adapter::JSType; - /* * 任务类型 */ @@ -20,64 +18,33 @@ pub enum TaskType { * 任务结构 */ pub struct Task { - uid: u32, //任务唯一id - task_type: TaskType, //任务类型 priority: u32, //任务优先级 func: (usize, usize), //任务函数 - args: Vec, //任务参数 - start_time: i64, //任务开始时间 - finish_time: i64 //任务完成时间 + info: &'static str, //任务信息 } unsafe impl Sync for Task {} //声明保证多线程安全性 impl Display for Task { fn fmt(&self, f: &mut Formatter) -> Result { - write!(f, "Task[uid = {}, type = {:?}, priority = {}, start_time = {}, finish_time = {}]", - self.uid, self.task_type, self.priority, self.start_time, self.finish_time) + write!(f, "Task[priority = {}, func = {:?}, info = {}]", self.priority, self.func, self.info) } } impl Task { pub fn new() -> Self { Task { - uid: 0, - task_type: TaskType::Empty, - priority: 0, - func: (0, 0), - args: Vec::new(), - start_time: 0, - finish_time: 0, + priority: 0, + func: (0, 0), + info: "", } } - pub fn copy_to(mut self, dest: &mut Self) -> Self { + pub fn copy_to(&self, dest: &mut Self) { //复制其它成员 - dest.uid = *&self.uid; - dest.task_type = *&self.task_type; - dest.priority = *&self.priority; - dest.func = *&self.func; - //移动源参数 - for index in 0..self.args.len() { - dest.args[index] = self.args.remove(index); - } - self - } - - pub fn get_uid(&self) -> u32 { - self.uid - } - - pub fn set_uid(&mut self, uid: u32) { - self.uid = uid; - } - - pub fn get_type(&self) -> TaskType { - self.task_type - } - - pub fn set_type(&mut self, task_type: TaskType) { - self.task_type = task_type; + dest.priority = self.priority; + dest.func = self.func; + dest.info = self.info; } pub fn get_priority(&self) -> u32 { @@ -98,43 +65,19 @@ impl Task { None => (), } } - - pub fn get_args(&self) -> Vec { - self.args.to_vec() - } - - pub fn add_args(&mut self, arg: JSType) { - self.args.push(arg); - } - pub fn set_args(&mut self, args: Vec) { - self.args = args; + pub fn get_info(&self) -> &str { + self.info } - - pub fn get_start_time(&self) -> i64 { - self.start_time - } - - pub fn set_start_time(&mut self, start_time: i64) { - self.start_time = start_time; - } - - pub fn get_finish_time(&self) -> i64 { - self.finish_time - } - - pub fn set_finish_time(&mut self, finish_time: i64) { - self.finish_time = finish_time; + + pub fn set_info(&mut self, info: &'static str) { + self.info = info; } pub fn reset(&mut self) { - self.uid = 0; - self.task_type = TaskType::Empty; self.priority = 0; self.func = (0, 0); - self.args.clear(); - self.start_time = 0; - self.finish_time = 0; + self.info = ""; } pub fn run(&self) { diff --git a/src/task_pool.rs b/src/task_pool.rs index a3636e0..e592af4 100644 --- a/src/task_pool.rs +++ b/src/task_pool.rs @@ -6,8 +6,6 @@ use std::collections::{HashMap, VecDeque}; use std::fmt::{Display, Formatter, Result}; use task::{TaskType, Task, TaskCache}; -use util::now_microsecond; -use adapter::JSType; /* * 同步任务池 @@ -58,11 +56,9 @@ impl SyncPool { if w < 0 { self.weight -= priority; //减少同步任务池权重 match queue.pop_front() { - Some(mut t) => { + Some(t) => { //填充任务 - t = t.copy_to(task); - task.set_start_time(t.get_start_time()); - task.set_finish_time(now_microsecond()); //设置任务完成时间 + t.copy_to(task); reply = Some(t); }, None => (), @@ -76,11 +72,9 @@ impl SyncPool { //从同步延迟任务队列中弹出任务 fn delay_pop(&mut self, task: &mut Task) -> Option { match self.delay_queue.pop_front() { - Some(mut t) => { + Some(t) => { //填充任务 - t = t.copy_to(task); - task.set_start_time(t.get_start_time()); - task.set_finish_time(now_microsecond()); //设置任务完成时间 + t.copy_to(task); Some(t) }, None => Option::None, @@ -187,11 +181,9 @@ impl AsyncPool { if index > -1 { self.weight -= priority as u64; //减少异步任务池权重 match self.queue.remove(index as usize) { - Some(mut t) => { + Some(t) => { //填充任务 - t = t.copy_to(task); - task.set_start_time(t.get_start_time()); - task.set_finish_time(now_microsecond()); //设置任务完成时间 + t.copy_to(task); reply = Some(t); }, None => (), @@ -203,11 +195,9 @@ impl AsyncPool { //从异步延迟任务队列中弹出任务 fn delay_pop(&mut self, task: &mut Task) -> Option { match self.delay_queue.pop_front() { - Some(mut t) => { + Some(t) => { //填充任务 - t = t.copy_to(task); - task.set_start_time(t.get_start_time()); - task.set_finish_time(now_microsecond()); //设置任务完成时间 + t.copy_to(task); Some(t) }, None => Option::None, @@ -247,7 +237,6 @@ impl AsyncPool { * 任务池 */ pub struct TaskPool { - counter: u32, //任务编号计数器 task_cache: TaskCache, //任务缓存 sync_pool: SyncPool, //同步任务池 async_pool: AsyncPool, //异步任务池 @@ -257,8 +246,8 @@ unsafe impl Sync for TaskPool {} impl Display for TaskPool { fn fmt(&self, f: &mut Formatter) -> Result { - write!(f, "TaskPool[counter = {}, cache_size = {}, sync_pool = {}, async_pool = {}]", - self.counter, self.task_cache.size(), self.sync_pool, self.async_pool) + write!(f, "TaskPool[cache_size = {}, sync_pool = {}, async_pool = {}]", + self.task_cache.size(), self.sync_pool, self.async_pool) } } @@ -266,7 +255,6 @@ impl TaskPool { //构建一个任务池 pub fn new(len: u32) -> Self { TaskPool { - counter: 0, task_cache: TaskCache::new(len), sync_pool: SyncPool::new(), async_pool: AsyncPool::new(), @@ -325,14 +313,11 @@ impl TaskPool { } //向任务池加入一个任务 - pub fn push(&mut self, task_type: TaskType, priority: u32, func: Box, args: Vec) { + pub fn push(&mut self, task_type: TaskType, priority: u32, func: Box, info: &'static str) { let mut task: Task = self.task_cache.pop(); - task.set_uid(self.new_uid()); - task.set_type(task_type); task.set_priority(priority); task.set_func(Some(func)); - task.set_args(args); - task.set_start_time(now_microsecond()); //设置任务开始时间 + task.set_info(info); if priority > 0 { match task_type { TaskType::Async => { @@ -380,12 +365,6 @@ impl TaskPool { self.sync_pool.clear(); } - //获取新的任务编号 - fn new_uid(&mut self) -> u32 { - self.counter += 1; - self.counter - } - //释放指定任务 fn free(&mut self, task: Option) { match task { diff --git a/src/worker.rs b/src/worker.rs index c295cee..14c1a4e 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -1,35 +1,23 @@ use std::time; use std::thread; -use std::sync::RwLock; use std::sync::{Arc, Mutex, Condvar}; -use std::sync::mpsc::{Sender, Receiver, TryRecvError}; use std::fmt::{Display, Formatter, Result}; +use std::sync::atomic::{Ordering, AtomicUsize}; use threadpool::ThreadPool; +use util::now_microsecond; use task_pool::TaskPool; use task::Task; /* -* 控制信号 +* 工作者状态 */ -pub const CONTROL_SIGN_EXIT: u8 = 0; //退出 -pub const CONTROL_SIGN_SLEEP: u8 = 1; //休眠 -pub const CONTROL_SIGN_CONTINUE: u8 = 2; //继续 - -/* -* 状态信号 -*/ -pub const STATUS_STOP: u8 = 0; //停止 -pub const STATUS_WAIT: u8 = 1; //等待 -pub const STATUS_RUNNING: u8 = 2; //运行中 - -/* -* 工作者信号 -*/ -#[derive(Copy, Clone, Debug)] -pub enum WorkerSign { - Control(u8), //控制信号 +#[derive(Clone)] +pub enum WorkerStatus { + Stop = 0, //停止 + Wait, //等待 + Running, //运行中 } /* @@ -37,97 +25,86 @@ pub enum WorkerSign { */ #[derive(Debug)] pub struct Worker { - uid: u32, //工作者编号 - status: u8, //工作者状态 - sender: Sender, //发送器 - receiver: Receiver, //接收器 + uid: u32, //工作者编号 + slow: u32, //工作者慢任务时长,单位us + status: AtomicUsize, //工作者状态 + counter: AtomicUsize, //工作者计数器 } unsafe impl Sync for Worker {} //声明保证多线程安全性 impl Display for Worker { fn fmt(&self, f: &mut Formatter) -> Result { - write!(f, "Worker[uid = {}, status = {}]", self.uid, self.status) + write!(f, "Worker[uid = {}, slow = {}, status = {}, counter = {}]", + self.uid, self.slow, self.status.load(Ordering::Relaxed), self.counter.load(Ordering::Relaxed)) } } impl Worker { //创建一个工作者 - pub fn new(uid: u32, sender: Sender, receiver: Receiver) -> Self { + pub fn new(uid: u32, slow: u32) -> Self { Worker { uid: uid, - status: STATUS_STOP, - sender: sender, - receiver: receiver, + slow: slow, + status: AtomicUsize::new(WorkerStatus::Wait as usize), + counter: AtomicUsize::new(0), } } //启动 - pub fn startup(pool: &mut ThreadPool, worker: Arc>, sync: Arc<(Mutex, Condvar)>) { - if worker.read().unwrap().status == STATUS_RUNNING { - return; - } - + pub fn startup(pool: &mut ThreadPool, worker: Arc, sync: Arc<(Mutex, Condvar)>) -> bool { pool.execute(move|| { let mut task = Task::new(); Worker::work_loop(worker, sync, &mut task); }); + true } //工作循环 - fn work_loop(worker: Arc>, sync: Arc<(Mutex, Condvar)>, task: &mut Task) { - worker.write().unwrap().status = STATUS_RUNNING; + fn work_loop(worker: Arc, sync: Arc<(Mutex, Condvar)>, task: &mut Task) { + let mut status: usize; loop { - //处理控制信号 - match worker.read().unwrap().receiver.try_recv() { - Ok(sign) => { - match sign { - WorkerSign::Control(CONTROL_SIGN_CONTINUE) => { - //继续处理任务 - worker.write().unwrap().status = STATUS_RUNNING; - }, - WorkerSign::Control(CONTROL_SIGN_SLEEP) => { - //暂停处理任务,继续等待控制信号 - worker.write().unwrap().status = STATUS_WAIT; - thread::sleep(time::Duration::from_millis(1)); - continue; - } - WorkerSign::Control(CONTROL_SIGN_EXIT) => { - //退出当前循环 - worker.write().unwrap().status = STATUS_STOP; - break; - }, - _ => (), //忽略其它信号 - } - }, - Err(e) => { - match e { - TryRecvError::Empty => { - //没有收到控制信号 - match worker.read().unwrap().status { - STATUS_STOP => break, //退出当前循环 - STATUS_WAIT => { - //继续等待控制信号 - thread::sleep(time::Duration::from_millis(1)); - continue; - }, - _ => (), //继续处理任务 - } - }, - TryRecvError::Disconnected => { - //已断开通道 - //TODO... - }, - } - }, + status = worker.get_status(); + //处理控制状态 + if status == WorkerStatus::Stop as usize { + //退出当前循环 + break; + } else if status == WorkerStatus::Wait as usize { + //继续等待控制状态 + thread::sleep(time::Duration::from_millis(1)); + continue; + } else if status == WorkerStatus::Running as usize { + //继续工作 + worker.work(&sync, task); } - worker.read().unwrap().work(&sync, task); } } - //获取当前状态 - pub fn get_status(&self) -> u8 { - self.status + //获取工作者当前状态 + pub fn get_status(&self) -> usize { + self.status.load(Ordering::Relaxed) + } + + //设置工作者当前状态 + pub fn set_status(&self, current: WorkerStatus, new: WorkerStatus) -> bool { + match self.status.compare_exchange(current as usize, new as usize, Ordering::SeqCst, Ordering::Acquire) { + Ok(_) => true, + _ => false, + } + } + + //获取工作者的工作计数 + pub fn count(&self) -> usize { + self.counter.load(Ordering::Relaxed) + } + + //关闭工作者 + pub fn stop(&self) -> bool { + if self.get_status() == WorkerStatus::Stop as usize { + return true; + } + self.status.fetch_sub(WorkerStatus::Running as usize, Ordering::SeqCst); + true } //工作 @@ -142,6 +119,19 @@ impl Worker { } (*task_pool).pop(task); //获取任务 } - task.run(); //执行任务 + check_slow_task(self, task); //执行任务 + self.counter.fetch_add(1, Ordering::Relaxed); //增加工作计数 + } +} + +#[inline] +fn check_slow_task(worker: &Worker, task: &mut Task) { + let start_time = now_microsecond(); + task.run(); //执行任务 + let finish_time = now_microsecond() - start_time; + if finish_time >= worker.slow as i64 { + //记录慢任务 + //TODO... + println!("!!!!!!slow task, time: {}, task: {}", finish_time, task); } } \ No newline at end of file diff --git a/src/worker_pool.rs b/src/worker_pool.rs index 186a646..9203c75 100644 --- a/src/worker_pool.rs +++ b/src/worker_pool.rs @@ -1,25 +1,19 @@ -use std::sync::RwLock; -use std::sync::{Arc, Mutex, Condvar}; -use std::sync::mpsc::channel; -use std::sync::mpsc::{Sender, Receiver, RecvError, TryRecvError}; use std::collections::HashMap; +use std::sync::{Arc, Mutex, Condvar}; use std::fmt::{Display, Formatter, Result as FmtResult}; //避免和标准Result冲突,改名为FmtResult use threadpool::ThreadPool; use task_pool::TaskPool; -use worker::{CONTROL_SIGN_EXIT, CONTROL_SIGN_SLEEP, CONTROL_SIGN_CONTINUE}; -use worker::{STATUS_STOP, STATUS_WAIT, STATUS_RUNNING}; -use worker::{Worker, WorkerSign}; +use worker::{WorkerStatus, Worker}; /* * 工作者池 */ pub struct WorkerPool { - counter: u32, //工作者编号计数器 - receiver: Receiver, //工作者池接收器 - map: HashMap, Arc>)>, //工作者缓存 - thread_pool: ThreadPool, //线程池 + counter: u32, //工作者编号计数器 + map: HashMap>, //工作者缓存 + thread_pool: ThreadPool, //线程池 } impl Display for WorkerPool { @@ -31,22 +25,15 @@ impl Display for WorkerPool { impl WorkerPool { //构建指定数量工作者的工作者池 - pub fn new(len: usize) -> Self { + pub fn new(len: usize, slow: u32) -> Self { let mut counter: u32 = 0; - let (p_sender, p_receiver) = channel(); //工作者池通道 let mut map = HashMap::new(); for _ in 0..len { counter += 1; - let (w_sender, w_receiver) = channel(); //工作者通道 - map.insert(counter, ( - w_sender, - Arc::new(RwLock::new(Worker::new(counter, p_sender.clone(), w_receiver))) - ) - ); + map.insert(counter, Arc::new(Worker::new(counter, slow))); } WorkerPool { counter: counter, - receiver: p_receiver, map: map, thread_pool: ThreadPool::new(len), } @@ -58,11 +45,10 @@ impl WorkerPool { } //获取指定状态的工作者编号数组 - pub fn workers(&self, status: u8) -> Vec { + pub fn workers(&self, status: usize) -> Vec { let mut vec = Vec::::new(); - for (uid, pair) in self.map.iter() { - let (_, ref worker): (Sender, Arc>) = *pair; - if worker.read().unwrap().get_status() == status { + for (uid, worker) in self.map.iter() { + if worker.get_status() == status { vec.push(*uid); } } @@ -72,12 +58,8 @@ impl WorkerPool { //休眠指定工作者 pub fn sleep(&self, uid: u32) -> bool { match self.map.get(&uid) { - Some(pair) => { - let (ref sender, ref worker): (Sender, Arc>) = *pair; - if worker.read().unwrap().get_status() != STATUS_RUNNING { - return false; - } - sender.send(WorkerSign::Control(CONTROL_SIGN_SLEEP)).is_ok() + Some(worker) => { + worker.set_status(WorkerStatus::Running, WorkerStatus::Wait) }, None => false, } @@ -86,27 +68,18 @@ impl WorkerPool { //唤醒指定工作者 pub fn wakeup(&self, uid: u32) -> bool { match self.map.get(&uid) { - Some(pair) => { - let (ref sender, ref worker): (Sender, Arc>) = *pair; - if worker.read().unwrap().get_status() != STATUS_WAIT { - return false; - } - sender.send(WorkerSign::Control(CONTROL_SIGN_CONTINUE)).is_ok() + Some(worker) => { + worker.set_status(WorkerStatus::Wait, WorkerStatus::Running) }, None => false, } } //停止指定工作者 - pub fn stop(&self, uid: u32) -> bool { - match self.map.get(&uid) { - Some(pair) => { - let (ref sender, ref worker): (Sender, Arc>) = *pair; - if worker.read().unwrap().get_status() == STATUS_STOP { - //如果已停止,则忽略 - return true; - } - sender.send(WorkerSign::Control(CONTROL_SIGN_EXIT)).is_ok() + pub fn stop(&mut self, uid: u32) -> bool { + match self.map.get_mut(&uid) { + Some(worker) => { + worker.stop() }, None => false, } @@ -115,13 +88,12 @@ impl WorkerPool { //启动工作者,启动时需要指定任务池的同步对象 pub fn start(&mut self, sync: Arc<(Mutex, Condvar)>, uid: u32) -> bool { match self.map.get_mut(&uid) { - Some(pair) => { - let (_, ref worker): (Sender, Arc>) = *pair; - if worker.read().unwrap().get_status() != STATUS_STOP { - return false; + Some(worker) => { + if worker.set_status(WorkerStatus::Wait, WorkerStatus::Running) { + Worker::startup(&mut self.thread_pool, worker.clone(), sync.clone()) + } else { + false } - Worker::startup(&mut self.thread_pool, worker.clone(), sync.clone()); - true }, None => false, } @@ -129,19 +101,10 @@ impl WorkerPool { //在指定任务池中,运行工作池,需要指定任务池的同步对象 pub fn run(&mut self, sync: Arc<(Mutex, Condvar)>) { - for (_, pair) in self.map.iter() { - let (_, ref worker): (Sender, Arc>) = *pair; - Worker::startup(&mut self.thread_pool, worker.clone(), sync.clone()); + for (_, worker) in self.map.iter() { + if worker.set_status(WorkerStatus::Wait, WorkerStatus::Running) { + Worker::startup(&mut self.thread_pool, worker.clone(), sync.clone()); + } } } - - //阻塞并接收工作者信号 - pub fn recv(&self) -> Result { - self.receiver.recv() - } - - //接收工作者信号 - pub fn try_recv(&self) -> Result { - self.receiver.try_recv() - } -} \ No newline at end of file +} diff --git a/tests/test.rs b/tests/test.rs index a398295..63444a6 100644 --- a/tests/test.rs +++ b/tests/test.rs @@ -154,35 +154,33 @@ fn task_test() { let task_pool = TaskPool::new(10); let sync = Arc::new((Mutex::new(task_pool), Condvar::new())); - let mut worker_pool = Box::new(WorkerPool::new(3)); + let mut worker_pool = Box::new(WorkerPool::new(3, 1)); worker_pool.run(sync.clone()); let task_type = TaskType::Async; let priority = 0; let copy_sync = sync.clone(); let func = Box::new(move|| { - copy.call("echo".to_string(), &[copy.new_boolean(true), copy.new_f64(0.999), copy.new_str("Hello World!!!!!!".to_string())]); + copy.call("echo".to_string(), &[copy.new_boolean(false), copy.new_u64(0xfffffffffff), copy.new_str("Hello World!!!!!!".to_string())]); let task_type = TaskType::Async; let priority = 10; let func = Box::new(move|| { - copy.call("echo".to_string(), &[copy.new_boolean(true), copy.new_f64(0.999), copy.new_str("Hello World!!!!!!".to_string())]); + copy.call("echo".to_string(), &[copy.new_boolean(true), copy.new_f64(0.999), copy.new_str("你好 World!!!!!!".to_string())]); }); - let args = Vec::new(); { let &(ref lock, ref cvar) = &*copy_sync; let mut task_pool = lock.lock().unwrap(); - (*task_pool).push(task_type, priority, func, args); + (*task_pool).push(task_type, priority, func, "first task"); println!("task_pool: {}", task_pool); cvar.notify_one(); } thread::sleep(Duration::from_millis(10000)); }); - let args = Vec::new(); { let &(ref lock, ref cvar) = &*sync; let mut task_pool = lock.lock().unwrap(); - (*task_pool).push(task_type, priority, func, args); + (*task_pool).push(task_type, priority, func, "second task"); println!("task_pool: {}", task_pool); cvar.notify_one(); }