Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#23 plaggable io #30

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 143 additions & 0 deletions lib/io/socket-io.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
(function() {
var SocketIOModule, debug, events, http, url,
extend = function(child, parent) { for (var key in parent) { if (hasProp.call(parent, key)) child[key] = parent[key]; } function ctor() { this.constructor = child; } ctor.prototype = parent.prototype; child.prototype = new ctor(); child.__super__ = parent.prototype; return child; },
hasProp = {}.hasOwnProperty;

http = require('http');

events = require('eventemitter2');

url = require('url');

debug = require('debug')('linda:io:socket-io');

module.exports = SocketIOModule = (function(superClass) {
extend(SocketIOModule, superClass);

function SocketIOModule() {}

SocketIOModule.prototype.attach = function(linda) {
this.linda = linda;
this.tuplespace = this.linda.tuplespace.bind(this.linda);
return this;
};

SocketIOModule.prototype.listen = function(opts) {
if (opts == null) {
opts = {
io: null,
server: null
};
}
if (opts.io == null) {
throw new Error('"io" must be instance of Socket.IO');
}
if (!(opts.server instanceof http.Server)) {
throw new Error('"server" must be instance of http.Server');
}
this.io = opts.io;
this.server = opts.server;
this.oldListeners = this.server.listeners('request').splice(0);
this.server.removeAllListeners('request');
this.server.on('request', (function(_this) {
return function(req, res) {
var _url, i, len, listener, ref, results;
_url = url.parse(decodeURI(req.url), true);
if (_url.pathname === "/linda/linda.js") {
debug("GET\t" + _url.pathname);
res.setHeader('Content-Type', 'application/javascript');
res.writeHead(200);
res.end(_this.client_js_code);
return;
}
ref = _this.oldListeners;
results = [];
for (i = 0, len = ref.length; i < len; i++) {
listener = ref[i];
results.push(listener.call(_this.server, req, res));
}
return results;
};
})(this));
this.io.sockets.on('connection', (function(_this) {
return function(socket) {
var cids, info, ref, watch_cids;
cids = {};
info = {
from: socket.handshake.headers['x-forwarded-for'] || ((ref = socket.handshake.address) != null ? ref.address : void 0)
};
watch_cids = {};
socket.on('__linda_write', function(data) {
var ref1;
if ((ref1 = data.options) != null) {
ref1.from = info.from;
}
_this.tuplespace(data.tuplespace).write(data.tuple, data.options);
debug("write\t" + (JSON.stringify(data)) + " from " + info.from);
return _this.linda.emit('write', data);
});
socket.on('__linda_take', function(data) {
var cid;
cid = _this.tuplespace(data.tuplespace).option(data.options).take(data.tuple, function(err, tuple) {
cid = null;
return socket.emit("__linda_take_" + data.id, err, tuple);
});
cids[data.id] = cid;
debug("take\t" + (JSON.stringify(data)) + " from " + info.from);
_this.linda.emit('take', data);
return socket.once('disconnect', function() {
if (cid) {
return _this.tuplespace(data.tuplespace).cancel(cid);
}
});
});
socket.on('__linda_read', function(data) {
var cid;
cid = _this.tuplespace(data.tuplespace).option(data.options).read(data.tuple, function(err, tuple) {
cid = null;
return socket.emit("__linda_read_" + data.id, err, tuple);
});
cids[data.id] = cid;
debug("read\t" + (JSON.stringify(data)) + " from " + info.from);
_this.linda.emit('read', data);
return socket.once('disconnect', function() {
if (cid) {
return _this.tuplespace(data.tuplespace).cancel(cid);
}
});
});
watch_cids = {};
socket.on('__linda_watch', function(data) {
var cid;
debug("watch\t" + (JSON.stringify(data)) + " from " + info.from);
_this.linda.emit('watch', data);
if (watch_cids[data.id]) {
return;
}
watch_cids[data.id] = true;
cid = _this.tuplespace(data.tuplespace).watch(data.tuple, function(err, tuple) {
return socket.emit("__linda_watch_" + data.id, err, tuple);
});
cids[data.id] = cid;
return socket.once('disconnect', function() {
if (cid) {
return _this.tuplespace(data.tuplespace).cancel(cid);
}
});
});
return socket.on('__linda_cancel', function(data) {
debug("cancel\t" + (JSON.stringify(data)) + " from " + info.from);
_this.linda.emit('cancel', data);
_this.tuplespace(data.tuplespace).cancel(cids[data.id]);
return watch_cids[data.id] = false;
});
};
})(this));
return this;
};

return SocketIOModule;

})(events.EventEmitter2);

}).call(this);
24 changes: 12 additions & 12 deletions lib/linda-client.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
})();

TupleSpace = (function() {
function TupleSpace(linda, name) {
function TupleSpace(linda, name1) {
this.linda = linda;
this.name = name;
this.name = name1;
this.watch_callback_ids = {};
this.io_callbacks = [];
this.linda.io.on('disconnect', (function(_this) {
Expand All @@ -45,10 +45,10 @@
};

TupleSpace.prototype.remove_io_callbacks = function() {
var c, _i, _len, _ref;
_ref = this.io_callbacks;
for (_i = 0, _len = _ref.length; _i < _len; _i++) {
c = _ref[_i];
var c, j, len, ref;
ref = this.io_callbacks;
for (j = 0, len = ref.length; j < len; j++) {
c = ref[j];
this.linda.io.removeListener(c.name, c.listener);
}
return this.io_callbacks = [];
Expand Down Expand Up @@ -109,18 +109,18 @@
}
return setTimeout((function(_this) {
return function() {
var c, i, _i, _ref, _results;
_results = [];
for (i = _i = _ref = _this.io_callbacks.length - 1; _ref <= 0 ? _i <= 0 : _i >= 0; i = _ref <= 0 ? ++_i : --_i) {
var c, i, j, ref, results;
results = [];
for (i = j = ref = _this.io_callbacks.length - 1; ref <= 0 ? j <= 0 : j >= 0; i = ref <= 0 ? ++j : --j) {
c = _this.io_callbacks[i];
if (c.name.match(new RegExp("_" + id + "$"))) {
_this.linda.io.removeListener(c.name, c.listener);
_results.push(_this.io_callbacks.splice(i, 1));
results.push(_this.io_callbacks.splice(i, 1));
} else {
_results.push(void 0);
results.push(void 0);
}
}
return _results;
return results;
};
})(this), 100);
};
Expand Down
140 changes: 21 additions & 119 deletions lib/linda.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
(function() {
var Client, Linda, Tuple, TupleSpace, debug, events, fs, http, path, request, socketio, url,
__hasProp = {}.hasOwnProperty,
__extends = function(child, parent) { for (var key in parent) { if (__hasProp.call(parent, key)) child[key] = parent[key]; } function ctor() { this.constructor = child; } ctor.prototype = parent.prototype; child.prototype = new ctor(); child.__super__ = parent.prototype; return child; };
var Client, Linda, SocketIOModule, Tuple, TupleSpace, debug, events, fs, http, path, request, socketio, url,
extend = function(child, parent) { for (var key in parent) { if (hasProp.call(parent, key)) child[key] = parent[key]; } function ctor() { this.constructor = child; } ctor.prototype = parent.prototype; child.prototype = new ctor(); child.__super__ = parent.prototype; return child; },
hasProp = {}.hasOwnProperty;

http = require('http');

Expand All @@ -25,14 +25,16 @@

Client = require(path.join(__dirname, 'linda-client'));

SocketIOModule = require(path.join(__dirname, 'io/socket-io'));

module.exports.TupleSpace = TupleSpace;

module.exports.Tuple = Tuple;

module.exports.Client = Client;

Linda = (function(_super) {
__extends(Linda, _super);
Linda = (function(superClass) {
extend(Linda, superClass);

function Linda() {
this.spaces = {};
Expand All @@ -46,11 +48,11 @@
})(this));
setInterval((function(_this) {
return function() {
var name, space, _ref;
var name, ref, space;
debug("TupleSpace\tcheck expire");
_ref = _this.spaces;
for (name in _ref) {
space = _ref[name];
ref = _this.spaces;
for (name in ref) {
space = ref[name];
if (space != null) {
space.check_expire();
}
Expand All @@ -64,119 +66,19 @@
return this.spaces[name] || (this.spaces[name] = new TupleSpace(name));
};

Linda.prototype.listen = function(opts) {
if (opts == null) {
opts = {
io: null,
server: null
};
}
if (opts.io == null) {
throw new Error('"io" must be instance of Socket.IO');
}
if (!(opts.server instanceof http.Server)) {
throw new Error('"server" must be instance of http.Server');
}
this.io = opts.io;
this.server = opts.server;
this.oldListeners = this.server.listeners('request').splice(0);
this.server.removeAllListeners('request');
this.server.on('request', (function(_this) {
return function(req, res) {
var listener, _i, _len, _ref, _results, _url;
_url = url.parse(decodeURI(req.url), true);
if (_url.pathname === "/linda/linda.js") {
debug("GET\t" + _url.pathname);
res.setHeader('Content-Type', 'application/javascript');
res.writeHead(200);
res.end(_this.client_js_code);
return;
}
_ref = _this.oldListeners;
_results = [];
for (_i = 0, _len = _ref.length; _i < _len; _i++) {
listener = _ref[_i];
_results.push(listener.call(_this.server, req, res));
}
return _results;
};
})(this));
this.io.sockets.on('connection', (function(_this) {
return function(socket) {
var cids, info, watch_cids, _ref;
cids = {};
info = {
from: socket.handshake.headers['x-forwarded-for'] || ((_ref = socket.handshake.address) != null ? _ref.address : void 0)
};
socket.on('__linda_write', function(data) {
var _ref1;
if ((_ref1 = data.options) != null) {
_ref1.from = info.from;
}
_this.tuplespace(data.tuplespace).write(data.tuple, data.options);
debug("write\t" + (JSON.stringify(data)) + " from " + info.from);
return _this.emit('write', data);
});
socket.on('__linda_take', function(data) {
var cid;
cid = _this.tuplespace(data.tuplespace).option(data.options).take(data.tuple, function(err, tuple) {
cid = null;
return socket.emit("__linda_take_" + data.id, err, tuple);
});
cids[data.id] = cid;
debug("take\t" + (JSON.stringify(data)) + " from " + info.from);
_this.emit('take', data);
return socket.once('disconnect', function() {
if (cid) {
return _this.tuplespace(data.tuplespace).cancel(cid);
}
});
});
socket.on('__linda_read', function(data) {
var cid;
cid = _this.tuplespace(data.tuplespace).option(data.options).read(data.tuple, function(err, tuple) {
cid = null;
return socket.emit("__linda_read_" + data.id, err, tuple);
});
cids[data.id] = cid;
debug("read\t" + (JSON.stringify(data)) + " from " + info.from);
_this.emit('read', data);
return socket.once('disconnect', function() {
if (cid) {
return _this.tuplespace(data.tuplespace).cancel(cid);
}
});
});
watch_cids = {};
socket.on('__linda_watch', function(data) {
var cid;
debug("watch\t" + (JSON.stringify(data)) + " from " + info.from);
_this.emit('watch', data);
if (watch_cids[data.id]) {
return;
}
watch_cids[data.id] = true;
cid = _this.tuplespace(data.tuplespace).watch(data.tuple, function(err, tuple) {
return socket.emit("__linda_watch_" + data.id, err, tuple);
});
cids[data.id] = cid;
return socket.once('disconnect', function() {
if (cid) {
return _this.tuplespace(data.tuplespace).cancel(cid);
}
});
});
return socket.on('__linda_cancel', function(data) {
debug("cancel\t" + (JSON.stringify(data)) + " from " + info.from);
_this.emit('cancel', data);
_this.tuplespace(data.tuplespace).cancel(cids[data.id]);
return watch_cids[data.id] = false;
});
};
})(this));
Linda.prototype.attach = function(io) {
this.io = io;
this.io.attach(this);
return this;
};

Linda.prototype.listen = function() {
if (this.io == null) {
this.attach(new SocketIOModule());
}
return this.io.listen(arguments[0]);
};

return Linda;

})(events.EventEmitter2);
Expand Down
Loading