Skip to content

Commit

Permalink
refactor: simplify send/receive
Browse files Browse the repository at this point in the history
  • Loading branch information
pemrouz committed Jul 24, 2016
1 parent 7dd6abe commit ec0dd3e
Show file tree
Hide file tree
Showing 3 changed files with 761 additions and 876 deletions.
234 changes: 122 additions & 112 deletions dist/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,24 @@
Object.defineProperty(exports, "__esModule", {
value: true
});

/* istanbul ignore next */
var _slicedToArray = function () { function sliceIterator(arr, i) { var _arr = []; var _n = true; var _d = false; var _e = undefined; try { for (var _i = arr[Symbol.iterator](), _s; !(_n = (_s = _i.next()).done); _n = true) { _arr.push(_s.value); if (i && _arr.length === i) break; } } catch (err) { _d = true; _e = err; } finally { try { if (!_n && _i["return"]) _i["return"](); } finally { if (_d) throw _e; } } return _arr; } return function (arr, i) { if (Array.isArray(arr)) { return arr; } else if (Symbol.iterator in Object(arr)) { return sliceIterator(arr, i); } else { throw new TypeError("Invalid attempt to destructure non-iterable instance"); } }; }();

exports.default = sync;

var _identity = require('utilise/identity');

var _identity2 = _interopRequireDefault(_identity);

var _promise = require('utilise/promise');

var _promise2 = _interopRequireDefault(_promise);

var _values = require('utilise/values');

var _values2 = _interopRequireDefault(_values);

var _extend = require('utilise/extend');

var _extend2 = _interopRequireDefault(_extend);

var _header = require('utilise/header');

var _header2 = _interopRequireDefault(_header);
Expand Down Expand Up @@ -59,6 +63,8 @@ var _is = require('utilise/is');

var _is2 = _interopRequireDefault(_is);

var _to = require('utilise/to');

/* istanbul ignore next */
function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { default: obj }; }

Expand All @@ -69,138 +75,141 @@ function sync(ripple, server) {
log('creating');

/* istanbul ignore next */
if (!_client2.default && !server) return;
if (!_client2.default && !server) return ripple;
/* istanbul ignore next */
if (!_client2.default) ripple.to = clean(ripple.to), (0, _values2.default)(ripple.types).map(function (type) {
return type.parse = headers(ripple)(type.parse);
});

ripple.stream = stream(ripple);
ripple.respond = respond(ripple);
ripple.io = io(server);
ripple.on('change.stream', ripple.stream()); // both - broadcast change to everyone
ripple.io.on('change', consume(ripple)); // client - receive change
ripple.io.on('response', response(ripple)); // client - receive response
ripple.io.on('connection', function (s) {
return s.on('change', consume(ripple));
}); // server - receive change
ripple.io.on('connection', function (s) {
return ripple.stream(s)();
}); // server - send all resources to new client
ripple.io.use(setIP);
ripple.io.use(ip);
ripple.req = send(ripple)(ripple);
/* istanbul ignore next */
ripple.send = _client2.default ? send(ripple)(ripple.io) : send(ripple);
ripple.on('change.send', broadcast(ripple));
ripple.io.on('change', consume(ripple));
ripple.io.on('connection', connected(ripple));
return ripple;
}

var respond = function respond(ripple) {
return function (socket, name, time) {
return function (reply) {
socket.emit('response', [name, time, reply]);
};
var connected = function connected(ripple) {
return function (socket) {
log('connected'.green, (0, _str2.default)(socket.ip).grey);
socket.on('change', consume(ripple));
ripple.send(socket)();
};
};

var response = function response(ripple) {
return function (_ref) {
var broadcast = function broadcast(ripple) {
return function (name, change) {
/* istanbul ignore next */
var _ref2 = _slicedToArray(_ref, 3);
(_client2.default ? ripple.send : ripple.send())((0, _extend2.default)({ name: name })(change));
};
};

var name = _ref2[0];
var time = _ref2[1];
var reply = _ref2[2];
var normalize = function normalize(ripple) {
var next = arguments.length <= 1 || arguments[1] === undefined ? _identity2.default : arguments[1];
return function (name, type, value) {
var req = _is2.default.obj(name) ? name : { name: name, type: type, value: value },
resource = ripple.resources[req.name];

if (!req.name) return next((0, _values2.default)(ripple.resources).map(normalize(ripple)));

if (!resource) return Promise.resolve([404, err('cannot find ' + req.name)]);

if (!req.type) req = {
name: req.name,
type: 'update',
headers: resource.headers,
value: resource.body,
time: now(resource)
};

ripple.resources[name].body.emit('response._' + time, reply);
if (req.type == 'update' && !req.key) req.headers = resource.headers;

return next(req);
};
};

// send diff to all or some sockets
var stream = function stream(ripple) {
return function (sockets) {
return function (name, change) {
if (!name) return (0, _values2.default)(ripple.resources).map(function (d) {
return stream(ripple)(sockets)(d.name);
});

// send all or some req, to all or some sockets
var send = function send(ripple) {
var l = arguments.length <= 1 || arguments[1] === undefined ? log : arguments[1];
return function (who) {
return normalize(ripple, function (req) {
var count = function count(sent) {
return (0, _str2.default)(sent.length).green.bold + '/' + (0, _str2.default)(everyone.length).green;
},
all = function all(d) {
return req.length && log('send'.grey, count(sockets), 'all'.bold, ('(' + req.length + ')').grey);
},
/* istanbul ignore next */
var everyone = _client2.default ? [ripple.io] : (0, _values2.default)(ripple.io.of('/').sockets),
log = count(everyone.length, name),
res = ripple.resources[name],
send = to(ripple, res, change);
everyone = _client2.default ? [ripple.io] : (0, _values2.default)(ripple.io.of('/').sockets),
sockets = _is2.default.arr(who) ? who : _is2.default.str(who) ? everyone.filter((0, _by2.default)('sessionID', who)) : !who ? everyone : [who],
/* istanbul ignore next */
promises = _is2.default.arr(req) ? (all(), req.map(send(ripple, l = _noop2.default)(sockets))) : sockets.map(function (s) {
return to(ripple, req, s);
}).filter(Boolean);

return !res ? log('no resource', name) : _is2.default.str(sockets) ? (log(everyone.filter((0, _by2.default)('sessionID', sockets)).map(send)), ripple) : !sockets ? (log(everyone.map(send)), ripple) : (log(send(sockets)), ripple);
};
if (promises.length) l('send'.grey, count(promises), req.name);
return Promise.all(promises);
});
};
};

// outgoing transforms
var to = function to(ripple, res, change) {
return function (socket) {
if ((0, _header2.default)('silent', socket)(res)) return delete res.headers.silent, false;

var xres = (0, _header2.default)('to')(res),
xtype = type(ripple)(res).to,
xall = ripple.to,
body,
rep,
out;

body = res.body;
if (xres) {
if (!(out = xres.call(socket, res, change))) return false;
if (out !== true) {
change = false, body = out;
}
}
var to = function to(ripple, req, socket, resource) {
if ((0, _header2.default)('silent', socket)(resource = ripple.resources[req.name])) return delete resource.headers.silent, false;

rep = { name: res.name, body: body, headers: res.headers };
if (xtype) {
if (!(out = xtype.call(socket, rep, change))) return false;
if (out !== true) change = false, rep = out;
}
var nametype = '(' + req.name + ', ' + req.type + ')',
xres = (0, _header2.default)('to')(resource) || _identity2.default,
xtyp = type(ripple)(resource).to || _identity2.default,
xall = ripple.to || _identity2.default,
p = (0, _promise2.default)();

if (xall) {
if (!(out = xall.call(socket, rep, change))) return false;
if (out !== true) change = false, rep = out;
}
req = (0, _extend2.default)({ socket: socket })(req);
if (!(req = xres(req))) return false;
if (!(req = xtyp(req))) return false;
if (!(req = xall(req))) return false;
delete req.socket;

return socket.emit('change', change ? [res.name, change] : [res.name, false, rep]), true;
};
socket == ripple ? consume(ripple)(req, res) : socket.emit('change', req, res);

return p;

function res() {
deb('ack'.grey, nametype, (0, _str2.default)(socket.ip).grey);
p.resolve.call(ripple, (0, _to.arr)(arguments));
}
};

// incoming transforms
var consume = function consume(ripple) {
return function (_ref3, ack) {
return function (req) {
/* istanbul ignore next */
var _ref4 = _slicedToArray(_ref3, 3);

var name = _ref4[0];
var change = _ref4[1];
var _ref4$ = _ref4[2];
var req = _ref4$ === undefined ? {} : _ref4$;

log('receiving', name);

var res = ripple.resources[name],
xall = ripple.from,
xtype = type(ripple)(res).from || type(ripple)(req).from // is latter needed?
,
xres = (0, _header2.default)('from')(res),
next = (0, _set2.default)(change),
silent = silence(this),
respond = ack || ripple.respond(this, name, change.time);

return xall && !xall.call(this, req, change, respond) ? debug('skip all', name) // rejected - by xall
: xtype && !xtype.call(this, req, change, respond) ? debug('skip type', name) // rejected - by xtype
: xres && !xres.call(this, req, change, respond) ? debug('skip res', name) // rejected - by xres
: !change ? ripple(silent(req)) // accept - replace (new)
: !change.key ? ripple(silent({ name: name, body: change.value })) // accept - replace at root
: (silent(res), next(res.body)); // accept - deep change
var res = arguments.length <= 1 || arguments[1] === undefined ? _noop2.default : arguments[1];

var nametype = '(' + req.name + ', ' + req.type + ')',
resource = ripple.resources[req.name],
silent = silence(req.socket = this),
xres = (0, _header2.default)('from')(resource) || _identity2.default,
xtyp = type(ripple)(resource).from || _identity2.default,
xall = ripple.from || _identity2.default;

log('recv'.grey, nametype);
try {
!req.name ? res(404, err('not found'.red, req.name)) : !(req = xall(req, res)) ? deb('skip', 'global', nametype) : !(req = xtyp(req, res)) ? deb('skip', 'type', nametype) : !(req = xres(req, res)) ? deb('skip', 'resource', nametype) : !req.key && req.type == 'update' ? (ripple(silent(body(req))), res(200, deb('ok ' + nametype))) : isStandardVerb(req.type) ? ((0, _set2.default)(req)(silent(resource).body), res(200, deb('ok ' + nametype, _key2.default.grey))) : !isStandardVerb(req.type) ? res(405, err('method not allowed', nametype)) : res(400, err('cannot process', nametype));
} catch (e) {
res(e.status || 500, err(e.message, nametype, '\n', e.stack));
}
};
};

var count = function count(total, name) {
return function (tally) {
return debug((0, _str2.default)((_is2.default.arr(tally) ? tally : [1]).filter(Boolean).length).green.bold + '/' + (0, _str2.default)(total).green, 'sending', name);
};
var body = function body(_ref) {
var name = _ref.name;
var _body = _ref.body;
var value = _ref.value;
var headers = _ref.headers;
return { name: name, headers: headers, body: value };
};

var headers = function headers(ripple) {
Expand All @@ -224,40 +233,41 @@ var io = function io(opts) {
return r;
};

var setIP = function setIP(socket, next) {
var ip = function ip(socket, next) {
socket.ip = socket.request.headers['x-forwarded-for'] || socket.request.connection.remoteAddress;
next();
};

var clean = function clean(next) {
return function (_ref5, change) {
var name = _ref5.name;
var body = _ref5.body;
var headers = _ref5.headers;

if (change) return next ? next.apply(this, arguments) : true;
return function (req, res) {
if (!req.headers || !req.headers.silent) return (next || _identity2.default)(req, res);

var stripped = {};

(0, _keys2.default)(headers).filter((0, _not2.default)((0, _is2.default)('silent'))).map(function (header) {
return stripped[header] = headers[header];
(0, _keys2.default)(req.headers).filter((0, _not2.default)((0, _is2.default)('silent'))).map(function (header) {
return stripped[header] = req.headers[header];
});

return (next || _identity2.default).apply(this, [{ name: name, body: body, headers: stripped }, change]);
req.headers = stripped;
return (next || _identity2.default)(req, res);
};
};

var type = function type(ripple) {
return function (res) {
return ripple.types[(0, _header2.default)('content-type')(res)] || {};
};
},
now = function now(d, t) {
return t = (0, _key2.default)('body.log.length')(d), _is2.default.num(t) ? t - 1 : t;
},
silence = function silence(socket) {
return function (res) {
return (0, _key2.default)('headers.silent', socket)(res);
};
},
isStandardVerb = _is2.default.in(['update', 'add', 'remove']),
log = require('utilise/log')('[ri/sync]'),
err = require('utilise/err')('[ri/sync]'),
/* istanbul ignore next */
debug = _noop2.default;
deb = (!_client2.default && process.env.DEBUG || '').split(',').some((0, _is2.default)('[ri/sync]')) ? log : _identity2.default;
Loading

0 comments on commit ec0dd3e

Please sign in to comment.