Skip to content

Commit

Permalink
refactor to work with standard versioned events
Browse files Browse the repository at this point in the history
  • Loading branch information
pemrouz committed Feb 28, 2016
1 parent cefca1d commit 678aab4
Show file tree
Hide file tree
Showing 5 changed files with 710 additions and 544 deletions.
264 changes: 106 additions & 158 deletions dist/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,16 @@
Object.defineProperty(exports, "__esModule", {
value: true
});

/* istanbul ignore next */
var _slicedToArray = function () { function sliceIterator(arr, i) { var _arr = []; var _n = true; var _d = false; var _e = undefined; try { for (var _i = arr[Symbol.iterator](), _s; !(_n = (_s = _i.next()).done); _n = true) { _arr.push(_s.value); if (i && _arr.length === i) break; } } catch (err) { _d = true; _e = err; } finally { try { if (!_n && _i["return"]) _i["return"](); } finally { if (_d) throw _e; } } return _arr; } return function (arr, i) { if (Array.isArray(arr)) { return arr; } else if (Symbol.iterator in Object(arr)) { return sliceIterator(arr, i); } else { throw new TypeError("Invalid attempt to destructure non-iterable instance"); } }; }();

exports.default = sync;

var _identity = require('utilise/identity');

var _identity2 = _interopRequireDefault(_identity);

var _replace = require('utilise/replace');

var _replace2 = _interopRequireDefault(_replace);

var _prepend = require('utilise/prepend');

var _prepend2 = _interopRequireDefault(_prepend);

var _flatten = require('utilise/flatten');

var _flatten2 = _interopRequireDefault(_flatten);

var _values = require('utilise/values');

var _values2 = _interopRequireDefault(_values);
Expand All @@ -31,27 +23,25 @@ var _header2 = _interopRequireDefault(_header);

var _client = require('utilise/client');

/* istanbul ignore next */
var _client2 = _interopRequireDefault(_client);

var _noop = require('utilise/noop');

/* istanbul ignore next */
var _noop2 = _interopRequireDefault(_noop);

var _keys = require('utilise/keys');

var _keys2 = _interopRequireDefault(_keys);

var _key = require('utilise/key');

var _key2 = _interopRequireDefault(_key);

var _str = require('utilise/str');

var _str2 = _interopRequireDefault(_str);

var _not = require('utilise/not');
var _set = require('utilise/set');

var _not2 = _interopRequireDefault(_not);
var _set2 = _interopRequireDefault(_set);

var _key = require('utilise/key');

var _key2 = _interopRequireDefault(_key);

var _by = require('utilise/by');

Expand All @@ -72,178 +62,136 @@ function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { de
function sync(ripple, server) {
log('creating');

/* istanbul ignore next */
if (!_client2.default && !server) return;
(0, _values2.default)(ripple.types).map(headers(ripple));
ripple.sync = emit(ripple);
/* istanbul ignore next */
if (!_client2.default) (0, _values2.default)(ripple.types).map(headers(ripple));
ripple.stream = stream(ripple);
ripple.io = io(server);
ripple.on('change', function (res) {
return emit(ripple)()(res.name);
});
ripple.io.on('change', silent(ripple));
ripple.on('change', ripple.stream()); // both - broadcast change to everyone
ripple.io.on('change', consume(ripple)); // client - receive change
ripple.io.on('connection', function (s) {
return s.on('change', change(ripple));
});
return s.on('change', consume(ripple));
}); // server - receive change
ripple.io.on('connection', function (s) {
return emit(ripple)(s)();
});
return ripple.stream(s)();
}); // server - send all resources to new client
/* istanbul ignore next */
ripple.io.use(setIP);
return ripple;
}

function change(ripple) {
return function (req) {
log('receiving', req.name);
// send diff to all or some sockets
var stream = function stream(ripple) {
return function (sockets) {
return function (name, change) {
if (!name) return (0, _values2.default)(ripple.resources).map(function (d) {
return stream(ripple)(sockets)(d.name);
});

var socket = this,
res = ripple.resources[req.name],
check = type(ripple)(req).from || _identity2.default;

if (!res) return log('no resource', req.name);
if (!check.call(this, req)) return debug('type skip', req.name);
if (!_is2.default.obj(res.body)) return silent(ripple)(req);

var to = (0, _header2.default)('proxy-to')(res) || _identity2.default,
from = (0, _header2.default)('proxy-from')(res),
body = to.call(socket, (0, _key2.default)('body')(res)),
deltas = (0, _jsondiffpatch.diff)(body, req.body);

if (_is2.default.arr(deltas)) return delta('') && res.body.emit('change');

(0, _keys2.default)(deltas).reverse().filter((0, _not2.default)((0, _is2.default)('_t'))).map(paths(deltas)).reduce(_flatten2.default, []).map(delta).some(Boolean) && res.body.emit('change');

function delta(k) {
var d = (0, _key2.default)(k)(deltas),
name = req.name
// , body = res.body
,
index = k.replace(/(^|\.)_/g, '$1'),
type = d.length == 1 ? 'push' : d.length == 2 ? 'update' : d[2] === 0 ? 'remove' : '',
value = type == 'update' ? d[1] : d[0],
next = types[type];

if (!type) return false;
if (!from || from.call(socket, value, body, index, type, name, next)) {
!index ? silent(ripple)(req) : next(index, value, body, name, res);
return true;
}
}
/* istanbul ignore next */
var everyone = _client2.default ? [ripple.io] : ripple.io.of('/').sockets,
res = ripple.resources[name],
send = to(ripple, change, res),
log = count(everyone.length, name);

return (0, _header2.default)('silent', true)(res) ? delete res.headers.silent : _is2.default.str(sockets) ? (log(everyone.filter((0, _by2.default)('sessionID', sockets)).map(send)), ripple) : !sockets ? (log(everyone.map(send)), ripple) : (log(send(sockets)), ripple);
};
};
}
};

// outgoing transforms
// TODO: in fn body = is.fn(res.body) ? str(res.body) : res.body
var to = function to(ripple, change, res) {
return function (socket) {
var xres = (0, _header2.default)('to')(res),
xtype = type(ripple)(res).to;

function paths(base) {
return function (k) {
var d = (0, _key2.default)(k)(base);
k = _is2.default.arr(k) ? k : [k];
var body = xres ? xres.call(socket, res, change) : res.body;
if (!body) return false;

return _is2.default.arr(d) ? k.join('.') : (0, _keys2.default)(d).map((0, _prepend2.default)(k.join('.') + '.')).map(paths(base));
var rep = xtype ? xtype.call(socket, { name: res.name, body: body, headers: res.headers }, change) : { name: res.name, body: body, headers: res.headers };
if (!rep) return false;

return socket.emit('change', change && (!xres || body === true) ? [res.name, change] : [res.name, false, rep]), true;
};
}
};

function push(k, value, body, name) {
var path = k.split('.'),
tail = path.pop(),
o = (0, _key2.default)(path.join('.'))(body) || body;
// incoming transforms
// if (!res) return log('no resource', name) // TODO skip adding new resources on server
// if (!is.obj(res.body)) return silent(ripple)(req)
var consume = function consume(ripple) {
return function (_ref) {
/* istanbul ignore next */
var _ref2 = _slicedToArray(_ref, 3);

_is2.default.arr(o) ? o.splice(tail, 0, value) : (0, _key2.default)(k, value)(body);
}
var name = _ref2[0];
var change = _ref2[1];
var req = _ref2[2];

function remove(k, value, body, name) {
var path = k.split('.'),
tail = path.pop(),
o = (0, _key2.default)(path.join('.'))(body) || body;
log('receiving', name);

_is2.default.arr(o) ? o.splice(tail, 1) : delete o[tail];
}
var socket = this,
res = ripple.resources[name],
xtype = type(ripple)(res).from || type(ripple)(req).from,
xres = (0, _header2.default)('from')(res),
types = ripple.types,
next = (0, _set2.default)(change);

return !res && !types[(0, _header2.default)('content-type')(req)] ? debug('req skip', name) // rejected - corrupted
: xtype && !xtype.call(socket, req, change) ? debug('type skip', name) // rejected - by xtype
: xres && !xres.call(socket, req, change) ? debug('res skip', name) // rejected - by xres
: !change ? ripple(silent(req)) // accept - replace (new)
: !change.key ? ripple(silent({ name: name, body: change.value })) // accept - replace at root
: (silent(res), next(res.body)); // accept - deep change
};
};

function update(k, value, body, name) {
(0, _key2.default)(k, value)(body);
}
var count = function count(total, name) {
return function (tally) {
return log((0, _str2.default)((_is2.default.arr(tally) ? tally : [1]).filter(Boolean).length).green.bold + '/' + (0, _str2.default)(total).green, 'sending', name);
};
};

function headers(ripple) {
var headers = function headers(ripple) {
return function (type) {
/* istanbul ignore next */
var parse = type.parse || _noop2.default;
type.parse = function (res) {
if (_client2.default) return parse.apply(this, arguments), res;
var existing = ripple.resources[res.name],
from = (0, _header2.default)('proxy-from')(existing),
to = (0, _header2.default)('proxy-to')(existing);

res.headers['proxy-from'] = (0, _header2.default)('proxy-from')(res) || (0, _header2.default)('from')(res) || from;
res.headers['proxy-to'] = (0, _header2.default)('proxy-to')(res) || (0, _header2.default)('to')(res) || to;
from = (0, _header2.default)('from')(res) || (0, _header2.default)('from')(existing),
to = (0, _header2.default)('to')(res) || (0, _header2.default)('to')(existing);
if (from) res.headers.from = from;
if (to) res.headers.to = to;
return parse.apply(this, arguments), res;
};
};
}
};

function silent(ripple) {
return function (res) {
return res.headers.silent = true, ripple(res);
};
}

function io(opts) {
var io = function io(opts) {
/* istanbul ignore next */
var r = !_client2.default ? require('socket.io')(opts.server || opts) : window.io ? window.io() : _is2.default.fn(require('socket.io-client')) ? require('socket.io-client')() : { on: _noop2.default, emit: _noop2.default };
/* istanbul ignore next */
r.use = r.use || _noop2.default;
return r;
}

// emit all or some resources, to all or some clients
function emit(ripple) {
return function (socket) {
return function (name) {
if (arguments.length && !name) return;
if (!name) return (0, _values2.default)(ripple.resources).map((0, _key2.default)('name')).map(emit(ripple)(socket)), ripple;
};

var res = ripple.resources[name],
sockets = _client2.default ? [ripple.io] : ripple.io.of('/').sockets,
lgt = stats(sockets.length, name),
silent = (0, _header2.default)('silent', true)(res);

return silent ? delete res.headers.silent : !res ? log('no resource to emit: ', name) : _is2.default.str(socket) ? lgt(sockets.filter((0, _by2.default)('sessionID', socket)).map(to(ripple)(res))) : !socket ? lgt(sockets.map(to(ripple)(res))) : lgt([to(ripple)(res)(socket)]);
};
};
}

function to(ripple) {
return function (res) {
return function (socket) {
var body = _is2.default.fn(res.body) ? '' + res.body : res.body,
rep,
fn = {
type: type(ripple)(res).to || _identity2.default,
res: res.headers['proxy-to'] || _identity2.default
};

body = fn.res.call(socket, body);
if (!body) return false;

rep = fn.type.call(socket, { name: res.name, body: body, headers: res.headers });
if (!rep) return false;

socket.emit('change', rep);
return true;
};
};
}

function stats(total, name) {
return function (results) {
log((0, _str2.default)(results.filter(Boolean).length).green.bold + '/' + (0, _str2.default)(total).green, 'sending', name);
};
}

function setIP(socket, next) {
/* istanbul ignore next */
var setIP = function setIP(socket, next) {
socket.ip = socket.request.headers['x-forwarded-for'] || socket.request.connection.remoteAddress;
next();
}
};

function type(ripple) {
var silent = function silent(res) {
return (0, _key2.default)('headers.silent', true)(res);
};

var type = function type(ripple) {
return function (res) {
return ripple.types[(0, _header2.default)('content-type')(res)];
return ripple.types[(0, _header2.default)('content-type')(res)] || {};
};
}
};

var log = require('utilise/log')('[ri/sync]'),
err = require('utilise/err')('[ri/sync]'),
debug = _noop2.default,
types = { push: push, remove: remove, update: update };
debug = log;
14 changes: 9 additions & 5 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,14 @@
"url": "git://github.com/rijs/sync.git"
},
"scripts": {
"ignore": "find ./dist -type f -exec sed -i \"s/function _interopRequire/\\/* istanbul ignore next *\\/\\nfunction _interopRequire/g\" {} ';'",
"ignore": "find ./dist -type f -exec sed -i -E \"s/(function _interopRequire|^.*?_client2|^.*?_noop2|^.*?_slicedToArray|^.*?setIP)/\\/* istanbul ignore next *\\/\\n\\1/g\" {} ';'",
"babel": "babel src -d dist",
"clean": "rm -rf dist && mkdir dist",
"build": "npm run clean && npm run babel && npm run ignore",
"test": "popper",
"test": "istanbul test ./node_modules/mocha/bin/_mocha --report html -- -R spec",
"coverage": "istanbul cover ./node_modules/mocha/bin/_mocha --report lcovonly -- -R spec && rm -rf ./coverage",
"cov": "istanbul cover ./node_modules/mocha/bin/_mocha -- -R spec",
"test-popper": "popper",
"version": "npm run build && git add -A",
"postversion": "git push && git push --tags"
},
Expand All @@ -22,19 +25,20 @@
"babel-preset-es2015": "*",
"browserify": "*",
"chai": "*",
"popper": "*",
"coveralls": "*",
"istanbul": "*",
"mocha": "*",
"mocha-lcov-reporter": "*",
"uglify-js": "*",
"mockery": "^1.4.0",
"popper": "*",
"rijs": "*",
"rijs.core": "*",
"rijs.css": "*",
"rijs.data": "*",
"rijs.fn": "*",
"rijs.serve": "*",
"serve-static": "^1.9.2"
"serve-static": "^1.9.2",
"uglify-js": "*"
},
"dependencies": {
"jsondiffpatch": "^0.1.31",
Expand Down
Loading

0 comments on commit 678aab4

Please sign in to comment.