From aa99a6ada86b96ff589dcf65b06183b802a77888 Mon Sep 17 00:00:00 2001 From: James M Snell Date: Wed, 22 May 2019 12:17:21 -0700 Subject: [PATCH] quic: fixes and improvements PR-URL: https://github.com/nodejs/quic/pull/31 --- lib/internal/quic/core.js | 29 +++- lib/internal/stream_base_commons.js | 4 +- src/node_quic_session.cc | 9 +- src/node_quic_stream.cc | 118 +++++++++++------ src/node_quic_stream.h | 162 +++++++++++++++++++---- test/parallel/test-quic-client-server.js | 9 +- 6 files changed, 256 insertions(+), 75 deletions(-) diff --git a/lib/internal/quic/core.js b/lib/internal/quic/core.js index 7a640de229..26c2ea6182 100644 --- a/lib/internal/quic/core.js +++ b/lib/internal/quic/core.js @@ -1348,10 +1348,34 @@ class QuicStream extends Duplex { this[kHandle] = handle; this.#id = id; this.#session = session; + this._readableState.readingMore = true; + + // See src/node_quic_stream.h for an explanation + // of the initial states for unidirectional streams. + if (this.unidirectional) { + if (session instanceof QuicServerSession) { + if (this.serverInitiated) { + // Close the readable side + this.push(null); + this.read(); + } else { + // Close the writable side + this.end(); + } + } else { + if (this.serverInitiated) { + // Close the writable side + this.end(); + } else { + this.push(null); + this.read(); + } + } + } } get serverInitiated() { - return this.#id & 0b01; + return !!(this.#id & 0b01); } get clientInitiated() { @@ -1359,7 +1383,7 @@ class QuicStream extends Duplex { } get unidirectional() { - return this.#id & 0b10; + return !!(this.#id & 0b10); } get bidirectional() { @@ -1372,6 +1396,7 @@ class QuicStream extends Duplex { // remaining within the duplex writable side queue. this.end(); this.push(null); + this.read(); process.nextTick(emit.bind(this, 'reset', finalSize, appErrorCode)); // TODO(@jasnell): Should we destroy here? It's not yet clear // what else should be done diff --git a/lib/internal/stream_base_commons.js b/lib/internal/stream_base_commons.js index cc0c0fbd28..74a21326db 100644 --- a/lib/internal/stream_base_commons.js +++ b/lib/internal/stream_base_commons.js @@ -143,7 +143,9 @@ function afterWriteDispatched(self, req, err) { req.async = !!streamBaseState[kLastWriteWasAsync]; if (err !== 0) - return self.destroy(errnoException(err, 'write', req.error), cb); + return self.destroy( + errnoException(err, 'write', req.error), + req.callback()); if (!req.async && typeof req.callback === 'function') { req.callback(); diff --git a/src/node_quic_session.cc b/src/node_quic_session.cc index 00435139ed..f87667af5b 100644 --- a/src/node_quic_session.cc +++ b/src/node_quic_session.cc @@ -1426,7 +1426,6 @@ int QuicSession::Send0RTTStreamData( ssize_t ndatalen = 0; std::vector vec; - uint8_t fin = stream->IsShutdown() ? 1 : 0; size_t count = stream->DrainInto(&vec, from); size_t c = count; ngtcp2_vec* v = vec.data(); @@ -1439,7 +1438,7 @@ int QuicSession::Send0RTTStreamData( max_pktlen_, &ndatalen, stream->GetID(), - fin, + stream->IsWritable() ? 0 : 1, reinterpret_cast(v), c, uv_hrtime()); @@ -1505,7 +1504,7 @@ int QuicSession::SendStreamData( max_pktlen_, &ndatalen, stream->GetID(), - stream->IsShutdown() ? 1 : 0, + stream->IsWritable() ? 0 : 1, reinterpret_cast(v), c, uv_hrtime()); @@ -1719,7 +1718,9 @@ void QuicSession::StopRetransmitTimer() { // Called by ngtcp2 when a stream has been closed. If the stream does // not exist, the close is ignored. void QuicSession::StreamClose(int64_t stream_id, uint16_t app_error_code) { - CHECK(!IsDestroyed()); + // Ignore if the session has already been destroyed + if (IsDestroyed()) + return; Debug(this, "Closing stream %llu with code %d", stream_id, app_error_code); QuicStream* stream = FindStream(stream_id); diff --git a/src/node_quic_stream.cc b/src/node_quic_stream.cc index 6ba9f39b64..1a9272c01c 100644 --- a/src/node_quic_stream.cc +++ b/src/node_quic_stream.cc @@ -60,9 +60,10 @@ QuicStream::QuicStream( StreamBase(session->env()), session_(session), stream_id_(stream_id), - flags_(QUIC_STREAM_FLAG_NONE), + flags_(QUICSTREAM_FLAG_INITIAL), available_outbound_length_(0) { CHECK_NOT_NULL(session); + SetInitialFlags(); session->AddStream(this); StreamBase::AttachToObject(GetObject()); PushStreamListener(&stream_listener_); @@ -74,14 +75,43 @@ QuicStream::~QuicStream() { CHECK_EQ(0, streambuf_.Length()); } +inline void QuicStream::SetInitialFlags() { + if (GetDirection() == QUIC_STREAM_UNIDIRECTIONAL) { + if (session_->IsServer()) { + switch (GetOrigin()) { + case QUIC_STREAM_SERVER: + SetReadClose(); + break; + case QUIC_STREAM_CLIENT: + SetWriteClose(); + break; + default: + UNREACHABLE(); + } + } else { + switch (GetOrigin()) { + case QUIC_STREAM_SERVER: + SetWriteClose(); + break; + case QUIC_STREAM_CLIENT: + SetReadClose(); + break; + default: + UNREACHABLE(); + } + } + } +} + // QuicStream::Close() is called by the QuicSession when ngtcp2 detects that // a stream has been closed. This, in turn, calls out to the JavaScript to // start the process of tearing down and destroying the QuicStream instance. void QuicStream::Close(uint16_t app_error_code) { Debug(this, "Stream %llu closed with code %d", GetID(), app_error_code); + SetReadClose(); + SetWriteClose(); HandleScope scope(env()->isolate()); Context::Scope context_context(env()->context()); - flags_ |= QUIC_STREAM_FLAG_CLOSED; Local arg = Number::New(env()->isolate(), app_error_code); MakeCallback(env()->quic_on_stream_close_function(), 1, &arg); } @@ -108,15 +138,23 @@ void QuicStream::Reset(uint64_t final_size, uint16_t app_error_code) { } void QuicStream::Destroy() { + SetReadClose(); + SetWriteClose(); streambuf_.Cancel(); session_->RemoveStream(stream_id_); session_ = nullptr; } +// Do shutdown is called when the JS stream writable side is closed. +// We want to mark the writable side closed and send pending data. int QuicStream::DoShutdown(ShutdownWrap* req_wrap) { if (IsDestroyed()) return UV_EPIPE; - flags_ |= QUIC_STREAM_FLAG_SHUT; + // Do nothing if the stream was already shutdown. Specifically, + // we should not attempt to send anything on the QuicSession + if (!IsWritable()) + return 1; + SetWriteClose(); session_->SendStreamData(this); return 1; } @@ -128,7 +166,9 @@ int QuicStream::DoWrite( uv_stream_t* send_handle) { CHECK_NULL(send_handle); - if (IsDestroyed()) { + // A write should not have happened if we've been destroyed or + // the QuicStream is no longer writable. + if (IsDestroyed() || !IsWritable()) { req_wrap->Done(UV_EOF); return 0; } @@ -174,23 +214,17 @@ int QuicStream::DoWrite( // to be careful not to allow the internal buffer to grow // too large, or we'll run into several other problems. - uint64_t len = streambuf_.Copy(bufs, nbufs); + streambuf_.Copy(bufs, nbufs); req_wrap->Done(0); + session_->SendStreamData(this); // IncrementAvailableOutboundLength(len); - session_->SendStreamData(this); return 0; } -uint64_t QuicStream::GetID() const { - return stream_id_; -} - -QuicSession* QuicStream::Session() { - return session_; -} - void QuicStream::AckedDataOffset(uint64_t offset, size_t datalen) { + if (IsDestroyed()) + return; streambuf_.Consume(datalen); } @@ -212,40 +246,34 @@ inline void QuicStream::DecrementAvailableOutboundLength(size_t amount) { available_outbound_length_ -= amount; } -QuicStream* QuicStream::New( - QuicSession* session, - uint64_t stream_id) { - Local obj; - if (!session->env() - ->quicserverstream_constructor_template() - ->NewInstance(session->env()->context()).ToLocal(&obj)) { - return nullptr; - } - return new QuicStream(session, obj, stream_id); -} - int QuicStream::ReadStart() { CHECK(!this->IsDestroyed()); - Debug(this, "Reading started."); - flags_ |= QUIC_STREAM_FLAG_READ_START; - flags_ &= ~QUIC_STREAM_FLAG_READ_PAUSED; + CHECK(IsReadable()); + SetReadStart(); + SetReadResume(); return 0; } int QuicStream::ReadStop() { CHECK(!this->IsDestroyed()); - if (!IsReading()) - return 0; - Debug(this, "Reading stopped"); - flags_ |= QUIC_STREAM_FLAG_READ_PAUSED; + CHECK(IsReadable()); + SetReadPause(); return 0; } // Passes chunks of data on to the JavaScript side as soon as they are -// received. The caller of this must have a HandleScope. +// received but only if we're still readable. The caller of this must have a +// HandleScope. +// TODO(@jasnell): There's currently no flow control here. The data is pushed +// up to the JavaScript side regardless of whether the JS stream is flowing and +// the connected peer is told to keep sending. We need to implement back +// pressure. void QuicStream::ReceiveData(int fin, const uint8_t* data, size_t datalen) { - Debug(this, "Receiving %d bytes of data. Final? %s", - datalen, fin ? "yes" : "no"); + Debug(this, "Receiving %d bytes of data. Final? %s. Readable? %s", + datalen, fin ? "yes" : "no", IsReadable() ? "yes" : "no"); + + if (!IsReadable()) + return; while (datalen > 0) { uv_buf_t buf = EmitAlloc(datalen); @@ -265,8 +293,24 @@ void QuicStream::ReceiveData(int fin, const uint8_t* data, size_t datalen) { EmitRead(avail, buf); }; - if (fin) + // When fin != 0, we've received that last chunk of data for this + // stream, indicating that the stream is no longer readable. + if (fin) { + SetReadClose(); EmitRead(UV_EOF); + } +} + +QuicStream* QuicStream::New( + QuicSession* session, + uint64_t stream_id) { + Local obj; + if (!session->env() + ->quicserverstream_constructor_template() + ->NewInstance(session->env()->context()).ToLocal(&obj)) { + return nullptr; + } + return new QuicStream(session, obj, stream_id); } // JavaScript API diff --git a/src/node_quic_stream.h b/src/node_quic_stream.h index 4cd4814ec3..d07c5e00e7 100644 --- a/src/node_quic_stream.h +++ b/src/node_quic_stream.h @@ -17,24 +17,80 @@ namespace quic { class QuicSession; class QuicServerSession; -enum quic_stream_flags { - QUIC_STREAM_FLAG_NONE = 0x0, - QUIC_STREAM_FLAG_SHUT = 0x1, - QUIC_STREAM_FLAG_READ_START = 0x2, - QUIC_STREAM_FLAG_READ_PAUSED = 0x4, - QUIC_STREAM_FLAG_CLOSED = 0x8, - QUIC_STREAM_FLAG_EOS = 0x20 -}; - class QuicStreamListener : public StreamListener { public: uv_buf_t OnStreamAlloc(size_t suggested_size) override; void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override; }; +// QuicStream's are simple data flows that, fortunately, do not +// require much. They may be: +// +// * Bidirectional or Unidirectional +// * Server or Client Initiated +// +// The flow direction and origin of the stream are important in +// determining the write and read state (Open or Closed). Specifically: +// +// A Unidirectional stream originating with the Server is: +// +// * Server Writable (Open) but not Client Writable (Closed) +// * Client Readable (Open) but not Server Readable (Closed) +// +// Likewise, a Unidirectional stream originating with the +// Client is: +// +// * Client Writable (Open) but not Server Writable (Closed) +// * Server Readable (Open) but not Client Readable (Closed) +// +// Bidirectional Stream States +// +------------+--------------+--------------------+---------------------+ +// | | Initiated By | Initial Read State | Initial Write State | +// +------------+--------------+--------------------+---------------------+ +// | On Server | Server | Open | Open | +// +------------+--------------+--------------------+---------------------+ +// | On Server | Client | Open | Open | +// +------------+--------------+--------------------+---------------------+ +// | On Client | Server | Open | Open | +// +------------+--------------+--------------------+---------------------+ +// | On Client | Client | Open | Open | +// +------------+--------------+--------------------+---------------------+ +// +// Unidirectional Stream States +// +------------+--------------+--------------------+---------------------+ +// | | Initiated By | Initial Read State | Initial Write State | +// +------------+--------------+--------------------+---------------------+ +// | On Server | Server | Closed | Open | +// +------------+--------------+--------------------+---------------------+ +// | On Server | Client | Open | Closed | +// +------------+--------------+--------------------+---------------------+ +// | On Client | Server | Open | Closed | +// +------------+--------------+--------------------+---------------------+ +// | On Client | Client | Closed | Open | +// +------------+--------------+--------------------+---------------------+ +// +// The Closed states is terminal. A stream may be destroyed +// naturally when both the read and write states are Closed. +// Although, any stream may be abruptly terminated at any time. +// +// A stream that is Open Writable may have data pending or not. +// +// A QuicSession should only attempt to send stream data when (a) there +// is data pending to send of (b) there is no remaining data to send and +// the writable side is ready to transition to Closed. class QuicStream : public AsyncWrap, public StreamBase { public: + enum QuicStreamDirection { + QUIC_STREAM_BIRECTIONAL, + QUIC_STREAM_UNIDIRECTIONAL + }; + + enum QuicStreamOrigin { + QUIC_STREAM_SERVER, + QUIC_STREAM_CLIENT + }; + static void Initialize( Environment* env, v8::Local target, @@ -44,9 +100,49 @@ class QuicStream : public AsyncWrap, ~QuicStream() override; - uint64_t GetID() const; + inline QuicStreamDirection GetDirection() const { + return stream_id_ & 0b10 ? + QUIC_STREAM_UNIDIRECTIONAL : + QUIC_STREAM_BIRECTIONAL; + } + + inline QuicStreamOrigin GetOrigin() const { + return stream_id_ & 0b01 ? + QUIC_STREAM_SERVER : + QUIC_STREAM_CLIENT; + } + + uint64_t GetID() const { return stream_id_; } + + inline bool IsDestroyed() { + return session_ == nullptr; + } + + inline bool IsWritable() { + return (flags_ & QUICSTREAM_FLAG_WRITE) == 0; + } + + inline bool IsReadable() { + return (flags_ & QUICSTREAM_FLAG_READ) == 0; + } + + inline bool IsReadStarted() { + return flags_ & QUICSTREAM_FLAG_READ_STARTED; + } + + inline bool IsReadPaused() { + return flags_ & QUICSTREAM_FLAG_READ_PAUSED; + } + + bool IsAlive() override { + return !IsDestroyed() && !IsClosing(); + } + + bool IsClosing() override { + return !IsWritable() && !IsReadable(); + } - QuicSession* Session(); + QuicSession* Session() { return session_; } virtual void AckedDataOffset(uint64_t offset, size_t datalen); @@ -62,20 +158,6 @@ class QuicStream : public AsyncWrap, size_t nbufs, uv_stream_t* send_handle) override; - bool IsAlive() override { - return !IsDestroyed() && !IsShutdown() && !IsClosing(); - } - bool IsClosing() override { - return flags_ & QUIC_STREAM_FLAG_SHUT || - flags_ & QUIC_STREAM_FLAG_EOS; - } - - inline bool IsDestroyed() { return session_ == nullptr; } - inline bool IsEnded() { return flags_ & QUIC_STREAM_FLAG_EOS; } - inline bool IsPaused() { return flags_ & QUIC_STREAM_FLAG_READ_PAUSED; } - inline bool IsReading() { return flags_ & QUIC_STREAM_FLAG_READ_START; } - inline bool IsShutdown() { return flags_ & QUIC_STREAM_FLAG_SHUT; } - inline void IncrementAvailableOutboundLength(size_t amount); inline void DecrementAvailableOutboundLength(size_t amount); @@ -115,6 +197,36 @@ class QuicStream : public AsyncWrap, v8::Local target, uint64_t stream_id); + inline void SetInitialFlags(); + + enum Flags { + QUICSTREAM_FLAG_INITIAL = 0, + QUICSTREAM_FLAG_READ = 1, + QUICSTREAM_FLAG_WRITE = 2, + QUICSTREAM_FLAG_READ_STARTED = 3, + QUICSTREAM_FLAG_READ_PAUSED = 8 + }; + + inline void SetWriteClose() { + flags_ |= QUICSTREAM_FLAG_WRITE; + } + + inline void SetReadClose() { + flags_ |= QUICSTREAM_FLAG_READ; + } + + inline void SetReadStart() { + flags_ |= QUICSTREAM_FLAG_READ_STARTED; + } + + inline void SetReadPause() { + flags_ |= QUICSTREAM_FLAG_READ_PAUSED; + } + + inline void SetReadResume() { + flags_ &= QUICSTREAM_FLAG_READ_PAUSED; + } + QuicStreamListener stream_listener_; QuicSession* session_; uint64_t stream_id_; diff --git a/test/parallel/test-quic-client-server.js b/test/parallel/test-quic-client-server.js index 77e494c58c..3164dfdde3 100644 --- a/test/parallel/test-quic-client-server.js +++ b/test/parallel/test-quic-client-server.js @@ -21,13 +21,12 @@ const filedata = fs.readFileSync(__filename, { encoding: 'utf8' }); const { createSocket } = require('quic'); let client; -const server = createSocket({ type: 'udp4', port: 5678 }); +const server = createSocket({ type: 'udp4', port: 0 }); const unidata = ['I wonder if it worked.', 'test']; const kServerName = 'test'; const kALPN = 'zzz'; // ALPN can be overriden to whatever we want -const keylogFile = fs.createWriteStream('keys.log'); const kKeylogs = [ /QUIC_SERVER_HANDSHAKE_TRAFFIC_SECRET.*/, @@ -46,7 +45,6 @@ const countdown = new Countdown(2, () => { debug('Countdown expired. Destroying sockets'); server.close(); client.close(); - keylogFile.end(); }); server.listen({ key, cert, alpn: kALPN }); @@ -55,7 +53,6 @@ server.on('session', common.mustCall((session) => { session.on('keylog', common.mustCall((line) => { assert(kKeylogs.shift().test(line)); - keylogFile.write(line); }, kKeylogs.length)); session.on('secure', common.mustCall((servername, alpn, cipher) => { @@ -95,7 +92,7 @@ server.on('session', common.mustCall((session) => { server.on('ready', common.mustCall(() => { debug('Server is listening on port %d', server.address.port); - client = createSocket({ type: 'udp4', port: 5679 }); + client = createSocket({ type: 'udp4', port: 0 }); const req = client.connect({ type: 'udp4', address: 'localhost', @@ -126,7 +123,7 @@ server.on('ready', common.mustCall(() => { debug('QuicClientSession TLS Handshake Complete'); debug(' Server name: %s', servername); debug(' ALPN: %s', alpn); - debug(' Cipy: %s, %s', cipher.name, cipher.version); + debug(' Cipher: %s, %s', cipher.name, cipher.version); assert.strictEqual(servername, kServerName); assert.strictEqual(req.servername, kServerName); assert.strictEqual(alpn, kALPN);