diff --git a/lib/unified.js b/lib/unified.js index 2c5d41fa..f045444e 100644 --- a/lib/unified.js +++ b/lib/unified.js @@ -15,7 +15,7 @@ */ var events = require('events'); -var originalPipe = require('stream').Stream.prototype.pipe; +var once = require('once'); var extend = require('extend'); var bail = require('bail'); var vfile = require('vfile'); @@ -142,20 +142,12 @@ function unified() { return destination; } - /* - * Initialise events. - */ - - events.init.call(processor); - /* * Mix in methods. */ for (key in emitter) { - if (isFunction(emitter[key])) { - processor[key] = emitter[key]; - } + processor[key] = emitter[key]; } /* @@ -550,6 +542,7 @@ function unified() { if (!err) { processor.emit('data', file.contents); + processor.emit('end'); } else { // Don’t enter an infinite error throwing loop. global.setTimeout(function () { @@ -564,8 +557,10 @@ function unified() { /** * Pipe the processor into a writable stream. * - * Aside from setting options, all arguments - * are passed to `Stream#pipe`. + * Basically `Stream#pipe`, but inlined and + * simplified to keep the bundled size down. + * + * @see https://github.com/nodejs/node/blob/master/lib/stream.js#L26 * * @param {Stream} dest - Writable stream. * @param {Object?} [options] - Processing @@ -573,11 +568,69 @@ function unified() { * @return {Stream} - The destination stream. */ function pipe(dest, options) { + var onend = once(function () { + dest.end(); + }); + assertConcrete('pipe'); - settings = options; + settings = options || {}; + + /** + * Handle data. + * + * @param {*} chunk - Data to pass through. + */ + function ondata(chunk) { + if (dest.writable) { + dest.write(chunk); + } + } + + /** + * Clean listeners. + */ + function cleanup() { + processor.removeListener('data', ondata); + processor.removeListener('end', onend); + processor.removeListener('error', onerror); + processor.removeListener('end', cleanup); + processor.removeListener('close', cleanup); + + dest.removeListener('error', onerror); + dest.removeListener('close', cleanup); + } + + /** + * Close dangling pipes and handle unheard errors. + * + * @param {Error} err - Exception. + */ + function onerror(err) { + cleanup(); + + if (!processor.listenerCount('error')) { + throw err; // Unhandled stream error in pipe. + } + } + + processor.on('data', ondata); + processor.on('error', onerror); + processor.on('end', cleanup); + processor.on('close', cleanup); + + // If the 'end' option is not supplied, dest.end() will be called when + // the 'end' or 'close' events are received. Only dest.end() once. + if (!dest._isStdio && settings.end !== false) { + processor.on('end', onend); + } + + dest.on('error', onerror); + dest.on('close', cleanup); + + dest.emit('pipe', processor); - return originalPipe.apply(this, arguments); + return dest; } /* diff --git a/package.json b/package.json index df4d86f2..8886b524 100644 --- a/package.json +++ b/package.json @@ -16,6 +16,7 @@ "dependencies": { "bail": "^1.0.0", "extend": "^3.0.0", + "once": "^1.3.3", "vfile": "^1.0.0" }, "browser": { diff --git a/test.js b/test.js index caef57c8..fd49a9b7 100644 --- a/test.js +++ b/test.js @@ -1285,8 +1285,6 @@ test('unified', function (t) { p .use(function (processor) { processor.Parser = SimpleParser; - }) - .use(function (processor) { processor.Compiler = SimpleCompiler; }) .on('data', function (value) { @@ -1309,6 +1307,75 @@ test('unified', function (t) { t.test('pipe(destination[, options])', function (st) { var p; + var s; + + p = unified().use(function (processor) { + processor.Parser = SimpleParser; + processor.Compiler = SimpleCompiler; + }); + + /* Not writable. */ + p.pipe(new stream.Readable()); + + st.doesNotThrow( + function () { + p.end('foo'); + }, + 'should not throw when piping to a non-writable stream' + ); + + p = unified().use(function (processor) { + processor.Parser = SimpleParser; + processor.Compiler = SimpleCompiler; + }); + + s = new stream.PassThrough(); + s._isStdio = true; + + p.pipe(s); + + p.write('alpha'); + p.write('bravo'); + p.end('charlie'); + + st.doesNotThrow( + function () { + s.write('delta') + }, + 'should not `end` stdio streams' + ); + + p = unified() + .use(function (processor) { + processor.Parser = SimpleParser; + processor.Compiler = SimpleCompiler; + }) + .on('error', function (err) { + st.equal( + err.message, + 'Whoops!', + 'should pass errors' + ); + }); + + p.pipe(new PassThrough()); + p.emit('error', new Error('Whoops!')); + + p = unified() + .use(function (processor) { + processor.Parser = SimpleParser; + processor.Compiler = SimpleCompiler; + }); + + p.pipe(new PassThrough()); + + st.throws( + function () { + p.emit('error', new Error('Whoops!')); + }, + /Whoops!/, + 'should throw if errors are not listened to' + ); p = unified(); @@ -1360,6 +1427,14 @@ test('unified', function (t) { p.end('delta'); + st.throws( + function () { + p.end('foxtrot'); + }, + /Did not expect `write` after `end`/, + 'should throw on write after end' + ); + st.end(); });