From b818ded684c3aec814d18428b4ebd5a699005c5f Mon Sep 17 00:00:00 2001 From: Divy Srivastava Date: Thu, 30 Mar 2023 19:32:38 +0530 Subject: [PATCH 1/5] perf(ext/websocket): special op for sending binary data frames --- ext/websocket/01_websocket.js | 5 +---- ext/websocket/lib.rs | 15 +++++++++++++++ 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/ext/websocket/01_websocket.js b/ext/websocket/01_websocket.js index 399b46c52d8382..39ec56d25d51f5 100644 --- a/ext/websocket/01_websocket.js +++ b/ext/websocket/01_websocket.js @@ -301,10 +301,7 @@ class WebSocket extends EventTarget { const sendTypedArray = (ta) => { this[_bufferedAmount] += ta.byteLength; PromisePrototypeThen( - core.opAsync("op_ws_send", this[_rid], { - kind: "binary", - value: ta, - }), + core.opAsync("op_ws_send_binary", this[_rid], ta), () => { this[_bufferedAmount] -= ta.byteLength; }, diff --git a/ext/websocket/lib.rs b/ext/websocket/lib.rs index 4195f39b8b451f..adfa75c512bfa1 100644 --- a/ext/websocket/lib.rs +++ b/ext/websocket/lib.rs @@ -402,6 +402,20 @@ pub enum SendValue { Ping, } +#[op] +pub async fn op_ws_send_binary( + state: Rc>, + rid: ResourceId, + data: ZeroCopyBuf, +) -> Result<(), AnyError> { + let resource = state + .borrow_mut() + .resource_table + .get::(rid)?; + resource.send(Message::Binary(data.to_vec())).await?; + Ok(()) +} + #[op] pub async fn op_ws_send( state: Rc>, @@ -504,6 +518,7 @@ deno_core::extension!(deno_websocket, op_ws_send, op_ws_close, op_ws_next_event, + op_ws_send_binary, ], esm = [ "01_websocket.js", "02_websocketstream.js" ], options = { From c09e39c8f9894f05bd62169d44b3928c53f98f0f Mon Sep 17 00:00:00 2001 From: Divy Srivastava Date: Thu, 30 Mar 2023 21:32:18 +0530 Subject: [PATCH 2/5] perf(ext/websocket): efficient event kind serialization --- ext/websocket/01_websocket.js | 57 ++++++++++++++----------- ext/websocket/02_websocketstream.js | 36 +++++++++------- ext/websocket/lib.rs | 65 ++++++++++++++++++----------- 3 files changed, 94 insertions(+), 64 deletions(-) diff --git a/ext/websocket/01_websocket.js b/ext/websocket/01_websocket.js index 39ec56d25d51f5..3ed729caca0989 100644 --- a/ext/websocket/01_websocket.js +++ b/ext/websocket/01_websocket.js @@ -397,13 +397,14 @@ class WebSocket extends EventTarget { async [_eventLoop]() { while (this[_readyState] !== CLOSED) { - const { kind, value } = await core.opAsync( + const [kind, value] = await core.opAsync( "op_ws_next_event", this[_rid], ); switch (kind) { - case "string": { + case 0: { + /* string */ this[_serverHandleIdleTimeout](); const event = new MessageEvent("message", { data: value, @@ -412,14 +413,15 @@ class WebSocket extends EventTarget { this.dispatchEvent(event); break; } - case "binary": { + case 1: { + /* binary */ this[_serverHandleIdleTimeout](); let data; if (this.binaryType === "blob") { data = new Blob([value]); } else { - data = value.buffer; + data = value; } const event = new MessageEvent("message", { @@ -430,12 +432,32 @@ class WebSocket extends EventTarget { this.dispatchEvent(event); break; } - case "pong": { + case 2: { + /* pong */ this[_serverHandleIdleTimeout](); break; } - case "closed": - case "close": { + case 5: { + /* error */ + this[_readyState] = CLOSED; + + const errorEv = new ErrorEvent("error", { + message: value, + }); + this.dispatchEvent(errorEv); + + const closeEv = new CloseEvent("close"); + this.dispatchEvent(closeEv); + core.tryClose(this[_rid]); + break; + } + case 3: { + /* ping */ + break; + } + default: { + /* close */ + const code = kind; const prevState = this[_readyState]; this[_readyState] = CLOSED; clearTimeout(this[_idleTimeoutTimeout]); @@ -445,8 +467,8 @@ class WebSocket extends EventTarget { await core.opAsync( "op_ws_close", this[_rid], - value.code, - value.reason, + code, + value, ); } catch { // ignore failures @@ -455,26 +477,13 @@ class WebSocket extends EventTarget { const event = new CloseEvent("close", { wasClean: true, - code: value.code, - reason: value.reason, + code: code, + reason: value, }); this.dispatchEvent(event); core.tryClose(this[_rid]); break; } - case "error": { - this[_readyState] = CLOSED; - - const errorEv = new ErrorEvent("error", { - message: value, - }); - this.dispatchEvent(errorEv); - - const closeEv = new CloseEvent("close"); - this.dispatchEvent(closeEv); - core.tryClose(this[_rid]); - break; - } } } } diff --git a/ext/websocket/02_websocketstream.js b/ext/websocket/02_websocketstream.js index 0a3aeb19268bce..b58ff2dfc40101 100644 --- a/ext/websocket/02_websocketstream.js +++ b/ext/websocket/02_websocketstream.js @@ -167,12 +167,13 @@ class WebSocketStream { PromisePrototypeThen( (async () => { while (true) { - const { kind } = await core.opAsync( + const [kind] = await core.opAsync( "op_ws_next_event", create.rid, ); - if (kind === "close") { + if (kind > 6) { + /* close */ break; } } @@ -237,37 +238,42 @@ class WebSocketStream { }, }); const pull = async (controller) => { - const { kind, value } = await core.opAsync( + const [kind, value] = await core.opAsync( "op_ws_next_event", this[_rid], ); switch (kind) { - case "string": { + case 0: + case 1: { + /* string */ + /* binary */ controller.enqueue(value); break; } - case "binary": { - controller.enqueue(value); + case 5: { + /* error */ + const err = new Error(value); + this[_closed].reject(err); + controller.error(err); + core.tryClose(this[_rid]); break; } - case "ping": { + case 3: { + /* ping */ await core.opAsync("op_ws_send", this[_rid], { kind: "pong", }); await pull(controller); break; } - case "closed": - case "close": { - this[_closed].resolve(value); - core.tryClose(this[_rid]); + case 2: { + /* pong */ break; } - case "error": { - const err = new Error(value); - this[_closed].reject(err); - controller.error(err); + default: { + /* close */ + this[_closed].resolve(value); core.tryClose(this[_rid]); break; } diff --git a/ext/websocket/lib.rs b/ext/websocket/lib.rs index adfa75c512bfa1..839c2309645995 100644 --- a/ext/websocket/lib.rs +++ b/ext/websocket/lib.rs @@ -8,6 +8,7 @@ use deno_core::futures::stream::SplitStream; use deno_core::futures::SinkExt; use deno_core::futures::StreamExt; use deno_core::op; +use deno_core::StringOrBuffer; use deno_core::url; use deno_core::AsyncRefCell; @@ -461,23 +462,21 @@ pub async fn op_ws_close( Ok(()) } -#[derive(Serialize)] -#[serde(tag = "kind", content = "value", rename_all = "camelCase")] -pub enum NextEventResponse { - String(String), - Binary(ZeroCopyBuf), - Close { code: u16, reason: String }, - Ping, - Pong, - Error(String), - Closed, +#[repr(u16)] +pub enum MessageKind { + Text = 0, + Binary = 1, + Pong = 2, + Ping = 3, + Error = 5, + Closed = 6, } #[op] pub async fn op_ws_next_event( state: Rc>, rid: ResourceId, -) -> Result { +) -> Result<(u16, StringOrBuffer), AnyError> { let resource = state .borrow_mut() .resource_table @@ -486,24 +485,40 @@ pub async fn op_ws_next_event( let cancel = RcRef::map(&resource, |r| &r.cancel); let val = resource.next_message(cancel).await?; let res = match val { - Some(Ok(Message::Text(text))) => NextEventResponse::String(text), - Some(Ok(Message::Binary(data))) => NextEventResponse::Binary(data.into()), - Some(Ok(Message::Close(Some(frame)))) => NextEventResponse::Close { - code: frame.code.into(), - reason: frame.reason.to_string(), - }, - Some(Ok(Message::Close(None))) => NextEventResponse::Close { - code: 1005, - reason: String::new(), - }, - Some(Ok(Message::Ping(_))) => NextEventResponse::Ping, - Some(Ok(Message::Pong(_))) => NextEventResponse::Pong, - Some(Err(e)) => NextEventResponse::Error(e.to_string()), + Some(Ok(Message::Text(text))) => { + (MessageKind::Text as u16, StringOrBuffer::String(text)) + } + Some(Ok(Message::Binary(data))) => ( + MessageKind::Binary as u16, + StringOrBuffer::Buffer(data.into()), + ), + Some(Ok(Message::Close(Some(frame)))) => ( + frame.code.into(), + StringOrBuffer::String(frame.reason.to_string()), + ), + Some(Ok(Message::Close(None))) => { + (1005, StringOrBuffer::String("".to_string())) + } + Some(Ok(Message::Ping(_))) => ( + MessageKind::Ping as u16, + StringOrBuffer::Buffer(vec![].into()), + ), + Some(Ok(Message::Pong(_))) => ( + MessageKind::Pong as u16, + StringOrBuffer::Buffer(vec![].into()), + ), + Some(Err(e)) => ( + MessageKind::Error as u16, + StringOrBuffer::String(e.to_string()), + ), None => { // No message was received, presumably the socket closed while we waited. // Try close the stream, ignoring any errors, and report closed status to JavaScript. let _ = state.borrow_mut().resource_table.close(rid); - NextEventResponse::Closed + ( + MessageKind::Closed as u16, + StringOrBuffer::Buffer(vec![].into()), + ) } }; Ok(res) From 2f4522968c50e7e3a102180bc31e19271440b167 Mon Sep 17 00:00:00 2001 From: Divy Srivastava Date: Thu, 30 Mar 2023 23:12:29 +0530 Subject: [PATCH 3/5] Fix --- ext/websocket/02_websocketstream.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ext/websocket/02_websocketstream.js b/ext/websocket/02_websocketstream.js index b58ff2dfc40101..36bafdeee7af71 100644 --- a/ext/websocket/02_websocketstream.js +++ b/ext/websocket/02_websocketstream.js @@ -273,7 +273,7 @@ class WebSocketStream { } default: { /* close */ - this[_closed].resolve(value); + this[_closed].resolve(kind); core.tryClose(this[_rid]); break; } From dba0dfa07bdde1d7406b186d9fd31d1a33110a35 Mon Sep 17 00:00:00 2001 From: Divy Srivastava Date: Fri, 31 Mar 2023 08:59:49 +0530 Subject: [PATCH 4/5] review changes --- ext/websocket/01_websocket.js | 2 +- ext/websocket/02_websocketstream.js | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/ext/websocket/01_websocket.js b/ext/websocket/01_websocket.js index 884298710e42e5..03a6427c2944eb 100644 --- a/ext/websocket/01_websocket.js +++ b/ext/websocket/01_websocket.js @@ -394,7 +394,7 @@ class WebSocket extends EventTarget { async [_eventLoop]() { while (this[_readyState] !== CLOSED) { - const [kind, value] = await core.opAsync( + const { 0: kind, 1: value } = await core.opAsync( "op_ws_next_event", this[_rid], ); diff --git a/ext/websocket/02_websocketstream.js b/ext/websocket/02_websocketstream.js index 36bafdeee7af71..a380f38b930abb 100644 --- a/ext/websocket/02_websocketstream.js +++ b/ext/websocket/02_websocketstream.js @@ -167,7 +167,7 @@ class WebSocketStream { PromisePrototypeThen( (async () => { while (true) { - const [kind] = await core.opAsync( + const { 0: kind } = await core.opAsync( "op_ws_next_event", create.rid, ); @@ -238,7 +238,7 @@ class WebSocketStream { }, }); const pull = async (controller) => { - const [kind, value] = await core.opAsync( + const { 0: kind, 1: value } = await core.opAsync( "op_ws_next_event", this[_rid], ); From 72a1285d8730066f7377d6621df96de5115b957d Mon Sep 17 00:00:00 2001 From: Divy Srivastava Date: Fri, 31 Mar 2023 09:07:32 +0530 Subject: [PATCH 5/5] fixes --- ext/websocket/02_websocketstream.js | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/ext/websocket/02_websocketstream.js b/ext/websocket/02_websocketstream.js index a380f38b930abb..6e487f0b7c6c9d 100644 --- a/ext/websocket/02_websocketstream.js +++ b/ext/websocket/02_websocketstream.js @@ -271,9 +271,18 @@ class WebSocketStream { /* pong */ break; } + case 6: { + /* closed */ + this[_closed].resolve(undefined); + core.tryClose(this[_rid]); + break; + } default: { /* close */ - this[_closed].resolve(kind); + this[_closed].resolve({ + code: kind, + reason: value, + }); core.tryClose(this[_rid]); break; }