Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: mutable highwatermark #33346

19 changes: 19 additions & 0 deletions doc/api/errors.md
Original file line number Diff line number Diff line change
Expand Up @@ -1810,6 +1810,24 @@ pushed to the stream.
An attempt was made to call [`stream.unshift()`][] after the `'end'` event was
emitted.

<a id="ERR_STREAM_UPDATING_HIGHWATERMARK_IN_READ"></a>
### `ERR_STREAM_UPDATING_HIGHWATERMARK_IN_READ`
<!-- YAML
added: REPLACEME
-->

An attempt was made to update `readableHighWarkMark` from [`stream.read()`][]
function.

<a id="ERR_STREAM_UPDATING_HIGHWATERMARK_IN_WRITE"></a>
### `ERR_STREAM_UPDATING_HIGHWATERMARK_IN_WRITE`
<!-- YAML
added: REPLACEME
-->

An attempt was made to update `writableHighWaterMark` from [`stream.write()`][]
function.

<a id="ERR_STREAM_WRAP"></a>
### `ERR_STREAM_WRAP`

Expand Down Expand Up @@ -2569,6 +2587,7 @@ such as `process.stdout.on('data')`.
[`stream.push()`]: stream.html#stream_readable_push_chunk_encoding
[`stream.unshift()`]: stream.html#stream_readable_unshift_chunk_encoding
[`stream.write()`]: stream.html#stream_writable_write_chunk_encoding_callback
[`stream.read()`]: stream.html#stream_readable_read_size
[`subprocess.kill()`]: child_process.html#child_process_subprocess_kill_signal
[`subprocess.send()`]: child_process.html#child_process_subprocess_send_message_sendhandle_options_callback
[`util.getSystemErrorName(error.errno)`]: util.html#util_util_getsystemerrorname_err
Expand Down
12 changes: 6 additions & 6 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -540,13 +540,13 @@ Is set to `true` immediately before the [`'finish'`][] event is emitted.

##### `writable.writableHighWaterMark`
<!-- YAML
added: v9.3.0
added: REPLACEME
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This shouldn't be changed, instead add an additional metadata entry that mentions the setter ability.

-->

* {number}

Return the value of `highWaterMark` passed when constructing this
`Writable`.
Get or set the value of `highWaterMark` for a given `Writable` stream. Should
not update `writableHighWaterMark` from [`stream.write`][stream-write]

##### `writable.writableLength`
<!-- YAML
Expand Down Expand Up @@ -1183,13 +1183,13 @@ in the [Stream Three States][] section.

##### `readable.readableHighWaterMark`
<!-- YAML
added: v9.3.0
added: REPLACEME
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto here

-->

* {number}

Returns the value of `highWaterMark` passed when constructing this
`Readable`.
Get or set the value of `highWaterMark` for a given `Readable` stream. Should
not update `readableHighWaterMark` from [`stream.read()`][stream-read].

##### `readable.readableLength`
<!-- YAML
Expand Down
12 changes: 11 additions & 1 deletion lib/_stream_readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ const {
ERR_INVALID_ARG_TYPE,
ERR_STREAM_PUSH_AFTER_EOF,
ERR_METHOD_NOT_IMPLEMENTED,
ERR_STREAM_UNSHIFT_AFTER_END_EVENT
ERR_STREAM_UNSHIFT_AFTER_END_EVENT,
ERR_STREAM_UPDATING_HIGHWATERMARK_IN_READ,
} = require('internal/errors').codes;

const kPaused = Symbol('kPaused');
Expand Down Expand Up @@ -461,6 +462,7 @@ Readable.prototype.read = function(n) {
debug('length less than watermark', doRead);
}


// However, if we've ended, then there's no point, if we're already
// reading, then it's unnecessary, and if we're destroyed or errored,
// then it's not allowed.
Expand Down Expand Up @@ -1088,6 +1090,14 @@ ObjectDefineProperties(Readable.prototype, {
enumerable: false,
get: function() {
return this._readableState.highWaterMark;
},
set: function(val) {
const state = this._readableState;
if (state.reading) {
errorOrDestroy(this, new ERR_STREAM_UPDATING_HIGHWATERMARK_IN_READ());
return;
}
state.highWaterMark = val;
}
},

Expand Down
11 changes: 10 additions & 1 deletion lib/_stream_writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ const {
ERR_STREAM_ALREADY_FINISHED,
ERR_STREAM_NULL_VALUES,
ERR_STREAM_WRITE_AFTER_END,
ERR_UNKNOWN_ENCODING
ERR_UNKNOWN_ENCODING,
ERR_STREAM_UPDATING_HIGHWATERMARK_IN_WRITE
} = require('internal/errors').codes;

const { errorOrDestroy } = destroyImpl;
Expand Down Expand Up @@ -758,6 +759,14 @@ ObjectDefineProperties(Writable.prototype, {
writableHighWaterMark: {
get() {
return this._writableState && this._writableState.highWaterMark;
},
set(val) {
const state = this._writableState;
if (state.writing) {
errorOrDestroy(this, new ERR_STREAM_UPDATING_HIGHWATERMARK_IN_WRITE());
return;
}
state.highWaterMark = val;
}
},

Expand Down
4 changes: 4 additions & 0 deletions lib/internal/errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -1346,6 +1346,10 @@ E('ERR_STREAM_PREMATURE_CLOSE', 'Premature close', Error);
E('ERR_STREAM_PUSH_AFTER_EOF', 'stream.push() after EOF', Error);
E('ERR_STREAM_UNSHIFT_AFTER_END_EVENT',
'stream.unshift() after end event', Error);
E('ERR_STREAM_UPDATING_HIGHWATERMARK_IN_READ',
'Cannot update highwatermark while reading', Error);
E('ERR_STREAM_UPDATING_HIGHWATERMARK_IN_WRITE',
'Cannot update highwatermark while writing', Error);
E('ERR_STREAM_WRAP', 'Stream has StringDecoder set or is in objectMode', Error);
E('ERR_STREAM_WRITE_AFTER_END', 'write after end', Error);
E('ERR_SYNTHETIC', 'JavaScript Callstack', Error);
Expand Down
22 changes: 22 additions & 0 deletions test/parallel/test-stream-readable-hwm-error.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
'use strict';

const common = require('../common');
const assert = require('assert');
const stream = require('stream');

const rs = new stream.Readable({
read: function() {
this.readableHighWaterMark = 200;
this.push(null);
}
});

const ws = stream.Writable({ write() {} });

rs.on('error', common.mustCall((err) => {
assert.strictEqual(err.code, 'ERR_STREAM_UPDATING_HIGHWATERMARK_IN_READ');
assert.strictEqual(err.message, 'Cannot update highwatermark while reading');
}));
rs.on('end', common.mustNotCall());

rs.pipe(ws);
25 changes: 25 additions & 0 deletions test/parallel/test-stream-readable-hwm-mutable.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
'use strict';

const common = require('../common');
const assert = require('assert');
const stream = require('stream');

const currHighWaterMark = 20;
let pushes = 0;
const rs = new stream.Readable({
read: common.mustCall(function() {
(pushes++ < 100) ? this.push(Buffer.alloc(1024)) : this.push(null);
}, 101)
});

const ws = stream.Writable({
write: common.mustCall(function(data, enc, cb) {
setImmediate(cb);
}, 100)
});

assert.strictEqual(rs.readableHighWaterMark, 16384); // default HWM
rs.readableHighWaterMark = currHighWaterMark;
assert.strictEqual(rs.readableHighWaterMark, currHighWaterMark);

rs.pipe(ws);
24 changes: 24 additions & 0 deletions test/parallel/test-stream-writable-hwm-error.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
'use strict';

const common = require('../common');
const assert = require('assert');
const stream = require('stream');

const rs = new stream.Readable({
read: function() {
this.push(Buffer.alloc(1024));
this.push(null);
}
});

const ws = stream.Writable({ write() {
this.writableHighWaterMark = 20;
} });

ws.on('error', common.mustCall((err) => {
assert.strictEqual(err.code, 'ERR_STREAM_UPDATING_HIGHWATERMARK_IN_WRITE');
assert.strictEqual(err.message, 'Cannot update highwatermark while writing');
}));
ws.on('end', common.mustNotCall());

rs.pipe(ws);
25 changes: 25 additions & 0 deletions test/parallel/test-stream-writable-hwm-mutable.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
'use strict';

const common = require('../common');
const assert = require('assert');
const stream = require('stream');

let pushes = 0;
const rs = new stream.Readable({
read: common.mustCall(function() {
pushes++ < 100 ? this.push(Buffer.alloc(1024)) : this.push(null);
}, 101)
});

const currHighWaterMark = 0;
const ws = stream.Writable({
write: common.mustCall(function(_data, _enc, cb) {
setImmediate(cb);
}, 100)
});

assert.strictEqual(ws.writableHighWaterMark, 16384); // default HWM
ws.writableHighWaterMark = currHighWaterMark;
assert.strictEqual(ws.writableHighWaterMark, currHighWaterMark);

rs.pipe(ws);