diff --git a/src/xpra/server/proxy_server.py b/src/xpra/server/proxy_server.py index 1cc1ff4f55..15e3b89dab 100644 --- a/src/xpra/server/proxy_server.py +++ b/src/xpra/server/proxy_server.py @@ -37,7 +37,7 @@ class ProxyServer(ServerCore): """ def __init__(self): - log("ProxyServer.__init__()") + debug("ProxyServer.__init__()") ServerCore.__init__(self) self._max_connections = MAX_CONCURRENT_CONNECTIONS self.main_loop = None @@ -49,6 +49,7 @@ def __init__(self): self.timeout_add = gobject.timeout_add self.source_remove = gobject.source_remove self._socket_timeout = PROXY_SOCKET_TIMEOUT + self.control_commands = ["hello", "stop"] #ensure we cache the platform info before intercepting SIGCHLD #as this will cause a fork and SIGCHLD to be emitted: from xpra.version_util import get_platform_info @@ -56,7 +57,7 @@ def __init__(self): signal.signal(signal.SIGCHLD, self.sigchld) def init(self, opts): - log("ProxyServer.init(%s)", opts) + debug("ProxyServer.init(%s)", opts) if not opts.auth: raise Exception("The proxy server requires an authentication mode") ServerCore.init(self, opts) @@ -71,15 +72,35 @@ def do_run(self): self.main_loop = gobject.MainLoop() self.main_loop.run() + def do_handle_command_request(self, proto, command, args): + if command in ("help", "hello"): + return ServerCore.do_handle_command_request(self, proto, command, args) + assert command=="stop" + if len(args)!=1: + return ServerCore.control_command_response(self, proto, command, 4, "invalid number of arguments, usage: 'xpra control stop DISPLAY'") + display = args[0] + debug("stop command: will try to find proxy process for display %s", display) + for process, v in list(self.processes.items()): + disp,mq = v + if disp==display: + pid = process.pid + log.info("stop command: found process %s with pid %s for display %s, sending it 'stop' request", process, pid, display) + mq.put("stop") + return self.control_command_response(proto, command, 0, "stopped proxy process with pid %s" % pid) + return self.control_command_response(proto, command, 14, "no proxy found for display %s" % display) + + def stop_all_proxies(self): processes = self.processes self.processes = {} - log("stop_all_proxies() will stop proxy processes: %s", processes) + debug("stop_all_proxies() will stop proxy processes: %s", processes) for process, v in processes.items(): + if not process.is_alive(): + continue disp,mq = v - log("stop_all_proxies() stopping process %s for display %s", process, disp) + debug("stop_all_proxies() stopping process %s for display %s", process, disp) mq.put("stop") - log("stop_all_proxies() done") + debug("stop_all_proxies() done") def cleanup(self): self.stop_all_proxies() diff --git a/src/xpra/server/server_base.py b/src/xpra/server/server_base.py index 174be49553..a3b5aa1b1e 100644 --- a/src/xpra/server/server_base.py +++ b/src/xpra/server/server_base.py @@ -76,6 +76,14 @@ def __init__(self): self.lossless_mode_encodings = [] self.default_encoding = None + #control mode: + self.control_commands = ["hello", "help", + "compression", "encoder", "refresh", + "sound-output", + "scaling", + "suspend", "resume", "name", + "client"] + self.init_encodings() self.init_packet_handlers() self.init_aliases() @@ -443,10 +451,6 @@ def hello_oked(self, proto, packet, c, auth_caps): if c.boolget("info_request", False): self.send_hello_info(proto) return - command_req = c.strlistget("command_request") - if len(command_req)>0: - self.handle_command_request(proto, command_req) - return #"normal" connection, so log welcome message: log.info("Handshake complete; enabling connection") @@ -621,17 +625,8 @@ def send_hello(self, server_source, root_w, root_h, key_repeat, server_cipher): server_source.hello(capabilities) - def handle_command_request(self, proto, args): - try: - self.do_handle_command_request(proto, args) - except Exception, e: - log.error("error processing command %s", args, exc_info=True) - proto.send_now(("hello", {"command_response" : (127, "error processing command: %s" % e)})) - - def do_handle_command_request(self, proto, args): - assert len(args)>0 - log("handle_command_request(%s, %s)", proto, args) - command = args[0] + def do_handle_command_request(self, proto, command, args): + log("handle_command_request(%s, %s, %s)", proto, command, args) def respond(error=0, response=""): log("command request response(%s)=%s", command, response) hello = {"command_response" : (error, response)} @@ -643,117 +638,123 @@ def arg_err(n, msg): def success(): respond(0, "success") - commands = ("hello", - "compression", "encoder", - "sound-output", - "scaling", - "suspend", "resume", "name", - "client") - if command=="help": - return respond(0, "control supports: %s" % (", ".join(commands))) - - if command not in commands: - return respond(6, "invalid command") - - if command=="hello": - return respond(0, "hello") - #from here on, we assume the command applies to the - #current client connection, of which there must only be one: - sss = list(self._server_sources.items()) - if len(sss)==0: - return respond(2, "no client connected") - elif len(sss)>1: - return respond(3, "more than one client connected") - cproto, csource = sss[0] - - def may_forward_client_command(client_command): - if client_command[0] not in csource.control_commands: - log.info("not forwarded to client (not supported)") - return False - csource.send_client_command(*client_command) - return True - - log("handle_command_request will apply to client: %s", csource) - if command=="compression": - if len(args)!=2: - return argn_err(2) - compression = args[1].lower() + sources = list(self._server_sources.values()) + protos = list(self._server_sources.keys()) + def forward_all_clients(client_command): + """ forwards the command to all clients """ + for source in sources: + """ forwards to *the* client, if there is *one* """ + if client_command[0] not in source.control_commands: + log.info("client command '%s' not forwarded to client %s (not supported)", client_command, source) + return False + source.send_client_command(*client_command) + + #handle commands that either don't require a client, + #or can work on more than one connected client: + if command in ("help", "hello"): + #generic case: + return ServerCore.do_handle_command_request(self, proto, command, args) + elif command=="name": + if len(args)!=1: + return argn_err(1) + self.session_name = args[0] + log.info("changed session name: %s", self.session_name) + forward_all_clients(["name"]) + return respond(0, "session name set") + elif command=="compression": + if len(args)!=1: + return argn_err(1) + compression = args[0].lower() opts = ("lz4", "zlib") if compression=="lz4": - cproto.enable_lz4() - may_forward_client_command(["enable_lz4"]) + for cproto in protos: + cproto.enable_lz4() + forward_all_clients(["enable_lz4"]) return success() elif compression=="zlib": - cproto.enable_zlib() - may_forward_client_command(["enable_zlib"]) + for cproto in protos: + cproto.enable_zlib() + forward_all_clients(["enable_zlib"]) return success() return arg_err(1, "must be one of: %s" % (", ".join(opts))) elif command=="encoder": - if len(args)!=2: - return argn_err(2) - encoder = args[1].lower() + if len(args)!=1: + return argn_err(1) + encoder = args[0].lower() opts = ("bencode", "rencode") if encoder=="bencode": - cproto.enable_bencode() - may_forward_client_command(["enable_bencode"]) + for cproto in protos: + cproto.enable_bencode() + forward_all_clients(["enable_bencode"]) return success() elif encoder=="rencode": - cproto.enable_rencode() - may_forward_client_command(["enable_rencode"]) + for cproto in protos: + cproto.enable_rencode() + forward_all_clients(["enable_rencode"]) return success() return arg_err(1, "must be one of: %s" % (", ".join(opts))) elif command=="sound-output": - if len(args)<2: + if len(args)<1: return argn_err("more than 1") - msg = csource.sound_control(*args[1:]) - return respond(0, msg) + msg = [] + for csource in sources: + msg.append("%s : %s" % (csource, csource.sound_control(*args[1:]))) + return respond(0, ", ".join(msg)) elif command=="suspend": - csource.suspend(True, self._id_to_window) - return respond(0, "suspended") + for csource in sources: + csource.suspend(True, self._id_to_window) + return respond(0, "suspended %s clients" % len(sources)) elif command=="resume": - csource.resume(True, self._id_to_window) - return respond(0, "resumed") + for csource in sources: + csource.resume(True, self._id_to_window) + return respond(0, "resumed %s clients" % len(sources)) + elif command=="refresh": + widwin = list(self._id_to_window.items()) + for wid, window in widwin: + for csource in sources: + csource.full_quality_refresh(wid, window, {}) + return respond(0, "refreshed %s window for %s clients" % (len(widwin), len(sources))) elif command=="scaling": - if len(args)!=3: - return argn_err(3) - if args[1]=="*": - wids = csource.window_sources.keys() + if len(args)!=2: + return argn_err(2) + from xpra.server.window_video_source import parse_scaling_value + try: + scaling = parse_scaling_value(args[1]) + except: + return respond(11, "invalid scaling value %s" % args[1]) + if args[0]=="*": + wids = list(self._id_to_window.keys()) else: try: - wid = int(args[1]) - csource.window_sources[wid] + wid = int(args[0]) + window = self._id_to_window[wid] wids = [wid] except: - return respond(10, "cannot find window id %s" % args[1]) - try: - from xpra.server.window_video_source import parse_scaling_value - scaling = parse_scaling_value(args[2]) - except: - return respond(11, "invalid scaling value %s" % args[2]) - for wid in wids: - window = self._id_to_window.get(wid) - if not window: - continue - ws = csource.window_sources.get(wid) - if ws: - ws.set_scaling(scaling) - csource.refresh(wid, window, {}) - return respond(0, "scaling set to %s" % str(scaling)) - elif command=="name": - if len(args)!=2: - return argn_err(1) - self.session_name = args[1] - log.info("changed session name: %s", self.session_name) - may_forward_client_command(["name"]) - return respond(0, "session name set") + return respond(10, "cannot find window id %s" % args[0]) + + for csource in sources: + for wid in wids: + window = self._id_to_window.get(wid) + if not window: + continue + ws = csource.window_sources.get(wid) + if ws: + ws.set_scaling(scaling) + csource.refresh(wid, window, {}) + return respond(0, "scaling set to %s on window %s for %s clients" % (str(scaling), args[0], len(sources))) elif command=="client": - if len(args)<2: - return argn_err("at least 2") - client_command = args[1:] - if client_command[0] not in csource.control_commands: - return respond(12, "client does not support control command '%s'" % client_command[0]) + if len(args)==0: + return argn_err("at least 1") + client_command = args + count = 0 + for source in sources: + if client_command[0] in source.control_commands: + count += 1 + source.send_client_command(*client_command) + else: + log.warn("client %s does not support client command %s", source, client_command[0]) csource.send_client_command(*client_command) - return respond(0, "client control command '%s' forwarded" % (client_command[0])) + return respond(0, "client control command '%s' forwarded to %s clients" % (client_command[0], count)) else: return respond(9, "internal state error: invalid command '%s'", command) diff --git a/src/xpra/server/server_core.py b/src/xpra/server/server_core.py index c5f36dae6a..c819f82bcd 100644 --- a/src/xpra/server/server_core.py +++ b/src/xpra/server/server_core.py @@ -119,6 +119,9 @@ def __init__(self): self.compression_level = 1 self.exit_with_client = False + #control mode: + self.control_commands = ["hello"] + self.init_packet_handlers() self.init_aliases() @@ -420,6 +423,11 @@ def _process_hello(self, proto, packet): log.info("processing info request from %s", proto._conn) self.send_hello_info(proto) return + command_req = c.strlistget("command_request") + if len(command_req)>0: + #call from UI thread: + self.idle_add(self.handle_command_request, proto, command_req) + return #continue processing hello packet: try: self.hello_oked(proto, packet, c, auth_caps) @@ -533,6 +541,32 @@ def hello_oked(self, proto, packet, c, auth_caps): pass + def control_command_response(self, proto, command, error=0, response=""): + log("control_command_response(%s)=%s", command, response) + hello = {"command_response" : (error, response)} + proto.send_now(("hello", hello)) + + def handle_command_request(self, proto, args): + try: + assert len(args)>0 + command = args[0] + if command not in self.control_commands: + log.warn("invalid command: %s (must be one of: %s)", command, self.control_commands) + return self.control_command_response(proto, command, 6, "invalid command") + self.do_handle_command_request(proto, command, args[1:]) + except Exception, e: + log.error("error processing command %s", args, exc_info=True) + proto.send_now(("hello", {"command_response" : (127, "error processing command: %s" % e)})) + + def do_handle_command_request(self, proto, command, args): + log("handle_command_request(%s, %s, %s)", proto, command, args) + if command=="hello": + return self.control_command_response(proto, command, 0, "hello") + assert command=="help" + return self.control_command_response(proto, command, 0, + "control supports: %s" % (", ".join(self.control_commands))) + + def accept_client(self, proto, c): #max packet size from client (the biggest we can get are clipboard packets) proto.max_packet_size = 1024*1024 #1MB diff --git a/src/xpra/server/source.py b/src/xpra/server/source.py index 8737c1ec18..0e79974ff5 100644 --- a/src/xpra/server/source.py +++ b/src/xpra/server/source.py @@ -1362,6 +1362,13 @@ def set_speed(self, speed): elog("set_speed(%s) prev_speed=%s, default_encoding_options=%s", speed, prev_speed, self.default_encoding_options) self.reconfigure(force_reload=(speed>99 and prev_speed<=99) or (speed<=99 and prev_speed>99)) + def full_quality_refresh(self, wid, window, damage_options): + if not self.can_send_window(window): + return + ws = self.window_sources.get(wid) + if ws: + ws.full_quality_refresh(window, damage_options) + def refresh(self, wid, window, opts): if not self.can_send_window(window): return