Skip to content

Commit

Permalink
Update to inline pipe, slimming down bundled size
Browse files Browse the repository at this point in the history
  • Loading branch information
wooorm committed Jun 11, 2016
1 parent 13e4e30 commit 712474f
Show file tree
Hide file tree
Showing 3 changed files with 145 additions and 16 deletions.
81 changes: 67 additions & 14 deletions lib/unified.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/

var events = require('events');
var originalPipe = require('stream').Stream.prototype.pipe;
var once = require('once');
var extend = require('extend');
var bail = require('bail');
var vfile = require('vfile');
Expand Down Expand Up @@ -142,20 +142,12 @@ function unified() {
return destination;
}

/*
* Initialise events.
*/

events.init.call(processor);

/*
* Mix in methods.
*/

for (key in emitter) {
if (isFunction(emitter[key])) {
processor[key] = emitter[key];
}
processor[key] = emitter[key];
}

/*
Expand Down Expand Up @@ -550,6 +542,7 @@ function unified() {

if (!err) {
processor.emit('data', file.contents);
processor.emit('end');
} else {
// Don’t enter an infinite error throwing loop.
global.setTimeout(function () {
Expand All @@ -564,20 +557,80 @@ function unified() {
/**
* Pipe the processor into a writable stream.
*
* Aside from setting options, all arguments
* are passed to `Stream#pipe`.
* Basically `Stream#pipe`, but inlined and
* simplified to keep the bundled size down.
*
* @see https://github.com/nodejs/node/blob/master/lib/stream.js#L26
*
* @param {Stream} dest - Writable stream.
* @param {Object?} [options] - Processing
* configuration.
* @return {Stream} - The destination stream.
*/
function pipe(dest, options) {
var onend = once(function () {
dest.end();
});

assertConcrete('pipe');

settings = options;
settings = options || {};

/**
* Handle data.
*
* @param {*} chunk - Data to pass through.
*/
function ondata(chunk) {
if (dest.writable) {
dest.write(chunk);
}
}

/**
* Clean listeners.
*/
function cleanup() {
processor.removeListener('data', ondata);
processor.removeListener('end', onend);
processor.removeListener('error', onerror);
processor.removeListener('end', cleanup);
processor.removeListener('close', cleanup);

dest.removeListener('error', onerror);
dest.removeListener('close', cleanup);
}

/**
* Close dangling pipes and handle unheard errors.
*
* @param {Error} err - Exception.
*/
function onerror(err) {
cleanup();

if (!processor.listenerCount('error')) {
throw err; // Unhandled stream error in pipe.
}
}

processor.on('data', ondata);
processor.on('error', onerror);
processor.on('end', cleanup);
processor.on('close', cleanup);

// If the 'end' option is not supplied, dest.end() will be called when
// the 'end' or 'close' events are received. Only dest.end() once.
if (!dest._isStdio && settings.end !== false) {
processor.on('end', onend);
}

dest.on('error', onerror);
dest.on('close', cleanup);

dest.emit('pipe', processor);

return originalPipe.apply(this, arguments);
return dest;
}

/*
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
"dependencies": {
"bail": "^1.0.0",
"extend": "^3.0.0",
"once": "^1.3.3",
"vfile": "^1.0.0"
},
"browser": {
Expand Down
79 changes: 77 additions & 2 deletions test.js
Original file line number Diff line number Diff line change
Expand Up @@ -1285,8 +1285,6 @@ test('unified', function (t) {
p
.use(function (processor) {
processor.Parser = SimpleParser;
})
.use(function (processor) {
processor.Compiler = SimpleCompiler;
})
.on('data', function (value) {
Expand All @@ -1309,6 +1307,75 @@ test('unified', function (t) {

t.test('pipe(destination[, options])', function (st) {
var p;
var s;

p = unified().use(function (processor) {
processor.Parser = SimpleParser;
processor.Compiler = SimpleCompiler;
});

/* Not writable. */
p.pipe(new stream.Readable());

st.doesNotThrow(
function () {
p.end('foo');
},
'should not throw when piping to a non-writable stream'
);

p = unified().use(function (processor) {
processor.Parser = SimpleParser;
processor.Compiler = SimpleCompiler;
});

s = new stream.PassThrough();
s._isStdio = true;

p.pipe(s);

p.write('alpha');
p.write('bravo');
p.end('charlie');

st.doesNotThrow(
function () {
s.write('delta')
},
'should not `end` stdio streams'
);

p = unified()
.use(function (processor) {
processor.Parser = SimpleParser;
processor.Compiler = SimpleCompiler;
})
.on('error', function (err) {
st.equal(
err.message,
'Whoops!',
'should pass errors'
);
});

p.pipe(new PassThrough());
p.emit('error', new Error('Whoops!'));

p = unified()
.use(function (processor) {
processor.Parser = SimpleParser;
processor.Compiler = SimpleCompiler;
});

p.pipe(new PassThrough());

st.throws(
function () {
p.emit('error', new Error('Whoops!'));
},
/Whoops!/,
'should throw if errors are not listened to'
);

p = unified();

Expand Down Expand Up @@ -1360,6 +1427,14 @@ test('unified', function (t) {

p.end('delta');

st.throws(
function () {
p.end('foxtrot');
},
/Did not expect `write` after `end`/,
'should throw on write after end'
);

st.end();
});

Expand Down

0 comments on commit 712474f

Please sign in to comment.