Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(ext/websocket): optimize op_ws_next_event #16325

Merged
merged 13 commits into from
Oct 19, 2022
40 changes: 27 additions & 13 deletions ext/websocket/01_websocket.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
ArrayPrototypeMap,
ArrayPrototypeSome,
DataView,
Uint32Array,
ErrorPrototypeToString,
ObjectDefineProperties,
ObjectPrototypeIsPrototypeOf,
Expand Down Expand Up @@ -82,6 +83,10 @@
const _idleTimeoutDuration = Symbol("[[idleTimeout]]");
const _idleTimeoutTimeout = Symbol("[[idleTimeoutTimeout]]");
const _serverHandleIdleTimeout = Symbol("[[serverHandleIdleTimeout]]");

/* [event type, close code] */
const eventBuf = new Uint32Array(2);

class WebSocket extends EventTarget {
[_rid];

Expand Down Expand Up @@ -392,13 +397,15 @@

async [_eventLoop]() {
while (this[_readyState] !== CLOSED) {
const { kind, value } = await core.opAsync(
const value = await core.opAsync(
"op_ws_next_event",
this[_rid],
eventBuf,
);

const kind = eventBuf[0];
switch (kind) {
case "string": {
/* string */
case 0: {
this[_serverHandleIdleTimeout]();
const event = new MessageEvent("message", {
data: value,
Expand All @@ -407,7 +414,8 @@
this.dispatchEvent(event);
break;
}
case "binary": {
/* binary */
case 1: {
this[_serverHandleIdleTimeout]();
let data;

Expand All @@ -424,18 +432,23 @@
this.dispatchEvent(event);
break;
}
case "ping": {
/* ping */
case 3: {
core.opAsync("op_ws_send", this[_rid], {
kind: "pong",
});
Comment on lines 455 to 457
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can actually delete this because tungstenite auto-responds

Copy link
Member Author

@littledivy littledivy Oct 19, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i don't want to touch functionality in this PR. maybe a follow up?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure

break;
}
case "pong": {
/* pong */
case 4: {
this[_serverHandleIdleTimeout]();
break;
}
case "closed":
case "close": {
/* closed */
case 6: // falls through
/* close */
case 2: {
const code = eventBuf[1];
const prevState = this[_readyState];
this[_readyState] = CLOSED;
clearTimeout(this[_idleTimeoutTimeout]);
Expand All @@ -445,8 +458,8 @@
await core.opAsync(
"op_ws_close",
this[_rid],
value.code,
value.reason,
code,
value,
);
} catch {
// ignore failures
Expand All @@ -455,14 +468,15 @@

const event = new CloseEvent("close", {
wasClean: true,
code: value.code,
reason: value.reason,
code,
reason: value,
});
this.dispatchEvent(event);
core.tryClose(this[_rid]);
break;
}
case "error": {
/* error */
case 5: {
this[_readyState] = CLOSED;

const errorEv = new ErrorEvent("error", {
Expand Down
40 changes: 28 additions & 12 deletions ext/websocket/02_websocketstream.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
SymbolFor,
TypeError,
Uint8ArrayPrototype,
Uint32Array,
} = window.__bootstrap.primordials;

webidl.converters.WebSocketStreamOptions = webidl.createDictionaryConverter(
Expand Down Expand Up @@ -168,12 +169,15 @@
PromisePrototypeThen(
(async () => {
while (true) {
const { kind } = await core.opAsync(
const kind = new Uint32Array(2);
await core.opAsync(
"op_ws_next_event",
create.rid,
kind,
);

if (kind === "close") {
/* close */
if (kind[0] === 2) {
break;
}
}
Expand Down Expand Up @@ -237,35 +241,46 @@
await this.closed;
},
});
/* [event type, close code] */
const eventBuf = new Uint32Array(2);

const pull = async (controller) => {
const { kind, value } = await core.opAsync(
const value = await core.opAsync(
"op_ws_next_event",
this[_rid],
eventBuf,
);

const kind = eventBuf[0];
switch (kind) {
case "string": {
/* string */
case 0: {
controller.enqueue(value);
break;
}
case "binary": {
/* binary */
case 1: {
controller.enqueue(value);
break;
}
case "ping": {
/* ping */
case 3: {
await core.opAsync("op_ws_send", this[_rid], {
kind: "pong",
});
await pull(controller);
break;
}
case "closed":
case "close": {
this[_closed].resolve(value);
/* closed */
case 6: // falls through
/* close */
case 2: {
const code = eventBuf[1];
this[_closed].resolve(code);
core.tryClose(this[_rid]);
break;
}
case "error": {
/* error */
case 5: {
const err = new Error(value);
this[_closed].reject(err);
controller.error(err);
Expand All @@ -285,7 +300,8 @@
return pull(controller);
}

this[_closed].resolve(value);
const code = eventBuf[1];
this[_closed].resolve(code);
core.tryClose(this[_rid]);
}
};
Expand Down
66 changes: 48 additions & 18 deletions ext/websocket/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use deno_core::OpState;
use deno_core::RcRef;
use deno_core::Resource;
use deno_core::ResourceId;
use deno_core::StringOrBuffer;
use deno_core::ZeroCopyBuf;
use deno_tls::create_client_config;
use http::header::HeaderName;
Expand Down Expand Up @@ -462,40 +463,69 @@ pub enum NextEventResponse {
Closed,
}

#[op]
#[repr(u32)]
enum NextEventKind {
String = 0,
Binary = 1,
Close = 2,
Ping = 3,
Pong = 4,
Error = 5,
Closed = 6,
}

#[op(deferred)]
pub async fn op_ws_next_event(
state: Rc<RefCell<OpState>>,
rid: ResourceId,
) -> Result<NextEventResponse, AnyError> {
kind_out: &mut [u32],
) -> Result<Option<StringOrBuffer>, AnyError> {
let resource = state
.borrow_mut()
.resource_table
.get::<WsStreamResource>(rid)?;

let cancel = RcRef::map(&resource, |r| &r.cancel);
let val = resource.next_message(cancel).await?;
let res = match val {
Some(Ok(Message::Text(text))) => NextEventResponse::String(text),
Some(Ok(Message::Binary(data))) => NextEventResponse::Binary(data.into()),
Some(Ok(Message::Close(Some(frame)))) => NextEventResponse::Close {
code: frame.code.into(),
reason: frame.reason.to_string(),
},
Some(Ok(Message::Close(None))) => NextEventResponse::Close {
code: 1005,
reason: String::new(),
},
Some(Ok(Message::Ping(_))) => NextEventResponse::Ping,
Some(Ok(Message::Pong(_))) => NextEventResponse::Pong,
Some(Err(e)) => NextEventResponse::Error(e.to_string()),
let (kind, value) = match val {
Some(Ok(Message::Text(text))) => (
NextEventKind::String as u32,
Some(StringOrBuffer::String(text)),
),
Some(Ok(Message::Binary(data))) => (
NextEventKind::Binary as u32,
Some(StringOrBuffer::Buffer(data.into())),
),
Some(Ok(Message::Close(Some(frame)))) => {
let code: u16 = frame.code.into();
kind_out[1] = code as u32;
(
NextEventKind::Close as u32,
Some(StringOrBuffer::String(frame.reason.to_string())),
)
}
Some(Ok(Message::Close(None))) => {
kind_out[1] = 1005;
(
NextEventKind::Close as u32,
Some(StringOrBuffer::String(String::new())),
)
}
Some(Ok(Message::Ping(_))) => (NextEventKind::Ping as u32, None),
Some(Ok(Message::Pong(_))) => (NextEventKind::Pong as u32, None),
Some(Err(e)) => (
NextEventKind::Error as u32,
Some(StringOrBuffer::String(e.to_string())),
),
None => {
// No message was received, presumably the socket closed while we waited.
// Try close the stream, ignoring any errors, and report closed status to JavaScript.
let _ = state.borrow_mut().resource_table.close(rid);
NextEventResponse::Closed
(NextEventKind::Closed as u32, None)
}
};
Ok(res)
kind_out[0] = kind as u32;
Ok(value)
}

pub fn init<P: WebSocketPermissions + 'static>(
Expand Down