Skip to content

Commit

Permalink
Fix bug that occurs while decreasing number of running users if the u…
Browse files Browse the repository at this point in the history
…sers are still hatching. #1168

Removed LocustRunner.num_clients in favour of using LocustRunner.user_count which should be less prone to errors (since it’s the actual number of locust greenlets). Introduced MasterLocustRunner.target_user_count which is needed so that we can rebalance locust users across the connected slaves while users are still hatching.
  • Loading branch information
heyman committed Jan 23, 2020
1 parent 015f1b6 commit 576f7ea
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 33 deletions.
63 changes: 30 additions & 33 deletions locust/runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ def __init__(self, locust_classes, options):
self.options = options
self.locust_classes = locust_classes
self.hatch_rate = options.hatch_rate
self.num_clients = options.num_clients
self.host = options.host
self.locusts = Group()
self.greenlet = Group()
Expand Down Expand Up @@ -118,27 +117,25 @@ def weight_locusts(self, amount):

return bucket

def spawn_locusts(self, spawn_count=None, wait=False):
if spawn_count is None:
spawn_count = self.num_clients

def spawn_locusts(self, spawn_count, wait=False):
bucket = self.weight_locusts(spawn_count)
spawn_count = len(bucket)
if self.state == STATE_INIT or self.state == STATE_STOPPED:
self.state = STATE_HATCHING
self.num_clients = spawn_count
else:
self.num_clients += spawn_count

logger.info("Hatching and swarming %i clients at the rate %g clients/s..." % (spawn_count, self.hatch_rate))

existing_count = len(self.locusts)
logger.info("Hatching and swarming %i users at the rate %g users/s (%i users already running)..." % (spawn_count, self.hatch_rate, existing_count))
occurrence_count = dict([(l.__name__, 0) for l in self.locust_classes])

def hatch():
sleep_time = 1.0 / self.hatch_rate
while True:
if not bucket:
logger.info("All locusts hatched: %s" % ", ".join(["%s: %d" % (name, count) for name, count in six.iteritems(occurrence_count)]))
events.hatch_complete.fire(user_count=self.num_clients)
logger.info("All locusts hatched: %s (%i already running)" % (
", ".join(["%s: %d" % (name, count) for name, count in six.iteritems(occurrence_count)]),
existing_count,
))
events.hatch_complete.fire(user_count=len(self.locusts))
return

locust = bucket.pop(random.randint(0, len(bucket)-1))
Expand Down Expand Up @@ -166,7 +163,6 @@ def kill_locusts(self, kill_count):
"""
bucket = self.weight_locusts(kill_count)
kill_count = len(bucket)
self.num_clients -= kill_count
logger.info("Killing %i locusts" % kill_count)
dying = []
for g in self.locusts:
Expand All @@ -176,7 +172,7 @@ def kill_locusts(self, kill_count):
bucket.remove(l)
break
self.kill_locust_greenlets(dying)
events.hatch_complete.fire(user_count=self.num_clients)
events.hatch_complete.fire(user_count=self.user_count)

def kill_locust_greenlets(self, greenlets):
"""
Expand Down Expand Up @@ -208,7 +204,7 @@ def monitor_cpu(self):
self.cpu_warning_emitted = True
gevent.sleep(CPU_MONITOR_INTERVAL)

def start_hatching(self, locust_count=None, hatch_rate=None, wait=False):
def start_hatching(self, locust_count, hatch_rate, wait=False):
if self.state != STATE_RUNNING and self.state != STATE_HATCHING:
self.stats.clear_all()
self.exceptions = {}
Expand All @@ -219,25 +215,20 @@ def start_hatching(self, locust_count=None, hatch_rate=None, wait=False):
# Dynamically changing the locust count
if self.state != STATE_INIT and self.state != STATE_STOPPED:
self.state = STATE_HATCHING
if self.num_clients > locust_count:
if self.user_count > locust_count:
# Kill some locusts
kill_count = self.num_clients - locust_count
kill_count = self.user_count - locust_count
self.kill_locusts(kill_count)
elif self.num_clients < locust_count:
elif self.user_count < locust_count:
# Spawn some locusts
if hatch_rate:
self.hatch_rate = hatch_rate
spawn_count = locust_count - self.num_clients
self.hatch_rate = hatch_rate
spawn_count = locust_count - self.user_count
self.spawn_locusts(spawn_count=spawn_count)
else:
events.hatch_complete.fire(user_count=self.num_clients)
events.hatch_complete.fire(user_count=self.user_count)
else:
if hatch_rate:
self.hatch_rate = hatch_rate
if locust_count is not None:
self.spawn_locusts(locust_count, wait=wait)
else:
self.spawn_locusts(wait=wait)
self.hatch_rate = hatch_rate
self.spawn_locusts(locust_count, wait=wait)

def start_stepload(self, locust_count, hatch_rate, step_locust_count, step_duration):
if locust_count < step_locust_count:
Expand Down Expand Up @@ -300,9 +291,12 @@ def on_locust_error(locust_instance, exception, tb):
self.log_exception("local", str(exception), formatted_tb)
events.locust_error += on_locust_error

def start_hatching(self, locust_count=None, hatch_rate=None, wait=False):
def start_hatching(self, locust_count, hatch_rate, wait=False):
if hatch_rate > 100:
logger.warning("Your selected hatch rate is very high (>100), and this is known to sometimes cause issues. Do you really need to ramp up that fast?")
if self.hatching_greenlet:
# kill existing hatching_greenlet before we start a new one
self.hatching_greenlet.kill(block=True)
self.hatching_greenlet = self.greenlet.spawn(lambda: super(LocalLocustRunner, self).start_hatching(locust_count, hatch_rate, wait=wait))


Expand All @@ -329,6 +323,7 @@ class MasterLocustRunner(DistributedLocustRunner):
def __init__(self, *args, **kwargs):
super(MasterLocustRunner, self).__init__(*args, **kwargs)
self.slave_cpu_warning_emitted = False
self.target_user_count = None

class SlaveNodesDict(dict):
def get_by_state(self, state):
Expand Down Expand Up @@ -381,13 +376,13 @@ def cpu_log_warning(self):
return warning_emitted

def start_hatching(self, locust_count, hatch_rate):
self.target_user_count = locust_count
num_slaves = len(self.clients.ready) + len(self.clients.running) + len(self.clients.hatching)
if not num_slaves:
logger.warning("You are running in distributed mode but have no slave servers connected. "
"Please connect slaves prior to swarming.")
return

self.num_clients = locust_count
self.hatch_rate = hatch_rate
slave_num_clients = locust_count // (num_slaves or 1)
slave_hatch_rate = float(hatch_rate) / (num_slaves or 1)
Expand Down Expand Up @@ -450,9 +445,9 @@ def client_listener(self):
id = msg.node_id
self.clients[id] = SlaveNode(id, heartbeat_liveness=self.heartbeat_liveness)
logger.info("Client %r reported as ready. Currently %i clients ready to swarm." % (id, len(self.clients.ready + self.clients.running + self.clients.hatching)))
# balance the load distribution when new client joins
if self.state == STATE_RUNNING or self.state == STATE_HATCHING:
self.start_hatching(self.num_clients, self.hatch_rate)
# balance the load distribution when new client joins
self.start_hatching(self.target_user_count, self.hatch_rate)
## emit a warning if the slave's clock seem to be out of sync with our clock
#if abs(time() - msg.data["time"]) > 5.0:
# warnings.warn("The slave node's clock seem to be out of sync. For the statistics to be correct the different locust servers need to have synchronized clocks.")
Expand Down Expand Up @@ -540,9 +535,11 @@ def worker(self):
self.client.send(Message("hatching", None, self.client_id))
job = msg.data
self.hatch_rate = job["hatch_rate"]
#self.num_clients = job["num_clients"]
self.host = job["host"]
self.options.stop_timeout = job["stop_timeout"]
if self.hatching_greenlet:
# kill existing hatching greenlet before we launch new one
self.hatching_greenlet.kill(block=True)
self.hatching_greenlet = self.greenlet.spawn(lambda: self.start_hatching(locust_count=job["num_clients"], hatch_rate=job["hatch_rate"]))
elif msg.type == "stop":
self.stop()
Expand Down
75 changes: 75 additions & 0 deletions locust/test/test_runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,22 @@ def on_locust_error(*args, **kwargs):
self.assertEqual(1, User.setup_run_count)
self.assertEqual(1, User.locust_error_count)
self.assertEqual(3, User.task_run_count)

def test_change_user_count_during_hatching(self):
    """Lowering the target user count while the runner is still hatching
    must end up with the new, lower number of running users (issue #1168).
    """
    class User(Locust):
        wait_time = constant(1)
        class task_set(TaskSet):
            @task
            def my_task(self):
                pass

    runner = LocalLocustRunner([User], mocked_options())
    # Start ramping towards 10 users at 5 users/s (full ramp would take ~2s)...
    runner.start_hatching(locust_count=10, hatch_rate=5, wait=False)
    sleep(0.6)  # interrupt mid-hatch, before all 10 users have spawned
    # ...then lower the target to 5 while hatching is still in progress.
    runner.start_hatching(locust_count=5, hatch_rate=5, wait=False)
    runner.hatching_greenlet.join()
    # the final user count must match the *last* requested target
    self.assertEqual(5, len(runner.locusts))
    runner.quit()


class TestMasterRunner(LocustTestCase):
Expand Down Expand Up @@ -405,6 +421,33 @@ class MyTestLocust(Locust):
self.assertEqual(30, master.stats.total.get_current_response_time_percentile(0.5))
self.assertEqual(3000, master.stats.total.get_current_response_time_percentile(0.95))

def test_rebalance_locust_users_on_slave_connect(self):
    """A slave that connects while a hatch job is active should make the
    master re-send the job so the load is rebalanced across all slaves.
    """
    class MyTestLocust(Locust):
        pass

    with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:
        master = MasterLocustRunner(MyTestLocust, self.options)
        server.mocked_send(Message("client_ready", None, "zeh_fake_client1"))
        self.assertEqual(1, len(master.clients))
        self.assertTrue("zeh_fake_client1" in master.clients, "Could not find fake client in master instance's clients dict")

        master.start_hatching(100, 20)
        # with a single connected slave, it gets the whole job
        self.assertEqual(1, len(server.outbox))
        client_id, msg = server.outbox.pop()
        self.assertEqual(100, msg.data["num_clients"])
        self.assertEqual(20, msg.data["hatch_rate"])

        # let another slave connect
        server.mocked_send(Message("client_ready", None, "zeh_fake_client2"))
        self.assertEqual(2, len(master.clients))
        # a fresh hatch message should have been sent to each of the two slaves
        self.assertEqual(2, len(server.outbox))
        # each slave is now assigned half the users and half the hatch rate
        client_id, msg = server.outbox.pop()
        self.assertEqual(50, msg.data["num_clients"])
        self.assertEqual(10, msg.data["hatch_rate"])
        client_id, msg = server.outbox.pop()
        self.assertEqual(50, msg.data["num_clients"])
        self.assertEqual(10, msg.data["hatch_rate"])

def test_sends_hatch_data_to_ready_running_hatching_slaves(self):
'''Sends hatch job to running, ready, or hatching slaves'''
class MyTestLocust(Locust):
Expand Down Expand Up @@ -667,6 +710,38 @@ def the_task(self):
slave.locusts.join()
# check that locust user did not get to finish
self.assertEqual(1, MyTestLocust._test_state)

def test_change_user_count_during_hatching(self):
    """Slave runner: a second "hatch" message with a lower user count,
    arriving while the previous hatch is still in progress, must replace
    the old hatch job (issue #1168).
    """
    class User(Locust):
        wait_time = constant(1)
        class task_set(TaskSet):
            @task
            def my_task(self):
                pass

    with mock.patch("locust.rpc.rpc.Client", mocked_rpc()) as client:
        options = mocked_options()
        options.stop_timeout = None
        slave = SlaveLocustRunner([User], options)

        # first hatch job: ramp towards 10 users at 5 users/s
        client.mocked_send(Message("hatch", {
            "hatch_rate": 5,
            "num_clients": 10,
            "host": "",
            "stop_timeout": None,
        }, "dummy_client_id"))
        sleep(0.6)  # let hatching get under way but not finish (~2s ramp)
        self.assertEqual(STATE_HATCHING, slave.state)
        # second hatch job arrives mid-hatch with a lower target
        client.mocked_send(Message("hatch", {
            "hatch_rate": 5,
            "num_clients": 9,
            "host": "",
            "stop_timeout": None,
        }, "dummy_client_id"))
        sleep(0)  # yield so the worker greenlet can process the new message
        slave.hatching_greenlet.join()
        # the final user count must match the last received target
        self.assertEqual(9, len(slave.locusts))
        slave.quit()



Expand Down

0 comments on commit 576f7ea

Please sign in to comment.