Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(ext/websocket): efficient event kind serialization #18509

Merged
merged 6 commits into from
Mar 31, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
perf(ext/websocket): efficient event kind serialization
  • Loading branch information
littledivy committed Mar 30, 2023
commit c09e39c8f9894f05bd62169d44b3928c53f98f0f
57 changes: 33 additions & 24 deletions ext/websocket/01_websocket.js
Original file line number Diff line number Diff line change
Expand Up @@ -397,13 +397,14 @@ class WebSocket extends EventTarget {

async [_eventLoop]() {
while (this[_readyState] !== CLOSED) {
const { kind, value } = await core.opAsync(
const [kind, value] = await core.opAsync(
"op_ws_next_event",
this[_rid],
);

switch (kind) {
case "string": {
case 0: {
/* string */
this[_serverHandleIdleTimeout]();
const event = new MessageEvent("message", {
data: value,
Expand All @@ -412,14 +413,15 @@ class WebSocket extends EventTarget {
this.dispatchEvent(event);
break;
}
case "binary": {
case 1: {
/* binary */
this[_serverHandleIdleTimeout]();
let data;

if (this.binaryType === "blob") {
data = new Blob([value]);
} else {
data = value.buffer;
data = value;
}

const event = new MessageEvent("message", {
Expand All @@ -430,12 +432,32 @@ class WebSocket extends EventTarget {
this.dispatchEvent(event);
break;
}
case "pong": {
case 2: {
/* pong */
this[_serverHandleIdleTimeout]();
break;
}
case "closed":
case "close": {
case 5: {
/* error */
this[_readyState] = CLOSED;

const errorEv = new ErrorEvent("error", {
message: value,
});
this.dispatchEvent(errorEv);

const closeEv = new CloseEvent("close");
this.dispatchEvent(closeEv);
core.tryClose(this[_rid]);
break;
}
case 3: {
/* ping */
break;
}
default: {
/* close */
const code = kind;
const prevState = this[_readyState];
this[_readyState] = CLOSED;
clearTimeout(this[_idleTimeoutTimeout]);
Expand All @@ -445,8 +467,8 @@ class WebSocket extends EventTarget {
await core.opAsync(
"op_ws_close",
this[_rid],
value.code,
value.reason,
code,
value,
);
} catch {
// ignore failures
Expand All @@ -455,26 +477,13 @@ class WebSocket extends EventTarget {

const event = new CloseEvent("close", {
wasClean: true,
code: value.code,
reason: value.reason,
code: code,
reason: value,
});
this.dispatchEvent(event);
core.tryClose(this[_rid]);
break;
}
case "error": {
this[_readyState] = CLOSED;

const errorEv = new ErrorEvent("error", {
message: value,
});
this.dispatchEvent(errorEv);

const closeEv = new CloseEvent("close");
this.dispatchEvent(closeEv);
core.tryClose(this[_rid]);
break;
}
}
}
}
Expand Down
36 changes: 21 additions & 15 deletions ext/websocket/02_websocketstream.js
Original file line number Diff line number Diff line change
Expand Up @@ -167,12 +167,13 @@ class WebSocketStream {
PromisePrototypeThen(
(async () => {
while (true) {
const { kind } = await core.opAsync(
const [kind] = await core.opAsync(
"op_ws_next_event",
create.rid,
);

if (kind === "close") {
if (kind > 6) {
/* close */
break;
}
}
Expand Down Expand Up @@ -237,37 +238,42 @@ class WebSocketStream {
},
});
const pull = async (controller) => {
const { kind, value } = await core.opAsync(
const [kind, value] = await core.opAsync(
"op_ws_next_event",
this[_rid],
);

switch (kind) {
case "string": {
case 0:
case 1: {
/* string */
/* binary */
controller.enqueue(value);
break;
}
case "binary": {
controller.enqueue(value);
case 5: {
/* error */
const err = new Error(value);
this[_closed].reject(err);
controller.error(err);
core.tryClose(this[_rid]);
break;
}
case "ping": {
case 3: {
/* ping */
await core.opAsync("op_ws_send", this[_rid], {
kind: "pong",
});
await pull(controller);
break;
}
case "closed":
case "close": {
this[_closed].resolve(value);
core.tryClose(this[_rid]);
case 2: {
/* pong */
break;
}
case "error": {
const err = new Error(value);
this[_closed].reject(err);
controller.error(err);
default: {
/* close */
this[_closed].resolve(value);
core.tryClose(this[_rid]);
break;
}
Expand Down
65 changes: 40 additions & 25 deletions ext/websocket/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use deno_core::futures::stream::SplitStream;
use deno_core::futures::SinkExt;
use deno_core::futures::StreamExt;
use deno_core::op;
use deno_core::StringOrBuffer;

use deno_core::url;
use deno_core::AsyncRefCell;
Expand Down Expand Up @@ -461,23 +462,21 @@ pub async fn op_ws_close(
Ok(())
}

#[derive(Serialize)]
#[serde(tag = "kind", content = "value", rename_all = "camelCase")]
pub enum NextEventResponse {
String(String),
Binary(ZeroCopyBuf),
Close { code: u16, reason: String },
Ping,
Pong,
Error(String),
Closed,
#[repr(u16)]
pub enum MessageKind {
Text = 0,
Binary = 1,
Pong = 2,
Ping = 3,
Error = 5,
Closed = 6,
}

#[op]
pub async fn op_ws_next_event(
state: Rc<RefCell<OpState>>,
rid: ResourceId,
) -> Result<NextEventResponse, AnyError> {
) -> Result<(u16, StringOrBuffer), AnyError> {
let resource = state
.borrow_mut()
.resource_table
Expand All @@ -486,24 +485,40 @@ pub async fn op_ws_next_event(
let cancel = RcRef::map(&resource, |r| &r.cancel);
let val = resource.next_message(cancel).await?;
let res = match val {
Some(Ok(Message::Text(text))) => NextEventResponse::String(text),
Some(Ok(Message::Binary(data))) => NextEventResponse::Binary(data.into()),
Some(Ok(Message::Close(Some(frame)))) => NextEventResponse::Close {
code: frame.code.into(),
reason: frame.reason.to_string(),
},
Some(Ok(Message::Close(None))) => NextEventResponse::Close {
code: 1005,
reason: String::new(),
},
Some(Ok(Message::Ping(_))) => NextEventResponse::Ping,
Some(Ok(Message::Pong(_))) => NextEventResponse::Pong,
Some(Err(e)) => NextEventResponse::Error(e.to_string()),
Some(Ok(Message::Text(text))) => {
(MessageKind::Text as u16, StringOrBuffer::String(text))
}
Some(Ok(Message::Binary(data))) => (
MessageKind::Binary as u16,
StringOrBuffer::Buffer(data.into()),
),
Some(Ok(Message::Close(Some(frame)))) => (
frame.code.into(),
StringOrBuffer::String(frame.reason.to_string()),
),
Some(Ok(Message::Close(None))) => {
(1005, StringOrBuffer::String("".to_string()))
}
Some(Ok(Message::Ping(_))) => (
MessageKind::Ping as u16,
StringOrBuffer::Buffer(vec![].into()),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it make sense to send these empty buffers? Could we replace them with undefined? That still wouldn't address the fact that this function is megamorphic and will get deoptimized by V8 every time data received is different than when the function got jitted (if it gets jitted at all)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Idea to explore: store the received data on the Websocket struct, return only the number to JS and immediately call a sync op that either gets a buffer or a string (ie. two separate ops). That way we wouldn't make V8 deoptimize this function

Copy link
Member Author

@littledivy littledivy Mar 31, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it make sense to send these empty buffers? Could we replace them with undefined?

Option<StringOrBuffer> makes it a little slow, since Ping and Pong are not frequent events i've left them as-is.

That still wouldn't address the fact that this function is megamorphic and will get deoptimized by V8 every time data received is different than when the function got jitted (if it gets jitted at all)

It won't get deoptimized everytime return value changes. AFAIK V8 can learn that this function returns either a AB or string back. It'll have JIT code prepared to handle that after a few calls.

),
Some(Ok(Message::Pong(_))) => (
MessageKind::Pong as u16,
StringOrBuffer::Buffer(vec![].into()),
),
Some(Err(e)) => (
MessageKind::Error as u16,
StringOrBuffer::String(e.to_string()),
),
None => {
// No message was received, presumably the socket closed while we waited.
// Try close the stream, ignoring any errors, and report closed status to JavaScript.
let _ = state.borrow_mut().resource_table.close(rid);
NextEventResponse::Closed
(
MessageKind::Closed as u16,
StringOrBuffer::Buffer(vec![].into()),
)
}
};
Ok(res)
Expand Down