From f84b4160b12db81ef0e87ead87d3ced441b9d4f2 Mon Sep 17 00:00:00 2001 From: Brian White Date: Thu, 10 Jan 2019 15:52:27 -0500 Subject: [PATCH] net: allow reading data into a static buffer --- benchmark/net/net-s2c.js | 43 ++++++++++---- lib/internal/stream_base_commons.js | 17 +++++- lib/net.js | 88 +++++++++++++++++------------ src/connection_wrap.cc | 23 ++++++++ src/connection_wrap.h | 4 ++ src/pipe_wrap.cc | 20 ++++++- src/pipe_wrap.h | 5 ++ src/stream_base-inl.h | 12 ++++ src/stream_base.cc | 39 ++++++++----- src/stream_base.h | 3 + src/stream_wrap.cc | 13 +++++ src/stream_wrap.h | 5 ++ src/tcp_wrap.cc | 17 +++++- src/tcp_wrap.h | 2 + 14 files changed, 225 insertions(+), 66 deletions(-) diff --git a/benchmark/net/net-s2c.js b/benchmark/net/net-s2c.js index 6ee5afa663aaca..78b01f810bbe83 100644 --- a/benchmark/net/net-s2c.js +++ b/benchmark/net/net-s2c.js @@ -5,33 +5,54 @@ const common = require('../common.js'); const PORT = common.PORT; const bench = common.createBenchmark(main, { - len: [64, 102400, 1024 * 1024 * 16], + sendchunklen: [256, 32 * 1024, 128 * 1024, 16 * 1024 * 1024], type: ['utf', 'asc', 'buf'], + recvbuflen: [0, 64 * 1024, 1024 * 1024], dur: [5] }); var chunk; var encoding; +var recvbuf; +var received = 0; + +function main({ dur, sendchunklen, type, recvbuflen }) { + if (isFinite(recvbuflen) && recvbuflen > 0) + recvbuf = Buffer.alloc(recvbuflen); -function main({ dur, len, type }) { switch (type) { case 'buf': - chunk = Buffer.alloc(len, 'x'); + chunk = Buffer.alloc(sendchunklen, 'x'); break; case 'utf': encoding = 'utf8'; - chunk = 'ü'.repeat(len / 2); + chunk = 'ü'.repeat(sendchunklen / 2); break; case 'asc': encoding = 'ascii'; - chunk = 'x'.repeat(len); + chunk = 'x'.repeat(sendchunklen); break; default: throw new Error(`invalid type: ${type}`); } const reader = new Reader(); - const writer = new Writer(); + var writer; + var socketOpts; + if (recvbuf === undefined) { + writer = new Writer(); + socketOpts = { port: PORT }; + } else { + socketOpts = { + port: PORT, + onread: { + buffer: recvbuf, + callback: function(nread, buf) { + received += nread; + } + } + }; + } // the actual benchmark. const server = net.createServer(function(socket) { @@ -39,14 +60,15 @@ function main({ dur, len, type }) { }); server.listen(PORT, function() { - const socket = net.connect(PORT); + const socket = net.connect(socketOpts); socket.on('connect', function() { bench.start(); - socket.pipe(writer); + if (recvbuf === undefined) + socket.pipe(writer); setTimeout(function() { - const bytes = writer.received; + const bytes = received; const gbits = (bytes * 8) / (1024 * 1024 * 1024); bench.end(gbits); process.exit(0); @@ -58,12 +80,11 @@ function main({ dur, len, type }) { const net = require('net'); function Writer() { - this.received = 0; this.writable = true; } Writer.prototype.write = function(chunk, encoding, cb) { - this.received += chunk.length; + received += chunk.length; if (typeof encoding === 'function') encoding(); diff --git a/lib/internal/stream_base_commons.js b/lib/internal/stream_base_commons.js index 31291e751d57a6..ccace4c6fb0193 100644 --- a/lib/internal/stream_base_commons.js +++ b/lib/internal/stream_base_commons.js @@ -17,6 +17,8 @@ const { owner_symbol } = require('internal/async_hooks').symbols; const kMaybeDestroy = Symbol('kMaybeDestroy'); const kUpdateTimer = Symbol('kUpdateTimer'); const kAfterAsyncWrite = Symbol('kAfterAsyncWrite'); +const kBuffer = Symbol('kBuffer'); +const kBufferCb = Symbol('kBufferCb'); function handleWriteReq(req, data, encoding) { const { handle } = req; @@ -140,9 +142,16 @@ function onStreamRead(arrayBuffer) { stream[kUpdateTimer](); if (nread > 0 && !stream.destroyed) { - const offset = streamBaseState[kArrayBufferOffset]; - const buf = new FastBuffer(arrayBuffer, offset, nread); - if (!stream.push(buf)) { + var ret; + const userBuf = stream[kBuffer]; + if (userBuf) { + ret = (stream[kBufferCb](nread, userBuf) !== false); + } else { + const offset = streamBaseState[kArrayBufferOffset]; + const buf = new FastBuffer(arrayBuffer, offset, nread); + ret = stream.push(buf); + } + if (!ret) { handle.reading = false; if (!stream.destroyed) { const err = handle.readStop(); @@ -186,4 +195,6 @@ module.exports = { kAfterAsyncWrite, kMaybeDestroy, kUpdateTimer, + kBuffer, + kBufferCb, }; diff --git a/lib/net.js b/lib/net.js index 7680c8860e1521..7e3f4abcd7d841 100644 --- a/lib/net.js +++ b/lib/net.js @@ -63,7 +63,9 @@ const { writeGeneric, onStreamRead, kAfterAsyncWrite, - kUpdateTimer + kUpdateTimer, + kBuffer, + kBufferCb } = require('internal/stream_base_commons'); const { codes: { @@ -101,18 +103,20 @@ function getFlags(ipv6Only) { return ipv6Only === true ? TCPConstants.UV_TCP_IPV6ONLY : 0; } -function createHandle(fd, is_server) { +function createHandle(fd, is_server, buf) { validateInt32(fd, 'fd', 0); const type = TTYWrap.guessHandleType(fd); if (type === 'PIPE') { return new Pipe( - is_server ? PipeConstants.SERVER : PipeConstants.SOCKET + is_server ? PipeConstants.SERVER : PipeConstants.SOCKET, + buf ); } if (type === 'TCP') { return new TCP( - is_server ? TCPConstants.SERVER : TCPConstants.SOCKET + is_server ? TCPConstants.SERVER : TCPConstants.SOCKET, + buf ); } @@ -241,6 +245,8 @@ function Socket(options) { this._host = null; this[kLastWriteQueueSize] = 0; this[kTimeout] = null; + this[kBuffer] = null; + this[kBufferCb] = null; if (typeof options === 'number') options = { fd: options }; // Legacy interface. @@ -265,40 +271,50 @@ function Socket(options) { if (options.handle) { this._handle = options.handle; // private this[async_id_symbol] = getNewAsyncId(this._handle); - } else if (options.fd !== undefined) { - const { fd } = options; - let err; - - // createHandle will throw ERR_INVALID_FD_TYPE if `fd` is not - // a valid `PIPE` or `TCP` descriptor - this._handle = createHandle(fd, false); - - err = this._handle.open(fd); + } else { + const onread = options.onread; + if (onread !== null && typeof onread === 'object' && + Buffer.isBuffer(onread.buffer) && + typeof onread.callback === 'function') { + this[kBuffer] = onread.buffer; + this[kBufferCb] = onread.callback; + } + if (options.fd !== undefined) { + const { fd } = options; + let err; - // While difficult to fabricate, in some architectures - // `open` may return an error code for valid file descriptors - // which cannot be opened. This is difficult to test as most - // un-openable fds will throw on `createHandle` - if (err) - throw errnoException(err, 'open'); + // createHandle will throw ERR_INVALID_FD_TYPE if `fd` is not + // a valid `PIPE` or `TCP` descriptor + this._handle = createHandle(fd, false); - this[async_id_symbol] = this._handle.getAsyncId(); + err = this._handle.open(fd); - if ((fd === 1 || fd === 2) && - (this._handle instanceof Pipe) && - process.platform === 'win32') { - // Make stdout and stderr blocking on Windows - err = this._handle.setBlocking(true); + // While difficult to fabricate, in some architectures + // `open` may return an error code for valid file descriptors + // which cannot be opened. This is difficult to test as most + // un-openable fds will throw on `createHandle` if (err) - throw errnoException(err, 'setBlocking'); - - this._writev = null; - this._write = makeSyncWrite(fd); - // makeSyncWrite adjusts this value like the original handle would, so - // we need to let it do that by turning it into a writable, own property. - Object.defineProperty(this._handle, 'bytesWritten', { - value: 0, writable: true - }); + throw errnoException(err, 'open'); + + this[async_id_symbol] = this._handle.getAsyncId(); + + if ((fd === 1 || fd === 2) && + (this._handle instanceof Pipe) && + process.platform === 'win32') { + // Make stdout and stderr blocking on Windows + err = this._handle.setBlocking(true); + if (err) + throw errnoException(err, 'setBlocking'); + + this._writev = null; + this._write = makeSyncWrite(fd); + // makeSyncWrite adjusts this value like the original handle would, so + // we need to let it do that by turning it into a writable, own + // property. + Object.defineProperty(this._handle, 'bytesWritten', { + value: 0, writable: true + }); + } } } @@ -888,8 +904,8 @@ Socket.prototype.connect = function(...args) { if (!this._handle) { this._handle = pipe ? - new Pipe(PipeConstants.SOCKET) : - new TCP(TCPConstants.SOCKET); + new Pipe(PipeConstants.SOCKET, this[kBuffer]) : + new TCP(TCPConstants.SOCKET, this[kBuffer]); initSocketHandle(this); } diff --git a/src/connection_wrap.cc b/src/connection_wrap.cc index db239f9becdb53..ef626a65d68ce2 100644 --- a/src/connection_wrap.cc +++ b/src/connection_wrap.cc @@ -29,6 +29,17 @@ ConnectionWrap::ConnectionWrap(Environment* env, reinterpret_cast(&handle_), provider) {} +template +ConnectionWrap::ConnectionWrap(Environment* env, + Local object, + ProviderType provider, + uv_buf_t buf) + : LibuvStreamWrap(env, + object, + reinterpret_cast(&handle_), + provider, + buf) {} + template void ConnectionWrap::OnConnection(uv_stream_t* handle, @@ -116,11 +127,23 @@ template ConnectionWrap::ConnectionWrap( Local object, ProviderType provider); +template ConnectionWrap::ConnectionWrap( + Environment* env, + Local object, + ProviderType provider, + uv_buf_t buf); + template ConnectionWrap::ConnectionWrap( Environment* env, Local object, ProviderType provider); +template ConnectionWrap::ConnectionWrap( + Environment* env, + Local object, + ProviderType provider, + uv_buf_t buf); + template void ConnectionWrap::OnConnection( uv_stream_t* handle, int status); diff --git a/src/connection_wrap.h b/src/connection_wrap.h index 5b114088760dad..399b7b45721652 100644 --- a/src/connection_wrap.h +++ b/src/connection_wrap.h @@ -19,6 +19,10 @@ class ConnectionWrap : public LibuvStreamWrap { ConnectionWrap(Environment* env, v8::Local object, ProviderType provider); + ConnectionWrap(Environment* env, + v8::Local object, + ProviderType provider, + uv_buf_t buf); UVType handle_; }; diff --git a/src/pipe_wrap.cc b/src/pipe_wrap.cc index 6259cbdd1918dd..30c37aff1a79d5 100644 --- a/src/pipe_wrap.cc +++ b/src/pipe_wrap.cc @@ -149,7 +149,14 @@ void PipeWrap::New(const FunctionCallbackInfo& args) { UNREACHABLE(); } - new PipeWrap(env, args.This(), provider, ipc); + if (args.Length() > 1 && Buffer::HasInstance(args[1])) { + uv_buf_t buf; + buf.base = Buffer::Data(args[1]); + buf.len = Buffer::Length(args[1]); + new PipeWrap(env, args.This(), provider, ipc, buf); + } else { + new PipeWrap(env, args.This(), provider, ipc); + } } @@ -163,6 +170,17 @@ PipeWrap::PipeWrap(Environment* env, // Suggestion: uv_pipe_init() returns void. } +PipeWrap::PipeWrap(Environment* env, + Local object, + ProviderType provider, + bool ipc, + uv_buf_t buf) + : ConnectionWrap(env, object, provider, buf) { + int r = uv_pipe_init(env->event_loop(), &handle_, ipc); + CHECK_EQ(r, 0); // How do we proxy this error up to javascript? + // Suggestion: uv_pipe_init() returns void. +} + void PipeWrap::Bind(const FunctionCallbackInfo& args) { PipeWrap* wrap; diff --git a/src/pipe_wrap.h b/src/pipe_wrap.h index b98d850439f0f4..139dbdc3bac42c 100644 --- a/src/pipe_wrap.h +++ b/src/pipe_wrap.h @@ -55,6 +55,11 @@ class PipeWrap : public ConnectionWrap { v8::Local object, ProviderType provider, bool ipc); + PipeWrap(Environment* env, + v8::Local object, + ProviderType provider, + bool ipc, + uv_buf_t buf); static void New(const v8::FunctionCallbackInfo& args); static void Bind(const v8::FunctionCallbackInfo& args); diff --git a/src/stream_base-inl.h b/src/stream_base-inl.h index 7e2bbaa1730f2e..4abd28af9d17ad 100644 --- a/src/stream_base-inl.h +++ b/src/stream_base-inl.h @@ -150,6 +150,14 @@ inline void StreamResource::EmitWantsWrite(size_t suggested_size) { } inline StreamBase::StreamBase(Environment* env) : env_(env) { + buf_.base = nullptr; + buf_.len = 0; + PushStreamListener(&default_listener_); +} + +inline StreamBase::StreamBase(Environment* env, uv_buf_t buf) + : env_(env), + buf_(buf) { PushStreamListener(&default_listener_); } @@ -157,6 +165,10 @@ inline Environment* StreamBase::stream_env() const { return env_; } +inline uv_buf_t StreamBase::stream_buf() const { + return buf_; +} + inline int StreamBase::Shutdown(v8::Local req_wrap_obj) { Environment* env = stream_env(); diff --git a/src/stream_base.cc b/src/stream_base.cc index 739964eb85a762..cbf362f6870e5f 100644 --- a/src/stream_base.cc +++ b/src/stream_base.cc @@ -295,7 +295,7 @@ void StreamBase::CallJSOnreadMethod(ssize_t nread, DCHECK_EQ(static_cast(nread), nread); DCHECK_LE(offset, INT32_MAX); - if (ab.IsEmpty()) { + if (ab.IsEmpty() && buf_.base == nullptr) { DCHECK_EQ(offset, 0); DCHECK_LE(nread, 0); } else { @@ -347,7 +347,12 @@ void StreamResource::ClearError() { uv_buf_t StreamListener::OnStreamAlloc(size_t suggested_size) { - return uv_buf_init(Malloc(suggested_size), suggested_size); + StreamBase* stream = static_cast(stream_); + const uv_buf_t stream_buf = stream->stream_buf(); + if (stream_buf.base != nullptr) + return stream_buf; + else + return uv_buf_init(Malloc(suggested_size), suggested_size); } @@ -357,22 +362,28 @@ void EmitToJSStreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf) { Environment* env = stream->stream_env(); HandleScope handle_scope(env->isolate()); Context::Scope context_scope(env->context()); + const uv_buf_t stream_buf = stream->stream_buf(); + Local obj; if (nread <= 0) { - free(buf.base); - if (nread < 0) - stream->CallJSOnreadMethod(nread, Local()); - return; - } + if (stream_buf.base == nullptr) + free(buf.base); + if (nread == 0) + return; + } else if (stream_buf.base != nullptr) { + CHECK_LE(static_cast(nread), stream_buf.len); + } else { + CHECK_LE(static_cast(nread), buf.len); - CHECK_LE(static_cast(nread), buf.len); - char* base = Realloc(buf.base, nread); + char* base = Realloc(buf.base, nread); - Local obj = ArrayBuffer::New( - env->isolate(), - base, - nread, - v8::ArrayBufferCreationMode::kInternalized); // Transfer ownership to V8. + obj = ArrayBuffer::New( + env->isolate(), + base, + nread, + // Transfer ownership to V8. + v8::ArrayBufferCreationMode::kInternalized); + } stream->CallJSOnreadMethod(nread, obj); } diff --git a/src/stream_base.h b/src/stream_base.h index 063c8714fd8eb8..a96751381beae0 100644 --- a/src/stream_base.h +++ b/src/stream_base.h @@ -271,6 +271,7 @@ class StreamBase : public StreamResource { // This is named `stream_env` to avoid name clashes, because a lot of // subclasses are also `BaseObject`s. Environment* stream_env() const; + uv_buf_t stream_buf() const; // Shut down the current stream. This request can use an existing // ShutdownWrap object (that was created in JS), or a new one will be created. @@ -301,6 +302,7 @@ class StreamBase : public StreamResource { protected: explicit StreamBase(Environment* env); + explicit StreamBase(Environment* env, uv_buf_t buf); // JS Methods int ReadStartJS(const v8::FunctionCallbackInfo& args); @@ -339,6 +341,7 @@ class StreamBase : public StreamResource { private: Environment* env_; + uv_buf_t buf_; EmitToJSStreamListener default_listener_; void SetWriteResult(const StreamWriteResult& res); diff --git a/src/stream_wrap.cc b/src/stream_wrap.cc index ae54b019fea034..ca66815a300480 100644 --- a/src/stream_wrap.cc +++ b/src/stream_wrap.cc @@ -106,6 +106,19 @@ LibuvStreamWrap::LibuvStreamWrap(Environment* env, stream_(stream) { } +LibuvStreamWrap::LibuvStreamWrap(Environment* env, + Local object, + uv_stream_t* stream, + AsyncWrap::ProviderType provider, + uv_buf_t buf) + : HandleWrap(env, + object, + reinterpret_cast(stream), + provider), + StreamBase(env, buf), + stream_(stream) { +} + Local LibuvStreamWrap::GetConstructorTemplate( Environment* env) { diff --git a/src/stream_wrap.h b/src/stream_wrap.h index 19366ff4fba2c4..638072639d263f 100644 --- a/src/stream_wrap.h +++ b/src/stream_wrap.h @@ -84,6 +84,11 @@ class LibuvStreamWrap : public HandleWrap, public StreamBase { v8::Local object, uv_stream_t* stream, AsyncWrap::ProviderType provider); + LibuvStreamWrap(Environment* env, + v8::Local object, + uv_stream_t* stream, + AsyncWrap::ProviderType provider, + uv_buf_t buf); AsyncWrap* GetAsyncWrap() override; diff --git a/src/tcp_wrap.cc b/src/tcp_wrap.cc index dac621ec879e5c..8712ac2ef5ce17 100644 --- a/src/tcp_wrap.cc +++ b/src/tcp_wrap.cc @@ -158,7 +158,14 @@ void TCPWrap::New(const FunctionCallbackInfo& args) { UNREACHABLE(); } - new TCPWrap(env, args.This(), provider); + if (args.Length() > 1 && Buffer::HasInstance(args[1])) { + uv_buf_t buf; + buf.base = Buffer::Data(args[1]); + buf.len = Buffer::Length(args[1]); + new TCPWrap(env, args.This(), provider, buf); + } else { + new TCPWrap(env, args.This(), provider); + } } @@ -169,6 +176,14 @@ TCPWrap::TCPWrap(Environment* env, Local object, ProviderType provider) // Suggestion: uv_tcp_init() returns void. } +TCPWrap::TCPWrap(Environment* env, Local object, ProviderType provider, + uv_buf_t buf) + : ConnectionWrap(env, object, provider, buf) { + int r = uv_tcp_init(env->event_loop(), &handle_); + CHECK_EQ(r, 0); // How do we proxy this error up to javascript? + // Suggestion: uv_tcp_init() returns void. +} + void TCPWrap::SetNoDelay(const FunctionCallbackInfo& args) { TCPWrap* wrap; diff --git a/src/tcp_wrap.h b/src/tcp_wrap.h index db269f65281639..3fc626d49857a7 100644 --- a/src/tcp_wrap.h +++ b/src/tcp_wrap.h @@ -67,6 +67,8 @@ class TCPWrap : public ConnectionWrap { TCPWrap(Environment* env, v8::Local object, ProviderType provider); + TCPWrap(Environment* env, v8::Local object, + ProviderType provider, uv_buf_t buf); static void New(const v8::FunctionCallbackInfo& args); static void SetNoDelay(const v8::FunctionCallbackInfo& args);