Skip to content

Commit

Permalink
优化了任务和工作者
Browse files Browse the repository at this point in the history
  • Loading branch information
zuon committed Apr 8, 2018
1 parent b43d652 commit 616496c
Show file tree
Hide file tree
Showing 9 changed files with 176 additions and 264 deletions.
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ authors = ["zuon <[email protected]>"]
time = "*"
libc = "*"
rand = "*"
threadpool = "*"
threadpool = "*"
lazy_static = "*"
34 changes: 31 additions & 3 deletions src/adapter.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use libc::{c_void, c_char, int8_t, uint8_t, c_int, uint32_t, uint64_t, c_double, memcpy};
use std::ffi::{CStr, CString};
use std::slice::from_raw_parts;
use std::ffi::{CStr, CString};
use std::sync::{Arc, Mutex};
use std::os::raw::c_uchar;
use std::mem::transmute;
use std::ops::Drop;
use std::ptr::null;

use bonmgr::BonMgr;
use data_view_impl::*;

#[link(name = "njsc")]
Expand Down Expand Up @@ -83,9 +85,35 @@ pub fn register_data_view() {
}
}

lazy_static! {
static ref BON_MGR: Arc<Mutex<BonMgr>> = Arc::new(Mutex::new(BonMgr::new()));
}

//调用NativeObject函数
extern "C" fn native_object_function_call(hash: uint32_t, args_size: uint32_t, args: *const c_void) -> *const c_void {
null()
#[no_mangle]
pub extern "C" fn native_object_function_call(
vm: *const c_void,
hash: uint32_t,
args_size: uint32_t,
args: *const c_void) -> *const c_void {
if args_size == 0 {
return null();
}

let reply: Option<JSType>;
let mut js = JS {vm: vm as usize};
//同步块
{
let mut refer = BON_MGR.clone();
reply = (&mut *refer.
lock().
unwrap()).
call(&mut js, hash, Vec::new()).ok();
}
match reply {
Some(val) => val.value as *const c_void,
None => null(),
}
}

//执行njsc测试代码
Expand Down
3 changes: 3 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ extern crate libc;
extern crate rand;
extern crate threadpool;

#[macro_use]
extern crate lazy_static;

pub mod data_view_impl;
pub mod adapter;
pub mod util;
Expand Down
87 changes: 15 additions & 72 deletions src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ use std::mem::transmute;
use std::collections::VecDeque;
use std::fmt::{Display, Formatter, Result};

use adapter::JSType;

/*
* 任务类型
*/
Expand All @@ -20,64 +18,33 @@ pub enum TaskType {
* 任务结构
*/
pub struct Task {
uid: u32, //任务唯一id
task_type: TaskType, //任务类型
priority: u32, //任务优先级
func: (usize, usize), //任务函数
args: Vec<JSType>, //任务参数
start_time: i64, //任务开始时间
finish_time: i64 //任务完成时间
info: &'static str, //任务信息
}

unsafe impl Sync for Task {} //声明保证多线程安全性

impl Display for Task {
fn fmt(&self, f: &mut Formatter) -> Result {
write!(f, "Task[uid = {}, type = {:?}, priority = {}, start_time = {}, finish_time = {}]",
self.uid, self.task_type, self.priority, self.start_time, self.finish_time)
write!(f, "Task[priority = {}, func = {:?}, info = {}]", self.priority, self.func, self.info)
}
}

impl Task {
pub fn new() -> Self {
Task {
uid: 0,
task_type: TaskType::Empty,
priority: 0,
func: (0, 0),
args: Vec::new(),
start_time: 0,
finish_time: 0,
priority: 0,
func: (0, 0),
info: "",
}
}

pub fn copy_to(mut self, dest: &mut Self) -> Self {
pub fn copy_to(&self, dest: &mut Self) {
//复制其它成员
dest.uid = *&self.uid;
dest.task_type = *&self.task_type;
dest.priority = *&self.priority;
dest.func = *&self.func;
//移动源参数
for index in 0..self.args.len() {
dest.args[index] = self.args.remove(index);
}
self
}

pub fn get_uid(&self) -> u32 {
self.uid
}

pub fn set_uid(&mut self, uid: u32) {
self.uid = uid;
}

pub fn get_type(&self) -> TaskType {
self.task_type
}

pub fn set_type(&mut self, task_type: TaskType) {
self.task_type = task_type;
dest.priority = self.priority;
dest.func = self.func;
dest.info = self.info;
}

pub fn get_priority(&self) -> u32 {
Expand All @@ -98,43 +65,19 @@ impl Task {
None => (),
}
}

pub fn get_args(&self) -> Vec<JSType> {
self.args.to_vec()
}

pub fn add_args(&mut self, arg: JSType) {
self.args.push(arg);
}

pub fn set_args(&mut self, args: Vec<JSType>) {
self.args = args;
pub fn get_info(&self) -> &str {
self.info
}

pub fn get_start_time(&self) -> i64 {
self.start_time
}

pub fn set_start_time(&mut self, start_time: i64) {
self.start_time = start_time;
}

pub fn get_finish_time(&self) -> i64 {
self.finish_time
}

pub fn set_finish_time(&mut self, finish_time: i64) {
self.finish_time = finish_time;

pub fn set_info(&mut self, info: &'static str) {
self.info = info;
}

pub fn reset(&mut self) {
self.uid = 0;
self.task_type = TaskType::Empty;
self.priority = 0;
self.func = (0, 0);
self.args.clear();
self.start_time = 0;
self.finish_time = 0;
self.info = "";
}

pub fn run(&self) {
Expand Down
45 changes: 12 additions & 33 deletions src/task_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ use std::collections::{HashMap, VecDeque};
use std::fmt::{Display, Formatter, Result};

use task::{TaskType, Task, TaskCache};
use util::now_microsecond;
use adapter::JSType;

/*
* 同步任务池
Expand Down Expand Up @@ -58,11 +56,9 @@ impl SyncPool {
if w < 0 {
self.weight -= priority; //减少同步任务池权重
match queue.pop_front() {
Some(mut t) => {
Some(t) => {
//填充任务
t = t.copy_to(task);
task.set_start_time(t.get_start_time());
task.set_finish_time(now_microsecond()); //设置任务完成时间
t.copy_to(task);
reply = Some(t);
},
None => (),
Expand All @@ -76,11 +72,9 @@ impl SyncPool {
//从同步延迟任务队列中弹出任务
fn delay_pop(&mut self, task: &mut Task) -> Option<Task> {
match self.delay_queue.pop_front() {
Some(mut t) => {
Some(t) => {
//填充任务
t = t.copy_to(task);
task.set_start_time(t.get_start_time());
task.set_finish_time(now_microsecond()); //设置任务完成时间
t.copy_to(task);
Some(t)
},
None => Option::None,
Expand Down Expand Up @@ -187,11 +181,9 @@ impl AsyncPool<Task> {
if index > -1 {
self.weight -= priority as u64; //减少异步任务池权重
match self.queue.remove(index as usize) {
Some(mut t) => {
Some(t) => {
//填充任务
t = t.copy_to(task);
task.set_start_time(t.get_start_time());
task.set_finish_time(now_microsecond()); //设置任务完成时间
t.copy_to(task);
reply = Some(t);
},
None => (),
Expand All @@ -203,11 +195,9 @@ impl AsyncPool<Task> {
//从异步延迟任务队列中弹出任务
fn delay_pop(&mut self, task: &mut Task) -> Option<Task> {
match self.delay_queue.pop_front() {
Some(mut t) => {
Some(t) => {
//填充任务
t = t.copy_to(task);
task.set_start_time(t.get_start_time());
task.set_finish_time(now_microsecond()); //设置任务完成时间
t.copy_to(task);
Some(t)
},
None => Option::None,
Expand Down Expand Up @@ -247,7 +237,6 @@ impl AsyncPool<Task> {
* 任务池
*/
pub struct TaskPool {
counter: u32, //任务编号计数器
task_cache: TaskCache, //任务缓存
sync_pool: SyncPool, //同步任务池
async_pool: AsyncPool<Task>, //异步任务池
Expand All @@ -257,16 +246,15 @@ unsafe impl Sync for TaskPool {}

impl Display for TaskPool {
fn fmt(&self, f: &mut Formatter) -> Result {
write!(f, "TaskPool[counter = {}, cache_size = {}, sync_pool = {}, async_pool = {}]",
self.counter, self.task_cache.size(), self.sync_pool, self.async_pool)
write!(f, "TaskPool[cache_size = {}, sync_pool = {}, async_pool = {}]",
self.task_cache.size(), self.sync_pool, self.async_pool)
}
}

impl TaskPool {
//构建一个任务池
pub fn new(len: u32) -> Self {
TaskPool {
counter: 0,
task_cache: TaskCache::new(len),
sync_pool: SyncPool::new(),
async_pool: AsyncPool::new(),
Expand Down Expand Up @@ -325,14 +313,11 @@ impl TaskPool {
}

//向任务池加入一个任务
pub fn push(&mut self, task_type: TaskType, priority: u32, func: Box<FnBox()>, args: Vec<JSType>) {
pub fn push(&mut self, task_type: TaskType, priority: u32, func: Box<FnBox()>, info: &'static str) {
let mut task: Task = self.task_cache.pop();
task.set_uid(self.new_uid());
task.set_type(task_type);
task.set_priority(priority);
task.set_func(Some(func));
task.set_args(args);
task.set_start_time(now_microsecond()); //设置任务开始时间
task.set_info(info);
if priority > 0 {
match task_type {
TaskType::Async => {
Expand Down Expand Up @@ -380,12 +365,6 @@ impl TaskPool {
self.sync_pool.clear();
}

//获取新的任务编号
fn new_uid(&mut self) -> u32 {
self.counter += 1;
self.counter
}

//释放指定任务
fn free(&mut self, task: Option<Task>) {
match task {
Expand Down
Loading

0 comments on commit 616496c

Please sign in to comment.