Skip to content

Commit

Permalink
Enforce IP addresses for actors
Browse files Browse the repository at this point in the history
  • Loading branch information
danielmitterdorfer committed Jun 28, 2017
1 parent b147050 commit 57c4c2d
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 8 deletions.
19 changes: 19 additions & 0 deletions esrally/actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,21 @@ def use_offline_actor_system():
__SYSTEM_BASE = "multiprocQueueBase"


def resolve(hostname_or_ip):
if hostname_or_ip and hostname_or_ip.startswith("127"):
return hostname_or_ip

import socket
addrinfo = socket.getaddrinfo(hostname_or_ip, 22, 0, 0, socket.IPPROTO_TCP)
for family, socktype, proto, canonname, sockaddr in addrinfo:
# we're interested in the IPv4 address
if family == socket.AddressFamily.AF_INET:
ip, _ = sockaddr
if ip[:3] != "127":
return ip
return None


def bootstrap_actor_system(try_join=False, prefer_local_only=False, local_ip=None, coordinator_ip=None):
system_base = __SYSTEM_BASE
try:
Expand All @@ -178,6 +193,10 @@ def bootstrap_actor_system(try_join=False, prefer_local_only=False, local_ip=Non
raise exceptions.SystemSetupError("coordinator IP is required")
if not local_ip:
raise exceptions.SystemSetupError("local IP is required")
# always resolve the public IP here, even if a DNS name is given. Otherwise Thespian will be unhappy
local_ip = resolve(local_ip)
coordinator_ip = resolve(coordinator_ip)

coordinator = local_ip == coordinator_ip

capabilities = {"coordinator": coordinator}
Expand Down
17 changes: 9 additions & 8 deletions esrally/mechanic/mechanic.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,13 +143,13 @@ def receiveMessage(self, msg, sender):
if len(hosts) == 0:
raise exceptions.LaunchError("No target hosts are configured.")
for host in hosts:
ip = host["host"]
host_or_ip = host["host"]
port = int(host["port"])
# user may specify "localhost" on the command line but the problem is that we auto-register the actor system
# with "ip": "127.0.0.1" so we convert this special case automatically. In all other cases the user needs to
# start the actor system on the other host and is aware that the parameter for the actor system and the
# --target-hosts parameter need to match.
if ip == "localhost" or ip == "127.0.0.1":
if host_or_ip == "localhost" or host_or_ip == "127.0.0.1":
m = self.createActor(LocalNodeMechanicActor,
globalName="/rally/mechanic/worker/localhost",
targetActorRequirements={"coordinator": True})
Expand All @@ -161,18 +161,19 @@ def receiveMessage(self, msg, sender):
else:
logger.error("User tried to benchmark against %s but no external Rally daemon has been started." % hosts)
raise exceptions.SystemSetupError("To benchmark remote hosts (e.g. %s) you need to start the Rally daemon "
"on each machine including this one." % ip)
already_running = actor.actor_system_already_running(ip=ip)
logger.info("Actor system on [%s] already running? [%s]" % (ip, str(already_running)))
"on each machine including this one." % host_or_ip)
already_running = actor.actor_system_already_running(ip=host_or_ip)
logger.info("Actor system on [%s] already running? [%s]" % (host_or_ip, str(already_running)))
if not already_running:
console.println("Waiting for Rally daemon on [%s] " % ip, end="", flush=True)
while not actor.actor_system_already_running(ip=ip):
console.println("Waiting for Rally daemon on [%s] " % host_or_ip, end="", flush=True)
while not actor.actor_system_already_running(ip=host_or_ip):
console.println(".", end="", flush=True)
time.sleep(3)
if not already_running:
console.println(" [OK]")
ip = actor.resolve(host_or_ip)
m = self.createActor(RemoteNodeMechanicActor,
globalName="/rally/mechanic/worker/%s" % ip,
globalName="/rally/mechanic/worker/%s" % host_or_ip,
targetActorRequirements={"ip": ip})
mechanics_and_start_message.append((m, msg.with_port(port)))
self.mechanics.append(m)
Expand Down

0 comments on commit 57c4c2d

Please sign in to comment.