Skip to content

Commit

Permalink
v0.0.1 of task-kue
Browse files Browse the repository at this point in the history
  • Loading branch information
rajivm committed Jan 14, 2016
0 parents commit 113bbea
Show file tree
Hide file tree
Showing 7 changed files with 184 additions and 0 deletions.
34 changes: 34 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# Logs
logs
*.log
npm-debug.log*

# Runtime data
pids
*.pid
*.seed

# Directory for instrumented libs generated by jscoverage/JSCover
lib-cov

# Coverage directory used by tools like istanbul
coverage

# Grunt intermediate storage (http://gruntjs.com/creating-plugins#storing-task-files)
.grunt

# node-waf configuration
.lock-wscript

# Compiled binary addons (http://nodejs.org/api/addons.html)
build/Release

# Dependency directory
# https://docs.npmjs.com/misc/faq#should-i-check-my-node-modules-folder-into-git
node_modules

# Optional npm cache directory
.npm

# Optional REPL history
.node_repl_history
1 change: 1 addition & 0 deletions index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
module.exports = require('./lib/task-kue');
5 changes: 5 additions & 0 deletions lib/task-kue/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
exports = module.exports

exports.KueMultiWorker = require('./kue-multi-worker')
exports.TaskWorker = require('./task-worker')
exports.Task = require('./task')
64 changes: 64 additions & 0 deletions lib/task-kue/kue-multi-worker.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
"use strict";

var util = require('util');

module.exports = class KueMultiWorker {
constructor(queue, processType) {
if (!queue) {
queue = require('kue').createQueue();
}
if (!processType) {
processType = "default";
}
this._queue = queue;
this._processType = processType;
this._handlers = {};
this._errorCallback = function(e) {
util.log(e);
};
}

process(concurrency) {
if (!concurrency) {
concurrency = 1;
}
var self = this;
this._queue.process(this._processType, concurrency, function(job, done) {
self._processJob(job, done);
});
}

register(type, handler) {
this._handlers[type] = handler;
}

_processJob(job, done) {
var jobType = job.data.type;
if (!jobType) {
return this._callErrorCallback(new Error(util.format("Job is missing data.type")), job, done);
}
var handler = this._handlers[job.data.type];
if (!handler) {
return this._callErrorCallback(new Error(util.format("No handler for job type: %s", jobType)), job, done);
}
util.log("Running handler for jobType: " + jobType);
try {
handler(job, done);
} catch (e) {
return this._callErrorCallback(e, job, done);
}
}

onError(callback) {
this._errorCallback = callback;
}

_callErrorCallback(e, job, done) {
if (this._errorCallback) {
this._errorCallback(e, job);
}
if (done) {
done(e);
}
}
}
27 changes: 27 additions & 0 deletions lib/task-kue/task-worker.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
"use strict";

var util = require('util');
var KueMultiWorker = require('./kue-multi-worker')
var Task = require("./task");

module.exports = class TaskWorker extends KueMultiWorker {
registerTask(taskCls) {
var self = this;
super.register(taskCls._jobType(), function(job, done) {
self._runTask(taskCls, job, done);
});
}

register(type, handler) {
if (type.prototype instanceof Task) {
this.registerTask(type);
}
super.register(type, handler);
}

_runTask(taskCls, job, done) {
var task = taskCls._deserialize(job.data);
done(task.run());
}
}

38 changes: 38 additions & 0 deletions lib/task-kue/task.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
"use strict";

var util = require('util');

module.exports = class Task {
run() {
throw new Error("Task.run should be overridden.")
}

queue(queue, processType) {
if (!queue) {
queue = require('kue').createQueue();
}
if (!processType) {
processType = "default";
}
return queue.create(processType, this._serialize());
}

_serialize() {
return {
task: this,
type: this.constructor._jobType()
};
}

static _deserialize(data) {
var task = new this();
Object.keys(data.task).forEach(function(key) {
task[key] = data.task[key];
});
return task;
}

static _jobType() {
return util.format("Task.%s", this.prototype.constructor.name);
}
}
15 changes: 15 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"name": "task-kue",
"version": "0.0.1",
"description": "Task based Queue that runs on Kue",
"main": "index.js",
"dependencies": {
"kue": "0.10.4"
},
"devDependencies": {},
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"author": "Rajiv Makhijani",
"license": "BSD-2-Clause"
}

0 comments on commit 113bbea

Please sign in to comment.