Skip to content

Commit

Permalink
Add recipient check to support multiple clients on the same WS stream
Browse files Browse the repository at this point in the history
  • Loading branch information
fonsp authored and pankgeorg committed Nov 8, 2021
1 parent 5bf0145 commit 58fc26e
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 8 deletions.
8 changes: 6 additions & 2 deletions frontend/common/PlutoConnection.js
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,6 @@ const create_vscode_connection = (address, { on_message, on_socket_close }, time
last_task = last_task.then(async () => {
try {
const raw = event.data // The json-encoded data that the extension sent
console.log("raw", raw)
if (raw.type === "ws_proxy") {
const buffer = await decode_base64_to_arraybuffer(raw.base64_encoded)
const message = unpack(new Uint8Array(buffer))
Expand Down Expand Up @@ -400,9 +399,14 @@ export const create_pluto_connection = async ({
try {
ws_connection = await (vscode_available ? create_vscode_connection : create_ws_connection)(String(ws_address), {
on_message: (update) => {
const by_me = update.initiator_id == client_id
const for_me = update.recipient_id === client_id
const by_me = update.initiator_id === client_id
const request_id = update.request_id

if (!for_me) {
return
}

if (by_me && request_id) {
const request = sent_requests.get(request_id)
if (request) {
Expand Down
1 change: 0 additions & 1 deletion frontend/components/Editor.js
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,6 @@ export class Editor extends Component {
return await this.actions.add_remote_cell_at(index + delta, code)
},
confirm_delete_multiple: async (verb, cell_ids) => {
console.log(confirm)
if (cell_ids.length <= 1 || (await confirm(`${verb} ${cell_ids.length} cells?`))) {
if (cell_ids.some((cell_id) => this.state.notebook.cell_results[cell_id].running || this.state.notebook.cell_results[cell_id].queued)) {
if (await confirm('This cell is still running - would you like to interrupt the notebook?')) {
Expand Down
3 changes: 3 additions & 0 deletions src/webserver/Dynamic.jl
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ Update the local state of all clients connected to this notebook.
"""
function send_notebook_changes!(πŸ™‹::ClientRequest; commentary::Any=nothing)
notebook_dict = notebook_to_js(πŸ™‹.notebook)
@info "Connected clients" length(πŸ™‹.session.connected_clients) Set(c -> c.stream for c in πŸ™‹.session.connected_clients)
for (_, client) in πŸ™‹.session.connected_clients
if client.connected_notebook !== nothing && client.connected_notebook.notebook_id == πŸ™‹.notebook.notebook_id
current_dict = get(current_state_for_clients, client, :empty)
Expand All @@ -186,6 +187,7 @@ function send_notebook_changes!(πŸ™‹::ClientRequest; commentary::Any=nothing)

# Make sure we do send a confirmation to the client who made the request, even without changes
is_response = πŸ™‹.initiator !== nothing && client == πŸ™‹.initiator.client
@info "Responding" is_response (πŸ™‹.initiator !== nothing) (commentary === nothing)

if !isempty(patches) || is_response
response = Dict(
Expand Down Expand Up @@ -279,6 +281,7 @@ responses[:update_notebook] = function response_update_notebook(πŸ™‹::ClientRequ
patches = (Base.convert(Firebasey.JSONPatch, update) for update in πŸ™‹.body["updates"])

if length(patches) == 0
@info "Empty patches"
send_notebook_changes!(πŸ™‹)
return nothing
end
Expand Down
14 changes: 9 additions & 5 deletions src/webserver/PutUpdates.jl
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@

function serialize_message_to_stream(io::IO, message::UpdateMessage)
to_send = Dict(:type => message.type, :message => message.message)
function serialize_message_to_stream(io::IO, message::UpdateMessage, recipient::ClientSession)
to_send = Dict{Symbol,Any}(
:type => message.type,
:message => message.message,
:recipient_id => recipient.id,
)
if message.notebook !== nothing
to_send[:notebook_id] = message.notebook.notebook_id
end
Expand All @@ -15,8 +19,8 @@ function serialize_message_to_stream(io::IO, message::UpdateMessage)
pack(io, to_send)
end

function serialize_message(message::UpdateMessage)
sprint(serialize_message_to_stream, message)
function serialize_message(message::UpdateMessage, recipient::ClientSession)
sprint(serialize_message_to_stream, message, recipient)
end

"Send `messages` to all clients connected to the `notebook`."
Expand Down Expand Up @@ -76,7 +80,7 @@ function flushclient(client::ClientSession)
if client.stream isa HTTP.WebSockets.WebSocket
client.stream.frame_type = HTTP.WebSockets.WS_BINARY
end
write(client.stream, serialize_message(next_to_send))
write(client.stream, serialize_message(next_to_send, client))
else
put!(flushtoken)
return false
Expand Down

0 comments on commit 58fc26e

Please sign in to comment.