Skip to content

Commit

Permalink
* move control commands support code to ServerCore so we can implemen…
Browse files Browse the repository at this point in the history
…t them in proxy server

* proxy server supports: "help", "hello" and "stop" commands (stop by proxied display)
* support multiple clients with all control commands (where appropriate)
* ensure we call "handle_command_request" from the UI thread since we may touch UI objects
* add "refresh" server command which does a full quality refresh of all windows

git-svn-id: https://xpra.org/svn/Xpra/trunk@5316 3bb7dfac-3a0b-4e04-842a-767bc560f471
  • Loading branch information
totaam committed Jan 30, 2014
1 parent f3c7aab commit 2b7b436
Show file tree
Hide file tree
Showing 4 changed files with 170 additions and 107 deletions.
31 changes: 26 additions & 5 deletions src/xpra/server/proxy_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class ProxyServer(ServerCore):
"""

def __init__(self):
log("ProxyServer.__init__()")
debug("ProxyServer.__init__()")
ServerCore.__init__(self)
self._max_connections = MAX_CONCURRENT_CONNECTIONS
self.main_loop = None
Expand All @@ -49,14 +49,15 @@ def __init__(self):
self.timeout_add = gobject.timeout_add
self.source_remove = gobject.source_remove
self._socket_timeout = PROXY_SOCKET_TIMEOUT
self.control_commands = ["hello", "stop"]
#ensure we cache the platform info before intercepting SIGCHLD
#as this will cause a fork and SIGCHLD to be emitted:
from xpra.version_util import get_platform_info
get_platform_info()
signal.signal(signal.SIGCHLD, self.sigchld)

def init(self, opts):
log("ProxyServer.init(%s)", opts)
debug("ProxyServer.init(%s)", opts)
if not opts.auth:
raise Exception("The proxy server requires an authentication mode")
ServerCore.init(self, opts)
Expand All @@ -71,15 +72,35 @@ def do_run(self):
self.main_loop = gobject.MainLoop()
self.main_loop.run()

def do_handle_command_request(self, proto, command, args):
if command in ("help", "hello"):
return ServerCore.do_handle_command_request(self, proto, command, args)
assert command=="stop"
if len(args)!=1:
return ServerCore.control_command_response(self, proto, command, 4, "invalid number of arguments, usage: 'xpra control stop DISPLAY'")
display = args[0]
debug("stop command: will try to find proxy process for display %s", display)
for process, v in list(self.processes.items()):
disp,mq = v
if disp==display:
pid = process.pid
log.info("stop command: found process %s with pid %s for display %s, sending it 'stop' request", process, pid, display)
mq.put("stop")
return self.control_command_response(proto, command, 0, "stopped proxy process with pid %s" % pid)
return self.control_command_response(proto, command, 14, "no proxy found for display %s" % display)


def stop_all_proxies(self):
processes = self.processes
self.processes = {}
log("stop_all_proxies() will stop proxy processes: %s", processes)
debug("stop_all_proxies() will stop proxy processes: %s", processes)
for process, v in processes.items():
if not process.is_alive():
continue
disp,mq = v
log("stop_all_proxies() stopping process %s for display %s", process, disp)
debug("stop_all_proxies() stopping process %s for display %s", process, disp)
mq.put("stop")
log("stop_all_proxies() done")
debug("stop_all_proxies() done")

def cleanup(self):
self.stop_all_proxies()
Expand Down
205 changes: 103 additions & 102 deletions src/xpra/server/server_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,14 @@ def __init__(self):
self.lossless_mode_encodings = []
self.default_encoding = None

#control mode:
self.control_commands = ["hello", "help",
"compression", "encoder", "refresh",
"sound-output",
"scaling",
"suspend", "resume", "name",
"client"]

self.init_encodings()
self.init_packet_handlers()
self.init_aliases()
Expand Down Expand Up @@ -443,10 +451,6 @@ def hello_oked(self, proto, packet, c, auth_caps):
if c.boolget("info_request", False):
self.send_hello_info(proto)
return
command_req = c.strlistget("command_request")
if len(command_req)>0:
self.handle_command_request(proto, command_req)
return

#"normal" connection, so log welcome message:
log.info("Handshake complete; enabling connection")
Expand Down Expand Up @@ -621,17 +625,8 @@ def send_hello(self, server_source, root_w, root_h, key_repeat, server_cipher):
server_source.hello(capabilities)


def handle_command_request(self, proto, args):
try:
self.do_handle_command_request(proto, args)
except Exception, e:
log.error("error processing command %s", args, exc_info=True)
proto.send_now(("hello", {"command_response" : (127, "error processing command: %s" % e)}))

def do_handle_command_request(self, proto, args):
assert len(args)>0
log("handle_command_request(%s, %s)", proto, args)
command = args[0]
def do_handle_command_request(self, proto, command, args):
log("handle_command_request(%s, %s, %s)", proto, command, args)
def respond(error=0, response=""):
log("command request response(%s)=%s", command, response)
hello = {"command_response" : (error, response)}
Expand All @@ -643,117 +638,123 @@ def arg_err(n, msg):
def success():
respond(0, "success")

commands = ("hello",
"compression", "encoder",
"sound-output",
"scaling",
"suspend", "resume", "name",
"client")
if command=="help":
return respond(0, "control supports: %s" % (", ".join(commands)))

if command not in commands:
return respond(6, "invalid command")

if command=="hello":
return respond(0, "hello")
#from here on, we assume the command applies to the
#current client connection, of which there must only be one:
sss = list(self._server_sources.items())
if len(sss)==0:
return respond(2, "no client connected")
elif len(sss)>1:
return respond(3, "more than one client connected")
cproto, csource = sss[0]

def may_forward_client_command(client_command):
if client_command[0] not in csource.control_commands:
log.info("not forwarded to client (not supported)")
return False
csource.send_client_command(*client_command)
return True

log("handle_command_request will apply to client: %s", csource)
if command=="compression":
if len(args)!=2:
return argn_err(2)
compression = args[1].lower()
sources = list(self._server_sources.values())
protos = list(self._server_sources.keys())
def forward_all_clients(client_command):
""" forwards the command to all clients """
for source in sources:
""" forwards to *the* client, if there is *one* """
if client_command[0] not in source.control_commands:
log.info("client command '%s' not forwarded to client %s (not supported)", client_command, source)
return False
source.send_client_command(*client_command)

#handle commands that either don't require a client,
#or can work on more than one connected client:
if command in ("help", "hello"):
#generic case:
return ServerCore.do_handle_command_request(self, proto, command, args)
elif command=="name":
if len(args)!=1:
return argn_err(1)
self.session_name = args[0]
log.info("changed session name: %s", self.session_name)
forward_all_clients(["name"])
return respond(0, "session name set")
elif command=="compression":
if len(args)!=1:
return argn_err(1)
compression = args[0].lower()
opts = ("lz4", "zlib")
if compression=="lz4":
cproto.enable_lz4()
may_forward_client_command(["enable_lz4"])
for cproto in protos:
cproto.enable_lz4()
forward_all_clients(["enable_lz4"])
return success()
elif compression=="zlib":
cproto.enable_zlib()
may_forward_client_command(["enable_zlib"])
for cproto in protos:
cproto.enable_zlib()
forward_all_clients(["enable_zlib"])
return success()
return arg_err(1, "must be one of: %s" % (", ".join(opts)))
elif command=="encoder":
if len(args)!=2:
return argn_err(2)
encoder = args[1].lower()
if len(args)!=1:
return argn_err(1)
encoder = args[0].lower()
opts = ("bencode", "rencode")
if encoder=="bencode":
cproto.enable_bencode()
may_forward_client_command(["enable_bencode"])
for cproto in protos:
cproto.enable_bencode()
forward_all_clients(["enable_bencode"])
return success()
elif encoder=="rencode":
cproto.enable_rencode()
may_forward_client_command(["enable_rencode"])
for cproto in protos:
cproto.enable_rencode()
forward_all_clients(["enable_rencode"])
return success()
return arg_err(1, "must be one of: %s" % (", ".join(opts)))
elif command=="sound-output":
if len(args)<2:
if len(args)<1:
return argn_err("more than 1")
msg = csource.sound_control(*args[1:])
return respond(0, msg)
msg = []
for csource in sources:
msg.append("%s : %s" % (csource, csource.sound_control(*args[1:])))
return respond(0, ", ".join(msg))
elif command=="suspend":
csource.suspend(True, self._id_to_window)
return respond(0, "suspended")
for csource in sources:
csource.suspend(True, self._id_to_window)
return respond(0, "suspended %s clients" % len(sources))
elif command=="resume":
csource.resume(True, self._id_to_window)
return respond(0, "resumed")
for csource in sources:
csource.resume(True, self._id_to_window)
return respond(0, "resumed %s clients" % len(sources))
elif command=="refresh":
widwin = list(self._id_to_window.items())
for wid, window in widwin:
for csource in sources:
csource.full_quality_refresh(wid, window, {})
return respond(0, "refreshed %s window for %s clients" % (len(widwin), len(sources)))
elif command=="scaling":
if len(args)!=3:
return argn_err(3)
if args[1]=="*":
wids = csource.window_sources.keys()
if len(args)!=2:
return argn_err(2)
from xpra.server.window_video_source import parse_scaling_value
try:
scaling = parse_scaling_value(args[1])
except:
return respond(11, "invalid scaling value %s" % args[1])
if args[0]=="*":
wids = list(self._id_to_window.keys())
else:
try:
wid = int(args[1])
csource.window_sources[wid]
wid = int(args[0])
window = self._id_to_window[wid]
wids = [wid]
except:
return respond(10, "cannot find window id %s" % args[1])
try:
from xpra.server.window_video_source import parse_scaling_value
scaling = parse_scaling_value(args[2])
except:
return respond(11, "invalid scaling value %s" % args[2])
for wid in wids:
window = self._id_to_window.get(wid)
if not window:
continue
ws = csource.window_sources.get(wid)
if ws:
ws.set_scaling(scaling)
csource.refresh(wid, window, {})
return respond(0, "scaling set to %s" % str(scaling))
elif command=="name":
if len(args)!=2:
return argn_err(1)
self.session_name = args[1]
log.info("changed session name: %s", self.session_name)
may_forward_client_command(["name"])
return respond(0, "session name set")
return respond(10, "cannot find window id %s" % args[0])

for csource in sources:
for wid in wids:
window = self._id_to_window.get(wid)
if not window:
continue
ws = csource.window_sources.get(wid)
if ws:
ws.set_scaling(scaling)
csource.refresh(wid, window, {})
return respond(0, "scaling set to %s on window %s for %s clients" % (str(scaling), args[0], len(sources)))
elif command=="client":
if len(args)<2:
return argn_err("at least 2")
client_command = args[1:]
if client_command[0] not in csource.control_commands:
return respond(12, "client does not support control command '%s'" % client_command[0])
if len(args)==0:
return argn_err("at least 1")
client_command = args
count = 0
for source in sources:
if client_command[0] in source.control_commands:
count += 1
source.send_client_command(*client_command)
else:
log.warn("client %s does not support client command %s", source, client_command[0])
csource.send_client_command(*client_command)
return respond(0, "client control command '%s' forwarded" % (client_command[0]))
return respond(0, "client control command '%s' forwarded to %s clients" % (client_command[0], count))
else:
return respond(9, "internal state error: invalid command '%s'", command)

Expand Down
34 changes: 34 additions & 0 deletions src/xpra/server/server_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,9 @@ def __init__(self):
self.compression_level = 1
self.exit_with_client = False

#control mode:
self.control_commands = ["hello"]

self.init_packet_handlers()
self.init_aliases()

Expand Down Expand Up @@ -420,6 +423,11 @@ def _process_hello(self, proto, packet):
log.info("processing info request from %s", proto._conn)
self.send_hello_info(proto)
return
command_req = c.strlistget("command_request")
if len(command_req)>0:
#call from UI thread:
self.idle_add(self.handle_command_request, proto, command_req)
return
#continue processing hello packet:
try:
self.hello_oked(proto, packet, c, auth_caps)
Expand Down Expand Up @@ -533,6 +541,32 @@ def hello_oked(self, proto, packet, c, auth_caps):
pass


def control_command_response(self, proto, command, error=0, response=""):
log("control_command_response(%s)=%s", command, response)
hello = {"command_response" : (error, response)}
proto.send_now(("hello", hello))

def handle_command_request(self, proto, args):
try:
assert len(args)>0
command = args[0]
if command not in self.control_commands:
log.warn("invalid command: %s (must be one of: %s)", command, self.control_commands)
return self.control_command_response(proto, command, 6, "invalid command")
self.do_handle_command_request(proto, command, args[1:])
except Exception, e:
log.error("error processing command %s", args, exc_info=True)
proto.send_now(("hello", {"command_response" : (127, "error processing command: %s" % e)}))

def do_handle_command_request(self, proto, command, args):
log("handle_command_request(%s, %s, %s)", proto, command, args)
if command=="hello":
return self.control_command_response(proto, command, 0, "hello")
assert command=="help"
return self.control_command_response(proto, command, 0,
"control supports: %s" % (", ".join(self.control_commands)))


def accept_client(self, proto, c):
#max packet size from client (the biggest we can get are clipboard packets)
proto.max_packet_size = 1024*1024 #1MB
Expand Down
7 changes: 7 additions & 0 deletions src/xpra/server/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -1362,6 +1362,13 @@ def set_speed(self, speed):
elog("set_speed(%s) prev_speed=%s, default_encoding_options=%s", speed, prev_speed, self.default_encoding_options)
self.reconfigure(force_reload=(speed>99 and prev_speed<=99) or (speed<=99 and prev_speed>99))

def full_quality_refresh(self, wid, window, damage_options):
if not self.can_send_window(window):
return
ws = self.window_sources.get(wid)
if ws:
ws.full_quality_refresh(window, damage_options)

def refresh(self, wid, window, opts):
if not self.can_send_window(window):
return
Expand Down

0 comments on commit 2b7b436

Please sign in to comment.