Skip to content

Commit

Permalink
Replace zmq sockets with one DEALER-ROUTER socket
Browse files Browse the repository at this point in the history
The PUSH and PULL sockets being used caused hatch messages to get routed
to slaves that may have become unresponsive or crashed. This change
includes the client id in the messages sent out from the master which
ensures that hatch messages are going to slaves the are READY or
RUNNING.

This should also fix the issue #911 where slaves are not receiving the
stop message. I think these issues are a result of PUSH-PULL sockets
using a round robin approach.
  • Loading branch information
Jonathan McCall committed Dec 10, 2018
1 parent 79cd5a1 commit 3d4b927
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 25 deletions.
43 changes: 26 additions & 17 deletions locust/rpc/zmqrpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,30 +4,39 @@


class BaseSocket(object):
def __init__(self, sock_type):
context = zmq.Context()
self.socket = context.socket(sock_type)

def send(self, msg):
self.sender.send(msg.serialize())
self.socket.send(msg.serialize())

def send_multipart(self, client_id, msg):
print 'sending %s to %s' % (msg, client_id)
self.socket.send_multipart([client_id, msg.serialize()])

def recv(self):
data = self.receiver.recv()
return Message.unserialize(data)
data = self.socket.recv()
msg = Message.unserialize(data)
return msg

def recv_multipart(self):
try:
data = self.socket.recv_multipart()
addr = data[0]
msg = Message.unserialize(data[1])
return addr, msg
except Exception:
raise

class Server(BaseSocket):
def __init__(self, host, port):
context = zmq.Context()
self.receiver = context.socket(zmq.PULL)
self.receiver.bind("tcp://%s:%i" % (host, port))

self.sender = context.socket(zmq.PUSH)
self.sender.bind("tcp://%s:%i" % (host, port+1))

BaseSocket.__init__(self, zmq.ROUTER)
self.socket.bind("tcp://%s:%i" % (host, port))

class Client(BaseSocket):
def __init__(self, host, port):
context = zmq.Context()
self.receiver = context.socket(zmq.PULL)
self.receiver.connect("tcp://%s:%i" % (host, port+1))

self.sender = context.socket(zmq.PUSH)
self.sender.connect("tcp://%s:%i" % (host, port))
def __init__(self, host, port, identity):
BaseSocket.__init__(self, zmq.DEALER)
self.socket.setsockopt(zmq.IDENTITY, identity)
self.socket.connect("tcp://%s:%i" % (host, port))

21 changes: 13 additions & 8 deletions locust/runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,10 @@ class SlaveNodesDict(dict):
def get_by_state(self, state):
return [c for c in six.itervalues(self) if c.state == state]

@property
def all(self):
return six.itervalues(self)

@property
def ready(self):
return self.get_by_state(STATE_INIT)
Expand Down Expand Up @@ -286,7 +290,7 @@ def start_hatching(self, locust_count, hatch_rate):
self.exceptions = {}
events.master_start_hatching.fire()

for client in six.itervalues(self.clients):
for client in (self.clients.ready + self.clients.running):
data = {
"hatch_rate":slave_hatch_rate,
"num_clients":slave_num_clients,
Expand All @@ -298,24 +302,25 @@ def start_hatching(self, locust_count, hatch_rate):
data["num_clients"] += 1
remaining -= 1

self.server.send(Message("hatch", data, None))
self.server.send_multipart(client.id, Message("hatch", data, None))

self.stats.start_time = time()
self.state = STATE_HATCHING

def stop(self):
for client in self.clients.hatching + self.clients.running:
self.server.send(Message("stop", None, None))
for client in self.clients.all:
self.server.send_multipart(client.id, Message("stop", None, None))
events.master_stop_hatching.fire()

def quit(self):
for client in six.itervalues(self.clients):
self.server.send(Message("quit", None, None))
for client in self.clients.all:
self.server.send_multipart(client.id, Message("quit", None, None))
self.greenlet.kill(block=True)

def client_listener(self):
while True:
msg = self.server.recv()
client_id, msg = self.server.recv_multipart()
msg.node_id = client_id
if msg.type == "client_ready":
id = msg.node_id
self.clients[id] = SlaveNode(id)
Expand Down Expand Up @@ -354,7 +359,7 @@ def __init__(self, *args, **kwargs):
super(SlaveLocustRunner, self).__init__(*args, **kwargs)
self.client_id = socket.gethostname() + "_" + uuid4().hex

self.client = rpc.Client(self.master_host, self.master_port)
self.client = rpc.Client(self.master_host, self.master_port, self.client_id)
self.greenlet = Group()

self.greenlet.spawn(self.worker).link_exception(callback=self.noop)
Expand Down

0 comments on commit 3d4b927

Please sign in to comment.