Skip to content

Commit

Permalink
flush_then_close improvements
Browse files Browse the repository at this point in the history
* make it possible to call with an already formatted packet
* use it to send a websocket close packet on shutdown
plus some cosmetic changes
  • Loading branch information
totaam committed Nov 9, 2022
1 parent 89e9282 commit 08bd0d2
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 21 deletions.
37 changes: 21 additions & 16 deletions xpra/net/protocol/socket_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ def start_network_read_thread():


def send_disconnect(self, reasons, done_callback=None):
self.flush_then_close(["disconnect"]+list(reasons), done_callback=done_callback)
self.flush_then_close(self.encode, ["disconnect"]+list(reasons), done_callback=done_callback)

def send_now(self, packet):
if self._closed:
Expand Down Expand Up @@ -1120,7 +1120,7 @@ def debug_str(s):
self._process_packet_cb(self, packet)
packet = None

def flush_then_close(self, last_packet, done_callback=None): #pylint: disable=method-hidden
def flush_then_close(self, encoder=None, last_packet=None, done_callback=None): #pylint: disable=method-hidden
""" Note: this is best effort only
the packet may not get sent.
Expand All @@ -1130,11 +1130,11 @@ def flush_then_close(self, last_packet, done_callback=None): #pylint: disable
we wait again for the queue to flush,
then no matter what, we close the connection and stop the threads.
"""
def closing_already(last_packet, done_callback=None):
def closing_already(encoder, last_packet, done_callback=None):
log("flush_then_close%s had already been called, this new request has been ignored",
(last_packet, done_callback))
(encoder, last_packet, done_callback))
self.flush_then_close = closing_already
log("flush_then_close(%s, %s) closed=%s", last_packet, done_callback, self._closed)
log("flush_then_close%s closed=%s", (encoder, last_packet, done_callback), self._closed)
def done():
log("flush_then_close: done, callback=%s", done_callback)
if done_callback:
Expand All @@ -1151,24 +1151,25 @@ def wait_for_queue(timeout=10):
log("flush_then_close: queue still busy, closing without sending the last packet")
try:
self._write_lock.release()
except Exception:
pass
self.close()
done()
finally:
self.close()
done()
else:
log("flush_then_close: still waiting for queue to flush")
self.timeout_add(100, wait_for_queue, timeout-1)
else:
if not last_packet:
self.close()
done()
return
log("flush_then_close: queue is now empty, sending the last packet and closing")
chunks = self.encode(last_packet)
def close_and_release():
log("flush_then_close: wait_for_packet_sent() close_and_release()")
self.close()
try:
self._write_lock.release()
except Exception:
pass
done()
finally:
done()
def wait_for_packet_sent():
log("flush_then_close: wait_for_packet_sent() queue.empty()=%s, closed=%s",
self._write_queue.empty(), self._closed)
Expand All @@ -1183,9 +1184,13 @@ def packet_queued(*_args):
if wait_for_packet_sent():
#check again every 100ms
self.timeout_add(100, wait_for_packet_sent)
self._add_chunks_to_queue(last_packet[0], chunks,
start_send_cb=None, end_send_cb=packet_queued,
synchronous=False, more=False)
if encoder:
chunks = encoder(last_packet)
self._add_chunks_to_queue(last_packet[0], chunks,
start_send_cb=None, end_send_cb=packet_queued,
synchronous=False, more=False)
else:
self.raw_write("flush-then-close", (last_packet, ))
#just in case wait_for_packet_sent never fires:
self.timeout_add(5*1000, close_and_release)

Expand Down
22 changes: 17 additions & 5 deletions xpra/net/websockets/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,23 @@ def __init__(self, *args, **kwargs):
self.make_frame_header = self.make_wsframe_header

def __repr__(self):
return "WebSocket(%s)" % self._conn
return f"WebSocket({self._conn})"

def close(self, message=None):
if not self._closed:
pass
super().close(message)
self.ws_data = b""
self.ws_payload = []

def send_ws_close(self, code=1000, reason="closing"):
data = struct.pack("!H", code)
if reason:
#should validate that length is less than 125
data += reason.encode("utf-8")
header = encode_hybi_header(OPCODE_CLOSE, len(data), has_mask=False, fin=True)
self.flush_then_close(None, header+data)


def make_wsframe_header(self, packet_type, items):
payload_len = sum(len(item) for item in items)
Expand Down Expand Up @@ -106,19 +116,21 @@ def parse_ws_frame(self, buf):
self.ws_payload_opcode = 0
else:
if self.ws_payload and self.ws_payload_opcode:
raise Exception("expected a continuation frame not %s" % OPCODES.get(opcode, opcode))
op = OPCODES.get(opcode, opcode)
raise Exception(f"expected a continuation frame not {op}")
full_payload = payload
if not fin:
if opcode not in (OPCODE_BINARY, OPCODE_TEXT):
raise Exception("cannot handle fragmented '%s' frames" % OPCODES.get(opcode, opcode))
op = OPCODES.get(opcode, opcode)
raise RuntimeError(f"cannot handle fragmented {op} frames")
#fragmented, keep this payload for later
self.ws_payload_opcode = opcode
self.ws_payload.append(payload)
continue
if opcode==OPCODE_BINARY:
self._read_queue_put(full_payload)
elif opcode==OPCODE_TEXT:
if first_time("ws-text-frame-from-%s" % self._conn):
if first_time(f"ws-text-frame-from-{self._conn}"):
log.warn("Warning: handling text websocket frame as binary")
self._read_queue_put(full_payload)
elif opcode==OPCODE_CLOSE:
Expand All @@ -128,7 +140,7 @@ def parse_ws_frame(self, buf):
elif opcode==OPCODE_PONG:
self._process_ws_pong(full_payload)
else:
log.warn("Warning unhandled websocket opcode '%s'", OPCODES.get(opcode, "%#x" % opcode))
log.warn("Warning unhandled websocket opcode '%s'", OPCODES.get(opcode, f"{opcode:x}"))
log("payload=%r", payload)

def _process_ws_ping(self, payload):
Expand Down

0 comments on commit 08bd0d2

Please sign in to comment.