Skip to content

Commit

Permalink
fs: add flush option to createWriteStream()
Browse files Browse the repository at this point in the history
This commit adds a 'flush' option to the createWriteStream()
family of functions.

Refs: #49886
PR-URL: #50093
Reviewed-By: Yagiz Nizipli <[email protected]>
Reviewed-By: Matteo Collina <[email protected]>
Reviewed-By: Marco Ippolito <[email protected]>
  • Loading branch information
cjihrig authored Oct 11, 2023
1 parent f23a935 commit 8a49735
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 4 deletions.
11 changes: 11 additions & 0 deletions doc/api/fs.md
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,10 @@ fd.createReadStream({ start: 90, end: 99 });
<!-- YAML
added: v16.11.0
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/50093
description: The `flush` option is now supported.
-->
* `options` {Object}
Expand All @@ -326,6 +330,8 @@ added: v16.11.0
* `emitClose` {boolean} **Default:** `true`
* `start` {integer}
* `highWaterMark` {number} **Default:** `16384`
* `flush` {boolean} If `true`, the underlying file descriptor is flushed
prior to closing it. **Default:** `false`.
* Returns: {fs.WriteStream}
`options` may also include a `start` option to allow writing data at some
Expand Down Expand Up @@ -2540,6 +2546,9 @@ If `options` is a string, then it specifies the encoding.
<!-- YAML
added: v0.1.31
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/50093
description: The `flush` option is now supported.
- version: v16.10.0
pr-url: https://github.com/nodejs/node/pull/40013
description: The `fs` option does not need `open` method if an `fd` was provided.
Expand Down Expand Up @@ -2593,6 +2602,8 @@ changes:
* `fs` {Object|null} **Default:** `null`
* `signal` {AbortSignal|null} **Default:** `null`
* `highWaterMark` {number} **Default:** `16384`
* `flush` {boolean} If `true`, the underlying file descriptor is flushed
prior to closing it. **Default:** `false`.
* Returns: {fs.WriteStream}
`options` may also include a `start` option to allow writing data at some
Expand Down
1 change: 1 addition & 0 deletions lib/fs.js
Original file line number Diff line number Diff line change
Expand Up @@ -3108,6 +3108,7 @@ function createReadStream(path, options) {
* emitClose?: boolean;
* start: number;
* fs?: object | null;
* flush?: boolean;
* }} [options]
* @returns {WriteStream}
*/
Expand Down
1 change: 1 addition & 0 deletions lib/internal/fs/promises.js
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,7 @@ class FileHandle extends EventEmitter {
* autoClose?: boolean;
* emitClose?: boolean;
* start: number;
* flush?: boolean;
* }} [options]
* @returns {WriteStream}
*/
Expand Down
28 changes: 24 additions & 4 deletions lib/internal/fs/streams.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ const {
kEmptyObject,
} = require('internal/util');
const {
validateBoolean,
validateFunction,
validateInteger,
} = require('internal/validators');
Expand Down Expand Up @@ -92,6 +93,9 @@ const FileHandleOperations = (handle) => {
PromisePrototypeThen(handle.close(),
() => cb(), cb);
},
fsync: (fd, cb) => {
PromisePrototypeThen(handle.sync(), () => cb(), cb);
},
read: (fd, buf, offset, length, pos, cb) => {
PromisePrototypeThen(handle.read(buf, offset, length, pos),
(r) => cb(null, r.bytesRead, r.buffer),
Expand All @@ -113,14 +117,22 @@ const FileHandleOperations = (handle) => {
function close(stream, err, cb) {
if (!stream.fd) {
cb(err);
} else {
stream[kFs].close(stream.fd, (er) => {
cb(er || err);
} else if (stream.flush) {
stream[kFs].fsync(stream.fd, (flushErr) => {
_close(stream, err || flushErr, cb);
});
stream.fd = null;
} else {
_close(stream, err, cb);
}
}

function _close(stream, err, cb) {
stream[kFs].close(stream.fd, (er) => {
cb(er || err);
});
stream.fd = null;
}

function importFd(stream, options) {
if (typeof options.fd === 'number') {
// When fd is a raw descriptor, we must keep our fingers crossed
Expand Down Expand Up @@ -350,6 +362,14 @@ function WriteStream(path, options) {
validateFunction(this[kFs].close, 'options.fs.close');
}

this.flush = options.flush;
if (this.flush == null) {
this.flush = false;
} else {
validateBoolean(this.flush, 'options.flush');
validateFunction(this[kFs].fsync, 'options.fs.fsync');
}

// It's enough to override either, in which case only one will be used.
if (!this[kFs].write) {
this._write = null;
Expand Down
81 changes: 81 additions & 0 deletions test/parallel/test-fs-write-stream-flush.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
'use strict';
const common = require('../common');
const tmpdir = require('../common/tmpdir');
const assert = require('node:assert');
const fs = require('node:fs');
const fsp = require('node:fs/promises');
const test = require('node:test');
const data = 'foo';
let cnt = 0;

function nextFile() {
return tmpdir.resolve(`${cnt++}.out`);
}

tmpdir.refresh();

test('validation', () => {
for (const flush of ['true', '', 0, 1, [], {}, Symbol()]) {
assert.throws(() => {
fs.createWriteStream(nextFile(), { flush });
}, { code: 'ERR_INVALID_ARG_TYPE' });
}
});

test('performs flush', (t, done) => {
const spy = t.mock.method(fs, 'fsync');
const file = nextFile();
const stream = fs.createWriteStream(file, { flush: true });

stream.write(data, common.mustSucceed(() => {
stream.close(common.mustSucceed(() => {
const calls = spy.mock.calls;
assert.strictEqual(calls.length, 1);
assert.strictEqual(calls[0].result, undefined);
assert.strictEqual(calls[0].error, undefined);
assert.strictEqual(calls[0].arguments.length, 2);
assert.strictEqual(typeof calls[0].arguments[0], 'number');
assert.strictEqual(typeof calls[0].arguments[1], 'function');
assert.strictEqual(fs.readFileSync(file, 'utf8'), data);
done();
}));
}));
});

test('does not perform flush', (t, done) => {
const values = [undefined, null, false];
const spy = t.mock.method(fs, 'fsync');
let cnt = 0;

for (const flush of values) {
const file = nextFile();
const stream = fs.createWriteStream(file, { flush });

stream.write(data, common.mustSucceed(() => {
stream.close(common.mustSucceed(() => {
assert.strictEqual(fs.readFileSync(file, 'utf8'), data);
cnt++;

if (cnt === values.length) {
assert.strictEqual(spy.mock.calls.length, 0);
done();
}
}));
}));
}
});

test('works with file handles', async () => {
const file = nextFile();
const handle = await fsp.open(file, 'w');
const stream = handle.createWriteStream({ flush: true });

return new Promise((resolve) => {
stream.write(data, common.mustSucceed(() => {
stream.close(common.mustSucceed(() => {
assert.strictEqual(fs.readFileSync(file, 'utf8'), data);
resolve();
}));
}));
});
});

0 comments on commit 8a49735

Please sign in to comment.