From 5fc9c6b4c4c61cc0a689da532bf2cba21a255379 Mon Sep 17 00:00:00 2001 From: zuon Date: Thu, 7 Jun 2018 14:01:37 +0800 Subject: [PATCH] =?UTF-8?q?=E5=B0=86=E4=BB=BB=E5=8A=A1=E6=B1=A0=E3=80=81?= =?UTF-8?q?=E5=B7=A5=E4=BD=9C=E6=B1=A0=E5=92=8C=E5=B7=A5=E5=85=B7=E5=BA=93?= =?UTF-8?q?=E7=A7=BB=E5=8A=A8=E5=88=B0pi=5Fbase?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Cargo.toml | 3 +- src/lib.rs | 8 +- src/pi_vm_impl.rs | 28 +--- src/task.rs | 143 ----------------- src/task_pool.rs | 377 --------------------------------------------- src/util.rs | 79 ---------- src/worker.rs | 150 ------------------ src/worker_pool.rs | 155 ------------------- tests/test.rs | 27 ++-- 9 files changed, 24 insertions(+), 946 deletions(-) delete mode 100644 src/task.rs delete mode 100644 src/task_pool.rs delete mode 100644 src/util.rs delete mode 100644 src/worker.rs delete mode 100644 src/worker_pool.rs diff --git a/Cargo.toml b/Cargo.toml index c388a0e..d8fb7b6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,4 +13,5 @@ threadpool = "*" lazy_static = "*" kernel32-sys = "*" -pi_lib = { path = "../pi_lib" } \ No newline at end of file +pi_lib = { path = "../pi_lib" } +pi_base = { path = "../pi_base" } \ No newline at end of file diff --git a/src/lib.rs b/src/lib.rs index 4df30ed..cfa0eeb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -9,13 +9,11 @@ #![feature(slice_internals)] #![feature(duration_from_micros)] -extern crate fnv; extern crate core; extern crate time; extern crate libc; extern crate rand; extern crate magnetic; -extern crate threadpool; #[macro_use] extern crate lazy_static; @@ -24,13 +22,9 @@ extern crate lazy_static; extern crate kernel32; extern crate pi_lib; +extern crate pi_base; pub mod adapter; -pub mod util; -pub mod worker; -pub mod worker_pool; -pub mod task; -pub mod task_pool; pub mod native_object_impl; pub mod pi_vm_impl; pub mod bonmgr; \ No newline at end of file diff --git a/src/pi_vm_impl.rs b/src/pi_vm_impl.rs index 94cf964..c0d9fb1 100644 --- a/src/pi_vm_impl.rs +++ b/src/pi_vm_impl.rs @@ -1,22 +1,18 @@ use std::thread; use std::boxed::FnBox; use std::time::Duration; -use std::sync::{Arc, Mutex, Condvar}; +use std::sync::Arc; use std::sync::atomic::{Ordering, AtomicUsize}; use magnetic::mpmc::*; use magnetic::buffer::dynamic::DynamicBuffer; use magnetic::{Producer, Consumer}; -use task::TaskType; -use task_pool::TaskPool; +use pi_base::task::TaskType; +use pi_base::pi_base_impl::cast_js_task; use adapter::{JSStatus, JS, try_js_destroy, dukc_vm_status_check, dukc_vm_status_switch, dukc_vm_status_sub, dukc_wakeup, dukc_continue, js_reply_callback}; use pi_lib::atom::Atom; -lazy_static! { - pub static ref JS_TASK_POOL: Arc<(Mutex, Condvar)> = Arc::new((Mutex::new(TaskPool::new(10)), Condvar::new())); -} - /* * 虚拟机工厂 */ @@ -89,7 +85,7 @@ impl VMFactory { vm = args(vm); vm.call(4); }); - cast_task(TaskType::Sync, 5000000000 + uid as u64, func, info); + cast_js_task(TaskType::Sync, 5000000000 + uid as u64, func, info); } } } @@ -105,7 +101,7 @@ impl VMFactory { Ok(_) => (), } }); - cast_task(TaskType::Sync, 5000000000 + uid as u64, func, info); + cast_js_task(TaskType::Sync, 5000000000 + uid as u64, func, info); }, } } @@ -130,16 +126,6 @@ impl VMFactory { } } -/* -* 线程安全的向任务池投递任务 -*/ -pub fn cast_task(task_type: TaskType, priority: u64, func: Box, info: Atom) { - let &(ref lock, ref cvar) = &**JS_TASK_POOL; - let mut task_pool = lock.lock().unwrap(); - (*task_pool).push(task_type, priority, func, info); - cvar.notify_one(); -} - /* * 线程安全的回应阻塞调用 */ @@ -169,7 +155,7 @@ pub fn block_reply(js: Arc, result: Box)>, task_type: TaskType } } }); - cast_task(task_type, priority, func, info); + cast_js_task(task_type, priority, func, info); } /* @@ -201,7 +187,7 @@ pub fn block_throw(js: Arc, reason: String, task_type: TaskType, priority: u } } }); - cast_task(task_type, priority, func, info); + cast_js_task(task_type, priority, func, info); } #[cfg(all(feature="unstable", any(target_arch = "x86", target_arch = "x86_64")))] diff --git a/src/task.rs b/src/task.rs deleted file mode 100644 index c3795bd..0000000 --- a/src/task.rs +++ /dev/null @@ -1,143 +0,0 @@ -use std::boxed::FnBox; -use std::mem::transmute; -use std::collections::VecDeque; -use std::fmt::{Display, Formatter, Result}; - -use pi_lib::atom::Atom; - -/* -* 任务类型 -*/ -#[derive(Copy, Clone, Debug)] -pub enum TaskType { - Empty, //空任务 - Async, //异步任务 - Sync, //同步任务 - SyncImme, //同步立即任务 -} - -/* -* 任务结构 -*/ -pub struct Task { - priority: u64, //任务优先级 - func: (usize, usize), //任务函数 - info: Atom, //任务信息 -} - -unsafe impl Sync for Task {} //声明保证多线程安全性 - -impl Display for Task { - fn fmt(&self, f: &mut Formatter) -> Result { - write!(f, "Task[priority = {}, func = {:?}, info = {}]", self.priority, self.func, *self.info) - } -} - -impl Task { - pub fn new() -> Self { - Task { - priority: 0, - func: (0, 0), - info: Atom::from(""), - } - } - - pub fn copy_to(&self, dest: &mut Self) { - //复制其它成员 - dest.priority = self.priority; - dest.func = self.func; - dest.info = self.info.clone(); - } - - pub fn get_priority(&self) -> u64 { - self.priority - } - - pub fn set_priority(&mut self, priority: u64) { - self.priority = priority; - } - - pub fn set_func(&mut self, func: Option>) { - match func { - Some(f) => { - let (x, y): (usize, usize) = unsafe { transmute(f) }; - self.func.0 = x; - self.func.1 = y; - }, - None => (), - } - } - - pub fn get_info(&self) -> &str { - self.info.as_str() - } - - pub fn set_info(&mut self, info: Atom) { - self.info = info; - } - - pub fn reset(&mut self) { - self.priority = 0; - self.func = (0, 0); - self.info = Atom::from(""); - } - - pub fn run(&self) { - if self.func == (0, 0) { - return; - } - let func: Box = unsafe { transmute(self.func) }; - func(); - } -} - -/* -* 任务缓存结构 -*/ -pub struct TaskCache { - cache: VecDeque, //任务缓存 -} - -impl Display for TaskCache { - fn fmt(&self, f: &mut Formatter) -> Result { - write!(f, "TaskCache[size = {}]", self.cache.len()) - } -} - -impl TaskCache { - pub fn new(len: u32) -> Self { - if len < 1 { - panic!("invalid task cache size"); - } - - let mut cache = VecDeque::with_capacity(len as usize); - for _ in 0..len { - cache.push_back(Task::new()); - } - TaskCache { - cache: cache, - } - } - - pub fn pop(&mut self) -> Task { - match self.cache.pop_front() { - Some(e) => e, - None => Task::new(), - } - } - - pub fn push(&mut self, mut entry: Task) { - entry.reset(); - self.cache.push_back(entry); - } - - pub fn clean(&mut self) -> usize { - //TODO... - self.size() - } - - pub fn size(&self) -> usize { - self.cache.len() - } -} - diff --git a/src/task_pool.rs b/src/task_pool.rs deleted file mode 100644 index a6a0c31..0000000 --- a/src/task_pool.rs +++ /dev/null @@ -1,377 +0,0 @@ -use rand; -use rand::Rng; -use fnv::FnvHashMap; -use std::boxed::FnBox; -use std::collections::VecDeque; -use std::fmt::{Display, Formatter, Result}; - -use pi_lib::atom::Atom; -use task::{TaskType, Task, TaskCache}; - -/* -* 同步任务池 -*/ -struct SyncPool { - weight: u64, //同步任务池权重 - map: FnvHashMap>, //同步任务队列表 - delay_queue: VecDeque, //延迟同步任务队列 -} - -impl Display for SyncPool { - fn fmt(&self, f: &mut Formatter) -> Result { - write!(f, "SyncPool[weight = {}, priority_size = {}, size = {}, delay_size = {}]", - self.weight, self.map.len(), self.size(), self.delay_size()) - } -} - -impl SyncPool { - //构建一个同步任务池 - fn new() -> Self { - SyncPool { - weight: 0, - map: FnvHashMap::default(), - delay_queue: VecDeque::new(), - } - } - - //获取同步任务数量 - fn size(&self) -> u64 { - let mut size: u64 = 0; - for val in self.map.values() { - size += val.len() as u64; - } - size - } - - //获取延迟同步任务数量 - fn delay_size(&self) -> u64 { - self.delay_queue.len() as u64 - } - - //从同步任务队列中弹出任务 - fn pop(&mut self, weight: u64, task: &mut Task) -> Option { - let mut reply = Option::None; - let mut w: i64 = weight as i64; - for (priority, queue) in self.map.iter_mut() { - w -= (priority * (queue.len() as u64)) as i64; - if w < 0 { - self.weight -= priority; //减少同步任务池权重 - match queue.pop_front() { - Some(t) => { - //填充任务 - t.copy_to(task); - reply = Some(t); - }, - None => (), - } - break; - } - } - reply - } - - //从同步延迟任务队列中弹出任务 - fn delay_pop(&mut self, task: &mut Task) -> Option { - match self.delay_queue.pop_front() { - Some(t) => { - //填充任务 - t.copy_to(task); - Some(t) - }, - None => Option::None, - } - } - - //向同步任务队列尾加入任务 - fn push_back(&mut self, task: Task) { - let priority = task.get_priority() as u64; - self.weight += priority; - self.map.entry(priority).or_insert(VecDeque::new()).push_back(task); //获取指定优先级的同步任务队列并加入任务,如果队列为空,则创建一个队列后再加入任务 - } - - //向同步任务队列头加入任务 - fn push_front(&mut self, task: Task) { - let priority = task.get_priority() as u64; - self.weight += priority; - self.map.entry(priority).or_insert(VecDeque::new()).push_front(task); //获取指定优先级的同步任务队列并加入任务,如果队列为空,则创建一个队列后再加入任务 - } - - //向同步延迟任务队列尾加入任务 - fn delay_push_back(&mut self, task: Task) { - self.delay_queue.push_back(task); - } - - //向同步延迟任务队列头加入任务 - fn delay_push_front(&mut self, task: Task) { - self.delay_queue.push_front(task); - } - - //移除指定优先级的同步任务队列 - fn remove(&mut self, priority: u64) { - self.map.remove(&(priority as u64)); - } - - //移除同步延迟队列任务 - fn delay_remove(&mut self) { - self.delay_queue.clear(); - } - - //清空同步任务池 - fn clear(&mut self) { - self.map.clear(); - self.delay_remove(); - } -} - -/* -* 异步任务池 -*/ -struct AsyncPool { - weight: u64, //异步任务队列权重 - queue: VecDeque, //异步任务队列 - delay_queue: VecDeque, //延迟异步任务队列 -} - -impl Display for AsyncPool { - fn fmt(&self, f: &mut Formatter) -> Result { - write!(f, "AsyncPool[weight = {}, size = {}, delay_size = {}]", - self.weight, self.size(), self.delay_size()) - } -} - -impl AsyncPool { - //构建一个同步任务池 - fn new() -> Self { - AsyncPool { - weight: 0, - queue: VecDeque::new(), - delay_queue: VecDeque::new(), - } - } - - //获取异步任务数量 - fn size(&self) -> u64 { - self.queue.len() as u64 - } - - //获取延迟异步任务数量 - fn delay_size(&self) -> u64 { - self.delay_queue.len() as u64 - } - - //从异步任务队列中弹出任务 - fn pop(&mut self, mut weight: u64, task: &mut Task) -> Option { - let mut index = -1; - let mut priority = 0; - let mut reply = Option::None; - for i in 0..self.queue.len() { - match self.queue.get_mut(i) { - Some(t) => { - priority = t.get_priority(); - if weight < (priority as u64) { - //已选中异步任务 - index = i as isize; - break; - } - //没有选中,则减少权重继续查找下一个任务 - weight -= priority as u64; - }, - None => continue, - } - } - if index > -1 { - self.weight -= priority as u64; //减少异步任务池权重 - match self.queue.remove(index as usize) { - Some(t) => { - //填充任务 - t.copy_to(task); - reply = Some(t); - }, - None => (), - } - } - reply - } - - //从异步延迟任务队列中弹出任务 - fn delay_pop(&mut self, task: &mut Task) -> Option { - match self.delay_queue.pop_front() { - Some(t) => { - //填充任务 - t.copy_to(task); - Some(t) - }, - None => Option::None, - } - } - - //向异步任务队列尾加入任务 - fn push_back(&mut self, task: Task) { - let priority = task.get_priority() as u64; - self.weight += priority; - self.queue.push_back(task); - } - - //向异步延迟任务队列尾加入任务 - fn delay_push_back(&mut self, task: Task) { - self.delay_queue.push_back(task); - } - - //移除异步任务队列 - pub fn remove(&mut self) { - self.queue.clear(); - } - - //移除异步延迟队列任务 - pub fn delay_remove(&mut self) { - self.delay_queue.clear(); - } - - //清空异步任务池 - fn clear(&mut self) { - self.remove(); - self.delay_remove(); - } -} - -/* -* 任务池 -*/ -pub struct TaskPool { - task_cache: TaskCache, //任务缓存 - sync_pool: SyncPool, //同步任务池 - async_pool: AsyncPool, //异步任务池 -} - -unsafe impl Sync for TaskPool {} - -impl Display for TaskPool { - fn fmt(&self, f: &mut Formatter) -> Result { - write!(f, "TaskPool[cache_size = {}, sync_pool = {}, async_pool = {}]", - self.task_cache.size(), self.sync_pool, self.async_pool) - } -} - -impl TaskPool { - //构建一个任务池 - pub fn new(len: u32) -> Self { - TaskPool { - task_cache: TaskCache::new(len), - sync_pool: SyncPool::new(), - async_pool: AsyncPool::new(), - } - } - - //获取任务数量 - pub fn size(&self) -> u64 { - self.sync_pool.size() + self.sync_pool.delay_size() + self.async_pool.size() + self.async_pool.delay_size() - } - - //从任务池中弹出一个任务 - pub fn pop(&mut self, task: &mut Task) { - let mut wait_free: Option; - let mut r: u64; - let mut sw = self.sync_pool.weight; - let mut aw = self.async_pool.weight; - let w = sw + aw; - if w > 0 { - //判断从同步还是异步任务队列中弹出 - r = rand::thread_rng().gen_range(0, w); - if r < sw { - //从同步任务队列中弹出 - wait_free=self.sync_pool.pop(r, task); - self.free(wait_free); - } else { - //从异步任务队列中弹出 - wait_free=self.async_pool.pop(r - sw, task); - self.free(wait_free); - } - } - sw = self.sync_pool.delay_size(); - aw = self.async_pool.delay_size(); - if sw > 0 { - if aw > 0 { - //判断从同步还是异步延迟任务队列中弹出 - r = rand::thread_rng().gen_range(0, w); - if r < sw { - //从同步延迟任务队列中弹出 - wait_free=self.sync_pool.delay_pop(task); - self.free(wait_free); - } else { - //从异步延迟任务队列中弹出 - wait_free=self.async_pool.delay_pop(task); - self.free(wait_free); - } - } else { - //只有从同步延迟任务队列中弹出 - wait_free=self.sync_pool.delay_pop(task); - self.free(wait_free); - } - } else if aw > 0 { - //只有从异步延迟任务队列中弹出 - wait_free=self.async_pool.delay_pop(task); - self.free(wait_free); - } - } - - //向任务池加入一个任务 - pub fn push(&mut self, task_type: TaskType, priority: u64, func: Box, info: Atom) { - let mut task: Task = self.task_cache.pop(); - task.set_priority(priority); - task.set_func(Some(func)); - task.set_info(info); - if priority > 0 { - match task_type { - TaskType::Async => { - //加入异步任务队列 - self.async_pool.push_back(task); - }, - TaskType::Sync => { - //加入同步任务队列尾 - self.sync_pool.push_back(task); - }, - TaskType::SyncImme => { - //加入同步任务队列头 - self.sync_pool.push_front(task); - }, - _ => (), - } - } else { - //加入延迟任务队列 - match task_type { - TaskType::Async => { - //加入异步延迟任务队列 - self.async_pool.delay_push_back(task); - }, - TaskType::Sync => { - //加入同步延迟任务队列尾 - self.sync_pool.delay_push_back(task); - }, - TaskType::SyncImme => { - //加入同步延迟任务队列头 - self.sync_pool.delay_push_front(task); - }, - _ => (), - } - } - } - - //移除指定优先级的同步任务 - pub fn remove_sync_task(&mut self, priority: u64) { - self.sync_pool.remove(priority); - } - - //清空所有任务 - pub fn clear(&mut self) { - self.async_pool.clear(); - self.sync_pool.clear(); - } - - //释放指定任务 - fn free(&mut self, task: Option) { - match task { - Some(t) => self.task_cache.push(t), - None => (), - } - } -} \ No newline at end of file diff --git a/src/util.rs b/src/util.rs deleted file mode 100644 index 0ceeabf..0000000 --- a/src/util.rs +++ /dev/null @@ -1,79 +0,0 @@ -use time; -use libc::c_void; -use std::sync::Arc; - -/* -* 获取当前本地时间的秒数 -*/ -pub fn now_second() -> i64 { - time::get_time().sec -} - -/* -* 获取当前本地时间的毫秒数 -*/ -pub fn now_millisecond() -> i64 { - time::get_time().sec * 1000 + (time::get_time().nsec / 1000000) as i64 -} - -/* -* 获取当前本地时间的微秒数 -*/ -pub fn now_microsecond() -> i64 { - time::get_time().sec * 1000000 + (time::get_time().nsec / 1000) as i64 -} - -/* -* 获取当前本地时间的纳秒数 -*/ -pub fn now_nanosecond() -> i128 { - (time::get_time().sec * 1000000000) as i128 + time::get_time().nsec as i128 -} - -/* -* 将box转换为*const c_void -*/ -#[inline] -pub fn box2void(ptr_box: Box) -> *const c_void { - Box::into_raw(ptr_box) as *const c_void -} - -/* -* 将*mut c_void转换为box -*/ -#[inline] -pub fn void2box(ptr_void: *mut c_void) -> Box { - unsafe { Box::from_raw(ptr_void as *mut T) } -} - -/* -* 将Arc转换为*const c_void -*/ -#[inline] -pub fn arc2void(ptr_box: Arc) -> *const c_void { - Arc::into_raw(ptr_box) as *const c_void -} - -/* -* 将*mut c_void转换为Arc -*/ -#[inline] -pub fn void2arc(ptr_void: *mut c_void) -> Arc { - unsafe { Arc::from_raw(ptr_void as *mut T) } -} - -/* -* 将*const c_void转换为usize -*/ -#[inline] -pub fn void2usize(ptr_void: *const c_void) -> usize { - ptr_void as usize -} - -/* -* 将usize转换为*const c_void -*/ -#[inline] -pub fn usize2void(ptr: usize) -> *const c_void { - ptr as *const c_void -} \ No newline at end of file diff --git a/src/worker.rs b/src/worker.rs deleted file mode 100644 index e7b8bd1..0000000 --- a/src/worker.rs +++ /dev/null @@ -1,150 +0,0 @@ -use std::thread::park_timeout; -use std::time::{Instant, Duration}; -use std::sync::{Arc, Mutex, Condvar}; -use std::fmt::{Display, Formatter, Result}; -use std::sync::atomic::{Ordering, AtomicUsize}; - -use threadpool::ThreadPool; - -use task_pool::TaskPool; -use task::Task; - -/* -* 工作者状态 -*/ -#[derive(Clone)] -pub enum WorkerStatus { - Stop = 0, //停止 - Wait, //等待 - Running, //运行中 -} - -/* -* 工作者 -*/ -#[derive(Debug)] -pub struct Worker { - uid: u32, //工作者编号 - slow: Duration, //工作者慢任务时长,单位us - status: AtomicUsize, //工作者状态 - counter: AtomicUsize, //工作者计数器 -} - -unsafe impl Sync for Worker {} //声明保证多线程安全性 - -impl Display for Worker { - fn fmt(&self, f: &mut Formatter) -> Result { - write!(f, "Worker[uid = {}, slow = {:?}, status = {}, counter = {}]", - self.uid, self.slow, self.status.load(Ordering::Relaxed), self.counter.load(Ordering::Relaxed)) - } -} - -impl Worker { - //创建一个工作者 - pub fn new(uid: u32, slow: u32) -> Self { - Worker { - uid: uid, - slow: Duration::from_micros(slow as u64), - status: AtomicUsize::new(WorkerStatus::Wait as usize), - counter: AtomicUsize::new(0), - } - } - - //启动 - pub fn startup(pool: &ThreadPool, worker: Arc, sync: Arc<(Mutex, Condvar)>) -> bool { - pool.execute(move|| { - let mut task = Task::new(); - Worker::work_loop(worker, sync, &mut task); - }); - true - } - - //工作循环 - fn work_loop(worker: Arc, sync: Arc<(Mutex, Condvar)>, task: &mut Task) { - let mut status: usize; - loop { - status = worker.get_status(); - //处理控制状态 - if status == WorkerStatus::Stop as usize { - //退出当前循环 - break; - } else if status == WorkerStatus::Wait as usize { - //继续等待控制状态 - park_timeout(Duration::from_millis(1000)); - continue; - } else if status == WorkerStatus::Running as usize { - //继续工作 - worker.work(&sync, task); - } - } - } - - //获取工作者当前状态 - #[inline] - pub fn get_status(&self) -> usize { - self.status.load(Ordering::Relaxed) - } - - //设置工作者当前状态 - pub fn set_status(&self, current: WorkerStatus, new: WorkerStatus) -> bool { - match self.status.compare_exchange(current as usize, new as usize, Ordering::Acquire, Ordering::Relaxed) { - Ok(_) => true, - _ => false, - } - } - - //获取工作者的工作计数 - pub fn count(&self) -> usize { - self.counter.load(Ordering::Relaxed) - } - - //关闭工作者 - pub fn stop(&self) -> bool { - if self.get_status() == WorkerStatus::Stop as usize { - return true; - } - match self.status.compare_exchange(WorkerStatus::Running as usize, WorkerStatus::Stop as usize, - Ordering::Acquire, Ordering::Relaxed) { - Ok(_) => true, - _ => { - match self.status.compare_exchange(WorkerStatus::Wait as usize, WorkerStatus::Stop as usize, - Ordering::Acquire, Ordering::Relaxed) { - Ok(_) => true, - _ => false, - } - }, - } - } - - //工作 - fn work(&self, sync: &Arc<(Mutex, Condvar)>, task: &mut Task) { - //同步块 - { - let &(ref lock, ref cvar) = &**sync; - let mut task_pool = lock.lock().unwrap(); - while (*task_pool).size() == 0 { - //等待任务 - let (pool, wait) = cvar.wait_timeout(task_pool, Duration::from_micros(1000)).unwrap(); - if wait.timed_out() { - return //等待超时,则立即解锁,并处理控制状态 - } - task_pool = pool; - } - (*task_pool).pop(task); //获取任务 - } - check_slow_task(self, task); //执行任务 - self.counter.fetch_add(1, Ordering::Acquire); //增加工作计数 - } -} - -#[inline] -fn check_slow_task(worker: &Worker, task: &mut Task) { - let time = Instant::now(); - task.run(); //执行任务 - let elapsed = time.elapsed(); - if time.elapsed() >= worker.slow { - //记录慢任务 - //TODO... - println!("!!!!!!slow task, time: {}, task: {}", elapsed.as_secs() * 1000000 + (elapsed.subsec_micros() as u64), task); - } -} \ No newline at end of file diff --git a/src/worker_pool.rs b/src/worker_pool.rs deleted file mode 100644 index ed046db..0000000 --- a/src/worker_pool.rs +++ /dev/null @@ -1,155 +0,0 @@ -use fnv::FnvHashMap; -use std::sync::{Arc, Mutex, Condvar}; -use std::fmt::{Display, Formatter, Result as FmtResult}; //避免和标准Result冲突,改名为FmtResult - -use threadpool::{ThreadPool, Builder as ThreadPoolBuilder}; - -use task_pool::TaskPool; -use worker::{WorkerStatus, Worker}; - -/* -* 工作者池 -*/ -pub struct WorkerPool { - counter: u32, //工作者编号计数器 - map: FnvHashMap>, //工作者缓存 - thread_pool: ThreadPool, //线程池 -} - -impl Display for WorkerPool { - fn fmt(&self, f: &mut Formatter) -> FmtResult { - let pool = self.thread_pool.clone(); - write!(f, "WorkerPool[counter = {}, worker_size = {}, wait_size = {}, active_size = {}, panic_size = {}]", - self.counter, self.size(), pool.queued_count(), pool.active_count(), pool.panic_count()) - } -} - -impl WorkerPool { - //构建指定数量工作者的工作者池 - pub fn new(len: usize, stack_size: usize, slow: u32) -> Self { - let mut counter: u32 = 0; - let mut map = FnvHashMap::default(); - for _ in 0..len { - counter += 1; - map.insert(counter, Arc::new(Worker::new(counter, slow))); - } - WorkerPool { - counter: counter, - map: map, - thread_pool: ThreadPoolBuilder::new(). - num_threads(len). - thread_stack_size(stack_size). - build(), - } - } - - //获取工作者数量 - pub fn size(&self) -> u32 { - self.map.len() as u32 - } - - //获取指定状态的工作者编号数组 - pub fn workers(&self, status: usize) -> Vec { - let mut vec = Vec::::new(); - for (uid, worker) in self.map.iter() { - if worker.get_status() == status { - vec.push(*uid); - } - } - vec - } - - //休眠指定工作者 - pub fn sleep(&self, uid: u32) -> bool { - match self.map.get(&uid) { - Some(worker) => { - worker.set_status(WorkerStatus::Running, WorkerStatus::Wait) - }, - None => false, - } - } - - //唤醒指定工作者 - pub fn wakeup(&self, uid: u32) -> bool { - match self.map.get(&uid) { - Some(worker) => { - worker.set_status(WorkerStatus::Wait, WorkerStatus::Running) - }, - None => false, - } - } - - //停止指定工作者 - pub fn stop(&self, uid: u32) -> bool { - match self.map.get(&uid) { - Some(worker) => { - worker.stop() - }, - None => false, - } - } - - //启动工作者,启动时需要指定任务池的同步对象 - pub fn start(&self, sync: Arc<(Mutex, Condvar)>, uid: u32) -> bool { - match self.map.get(&uid) { - Some(worker) => { - if worker.set_status(WorkerStatus::Stop, WorkerStatus::Running) { - Worker::startup(&self.thread_pool, worker.clone(), sync.clone()) - } else { - false - } - }, - None => false, - } - } - - //在指定任务池中,运行工作池,需要指定任务池的同步对象 - pub fn run(&self, sync: Arc<(Mutex, Condvar)>) { - for (_, worker) in self.map.iter() { - if worker.set_status(WorkerStatus::Wait, WorkerStatus::Running) { - Worker::startup(&self.thread_pool, worker.clone(), sync.clone()); - } - } - } - - //增加工作者 - pub fn increase(&mut self, sync: Arc<(Mutex, Condvar)>, len: usize, slow: u32) { - if len == 0 { - return; - } - - let start = self.counter + 1; - let mut worker: Arc; - for _ in 0..len { - self.counter += 1; - worker = Arc::new(Worker::new(self.counter, slow)); - worker.stop(); - self.map.insert(self.counter, worker.clone()); - } - let end = self.counter + 1; - self.thread_pool.set_num_threads(self.counter as usize); - for uid in start..end { - self.start(sync.clone(), uid); //启动新创建的工作者 - } - } - - //减少工作者 - pub fn decrease(&mut self, len: usize) { - if len == 0 || len > self.counter as usize { - return; - } - - self.counter -= len as u32; - let min = self.counter; - //从工作池中移除已关闭的工作者 - self.map.retain(|&uid, worker| { - //从尾部开始关闭工作者 - if uid > min { - !worker.stop() - } else { - true - } - }); - self.thread_pool.set_num_threads(self.counter as usize); - } -} \ No newline at end of file diff --git a/tests/test.rs b/tests/test.rs index 4bc5418..f639e8d 100644 --- a/tests/test.rs +++ b/tests/test.rs @@ -3,20 +3,21 @@ #[cfg(test)] extern crate pi_vm; extern crate pi_lib; +extern crate pi_base; extern crate threadpool; use std::thread; use std::time::{Instant, Duration}; use std::sync::{Arc, Mutex, Condvar}; -use pi_vm::task::TaskType; -use pi_vm::task_pool::TaskPool; -use pi_vm::util::now_nanosecond; -use pi_vm::worker_pool::WorkerPool; -use pi_vm::pi_vm_impl::{JS_TASK_POOL, VMFactory, cast_task, block_reply, block_throw}; -use pi_vm::adapter::{load_lib_backtrace, register_native_object, dukc_remove_value, JSTemplate, JS}; - use pi_lib::atom::Atom; +use pi_base::task::TaskType; +use pi_base::task_pool::TaskPool; +use pi_base::util::now_nanosecond; +use pi_base::worker_pool::WorkerPool; +use pi_base::pi_base_impl::{JS_TASK_POOL, cast_js_task}; +use pi_vm::pi_vm_impl::{VMFactory, block_reply, block_throw}; +use pi_vm::adapter::{load_lib_backtrace, register_native_object, dukc_remove_value, JSTemplate, JS}; // // #[test] // fn njsc_test() { @@ -347,7 +348,7 @@ fn native_object_call_block_reply_test() { arc1.new_str("你好 World!!!!!!".to_string()); arc1.call(3); }); - cast_task(task_type, priority, func, Atom::from("call block task")); + cast_js_task(task_type, priority, func, Atom::from("call block task")); thread::sleep(Duration::from_millis(500)); //保证同步任务先执行 let result = |vm: Arc| { @@ -403,7 +404,7 @@ fn native_object_call_block_reply_test_by_clone() { copy1.new_str("你好 World!!!!!!".to_string()); copy1.call(3); }); - cast_task(task_type, priority, func, Atom::from("call block task")); + cast_js_task(task_type, priority, func, Atom::from("call block task")); thread::sleep(Duration::from_millis(500)); //保证同步任务先执行 let result = |vm: Arc| { @@ -462,16 +463,16 @@ fn task_test() { copy.call(3); thread::sleep(Duration::from_millis(1000)); //延迟结束任务 }); - cast_task(task_type, priority, func, Atom::from("first task")); + cast_js_task(task_type, priority, func, Atom::from("first task")); thread::sleep(Duration::from_millis(1000)); //延迟结束任务 }); - cast_task(task_type, priority, func, Atom::from("second task")); + cast_js_task(task_type, priority, func, Atom::from("second task")); println!("worker_pool: {}", worker_pool); //测试运行任务的同时增加工作者 for index in 0..10 { let mut copy: JS = (&js).clone().unwrap(); copy.run(); - cast_task(TaskType::Sync, 10, Box::new(move || { + cast_js_task(TaskType::Sync, 10, Box::new(move || { copy.get_js_function("echo".to_string()); copy.new_boolean(true); copy.new_u64(index); @@ -487,7 +488,7 @@ fn task_test() { for index in 0..10 { let mut copy: JS = (&js).clone().unwrap(); copy.run(); - cast_task(TaskType::Sync, 10, Box::new(move || { + cast_js_task(TaskType::Sync, 10, Box::new(move || { copy.get_js_function("echo".to_string()); copy.new_boolean(false); copy.new_u64(index);