Skip to content

Commit

Permalink
Adjust wasm socket
Browse files Browse the repository at this point in the history
Signed-off-by: Alexandru Vasile <[email protected]>
  • Loading branch information
lexnv committed Jun 26, 2023
1 parent 4dec3b7 commit 2e660bb
Showing 1 changed file with 49 additions and 44 deletions.
93 changes: 49 additions & 44 deletions subxt/src/client/lightclient/platform/wasm_socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
use futures::{io, prelude::*};
use send_wrapper::SendWrapper;
use wasm_bindgen::{prelude::*, JsCast};
use web_sys::{MessageEvent, WebSocket};

use std::{
collections::VecDeque,
Expand All @@ -21,14 +20,16 @@ pub enum Error {
ConnectionError(String),
}

/// Wasm websocket.
/// Websocket for WASM environments.
///
/// This is a rust-based wrapper around browser's WebSocket API.
pub struct WasmSocket {
inner: Arc<Mutex<InnerWasmSocket>>,
}

/// The state of the [`WasmSocket`].
#[derive(PartialEq, Eq, Clone, Copy)]
pub enum ConnectionState {
enum ConnectionState {
/// Initial state of the socket.
Connecting,
/// Socket is fully opened.
Expand All @@ -40,29 +41,39 @@ pub enum ConnectionState {
}

struct InnerWasmSocket {
/// The state of the connection.
state: ConnectionState,
/// This implements `Send` and panics if the value is accessed
/// or dropped from another thread.
///
/// This is safe in wasm environments.
socket: SendWrapper<WebSocket>,
socket: SendWrapper<web_sys::WebSocket>,
/// Data buffer for the socket.
data: VecDeque<u8>,
/// Waker from `poll_read` / `poll_write`.
waker: Option<Waker>,
/// In memory callbacks to handle messages from the browser socket.
callbacks: Option<SendWrapper<Callbacks>>,
}

/// Registered callbacks of the [`WasmSocket`].
struct Callbacks {
open: Closure<dyn FnMut()>,
message: Closure<dyn FnMut(MessageEvent)>,
error: Closure<dyn FnMut(web_sys::Event)>,
close: Closure<dyn FnMut(web_sys::CloseEvent)>,
}
///
/// These need to be kept around until the socket is dropped.
type Callbacks = (
Closure<dyn FnMut()>,
Closure<dyn FnMut(web_sys::MessageEvent)>,
Closure<dyn FnMut(web_sys::Event)>,
Closure<dyn FnMut(web_sys::CloseEvent)>,
);

impl WasmSocket {
/// Constructs a new [`WasmSocket`].
/// Establish a WebSocket connection.
///
/// The error is a string representing the browser error.
/// Visit [MDN Documentation](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/WebSocket#exceptions_thrown)
/// for more info.
pub fn new(addr: &str) -> Result<Self, Error> {
let socket = match WebSocket::new(addr) {
let socket = match web_sys::WebSocket::new(addr) {
Ok(socket) => socket,
Err(err) => return Err(Error::ConnectionError(format!("{:?}", err))),
};
Expand All @@ -77,22 +88,22 @@ impl WasmSocket {
callbacks: None,
}));

let open = Closure::<dyn FnMut()>::new({
let open_callback = Closure::<dyn FnMut()>::new({
let inner = inner.clone();
move || {
let mut inner = inner.lock().expect("Mutex is poised; qed");
inner.state = ConnectionState::Opened;

if let Some(waker) = &inner.waker {
waker.wake_by_ref();
if let Some(waker) = inner.waker.take() {
waker.wake();
}
}
});
socket.set_onopen(Some(open.as_ref().unchecked_ref()));
socket.set_onopen(Some(open_callback.as_ref().unchecked_ref()));

let message = Closure::<dyn FnMut(_)>::new({
let message_callback = Closure::<dyn FnMut(_)>::new({
let inner = inner.clone();
move |event: MessageEvent| {
move |event: web_sys::MessageEvent| {
let Ok(buffer) = event.data().dyn_into::<js_sys::ArrayBuffer>() else {
panic!("Unexpected data format {:?}", event.data());
};
Expand All @@ -101,56 +112,50 @@ impl WasmSocket {
let bytes = js_sys::Uint8Array::new(&buffer).to_vec();
inner.data.extend(bytes.into_iter());

if let Some(waker) = &inner.waker {
waker.wake_by_ref();
if let Some(waker) = inner.waker.take() {
waker.wake();
}
}
});
socket.set_onmessage(Some(message.as_ref().unchecked_ref()));
socket.set_onmessage(Some(message_callback.as_ref().unchecked_ref()));

let error = Closure::<dyn FnMut(_)>::new({
let error_callback = Closure::<dyn FnMut(_)>::new({
let inner = inner.clone();
move |_| {
// Callback does not provide useful information, signal it back to the stream.
let mut inner = inner.lock().expect("Mutex is poised; qed");
inner.state = ConnectionState::Error;

if let Some(waker) = &inner.waker {
waker.wake_by_ref();
if let Some(waker) = inner.waker.take() {
waker.wake();
}
}
});
socket.set_onerror(Some(error.as_ref().unchecked_ref()));
socket.set_onerror(Some(error_callback.as_ref().unchecked_ref()));

let close = Closure::<dyn FnMut(_)>::new({
let close_callback = Closure::<dyn FnMut(_)>::new({
let inner = inner.clone();
move |_| {
let mut inner = inner.lock().expect("Mutex is poised; qed");
inner.state = ConnectionState::Closed;

if let Some(waker) = &inner.waker {
waker.wake_by_ref();
if let Some(waker) = inner.waker.take() {
waker.wake();
}
}
});
socket.set_onclose(Some(close.as_ref().unchecked_ref()));

let callbacks = SendWrapper::new(Callbacks {
open,
message,
error,
close,
});
socket.set_onclose(Some(close_callback.as_ref().unchecked_ref()));

let callbacks = SendWrapper::new((
open_callback,
message_callback,
error_callback,
close_callback,
));
inner.lock().expect("Mutex poised; qed").callbacks = Some(callbacks);

Ok(Self { inner })
}

/// The state of the [`WasmSocket`].
pub fn state(&self) -> ConnectionState {
let inner = self.inner.lock().expect("Mutex is poised; qed");
inner.state
}
}

impl AsyncRead for WasmSocket {
Expand All @@ -175,7 +180,7 @@ impl AsyncRead for WasmSocket {

let n = inner.data.len().min(buf.len());
for k in buf.iter_mut().take(n) {
*k = inner.data.pop_front().unwrap();
*k = inner.data.pop_front().expect("Buffer non empty; qed");
}
Poll::Ready(Ok(n))
}
Expand Down Expand Up @@ -213,7 +218,7 @@ impl AsyncWrite for WasmSocket {
}

fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
Poll::Pending
Poll::Ready(Ok(()))
}
}

Expand Down

0 comments on commit 2e660bb

Please sign in to comment.