Skip to content

Commit

Permalink
Merge pull request #154 from SvenskaSpel/rename-synchronizer-to-distr…
Browse files Browse the repository at this point in the history
…ibutor

Rename synchronizer to distributor
  • Loading branch information
cyberw authored Dec 12, 2023
2 parents 7877666 + d8ef8c1 commit 6d977b8
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 16 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ locust --help
## Wait time
- Custom wait time functions ([example](examples/constant_total_ips_ex.py), [source](locust_plugins/wait_time.py))

## Debug
- Support for running a single User in the debugger (moved to [locust core](https://docs.locust.io/en/latest/running-in-debugger.html)!)
## Distributing test data
- Support for distributing test data from master to workers while maintaining test data order ([example](examples/distributor_ex.py), [source](locust_plugins/distributor.py))

## Transaction manager
- Support for logging transactions (aggregating multiple requests or other actions) ([example](examples/transaction_example.py), [source](locust_plugins/transaction_manager.py))
Expand Down
6 changes: 3 additions & 3 deletions examples/synchronizer_ex.py → examples/distributor_ex.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from locust_plugins.mongoreader import MongoLRUReader
from locust_plugins.csvreader import CSVDictReader
from locust_plugins import synchronizer
from locust_plugins import distributor
from locust import HttpUser, task, run_single_user, events
from locust.runners import WorkerRunner

Expand All @@ -14,15 +14,15 @@ def on_locust_init(environment, **_kwargs):
reader = CSVDictReader("ssn.tsv", delimiter="\t")
else:
reader = MongoLRUReader({"foo": "bar"}, "last_login")
synchronizer.register(environment, reader)
distributor.register(environment, reader)


class MyUser(HttpUser):
host = "http://www.example.com"

@task
def my_task(self):
customer = synchronizer.getdata(self)
customer = distributor.getdata(self)
self.client.get(f"/?{customer['ssn']}")


Expand Down
19 changes: 9 additions & 10 deletions locust_plugins/synchronizer.py → locust_plugins/distributor.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,35 +11,33 @@


# received on master
def _synchronizer_request(environment: Environment, msg, **kwargs):
assert _iterator
def _distributor_request(environment: Environment, msg, **kwargs):
data = next(_iterator)
environment.runner.send_message(
"synchronizer_response",
"distributor_response",
{"payload": data, "user_id": msg.data["user_id"]},
client_id=msg.data["client_id"],
)


# received on worker
def _synchronizer_response(environment: Environment, msg, **kwargs):
def _distributor_response(environment: Environment, msg, **kwargs):
_results[msg.data["user_id"]].set(msg.data)


def register(environment: Environment, iterator: Optional[Iterator[dict]]):
"""Register synchronizer method handlers and tie them to use the iterator that you pass.
"""Register distributor method handlers and tie them to use the iterator that you pass.
iterator is not used on workers, so you can leave it as None there.
"""
global _iterator
_iterator = iterator

runner = environment.runner
if not iterator and not isinstance(runner, WorkerRunner):
raise Exception("iterator is a mandatory parameter when not on a worker runner")
assert iterator or isinstance(runner, WorkerRunner), "iterator is a mandatory parameter when not on a worker runner"
if runner:
runner.register_message("synchronizer_request", _synchronizer_request)
runner.register_message("synchronizer_response", _synchronizer_response)
runner.register_message("distributor_request", _distributor_request)
runner.register_message("distributor_response", _distributor_response)


def getdata(user: User) -> Dict:
Expand All @@ -49,14 +47,15 @@ def getdata(user: User) -> Dict:
user (User): current user object (we use the object id of the User to keep track of who's waiting for which data)
"""
if not user.environment.runner: # no need to do anything clever if there is no runner
assert _iterator, "Did you forget to call register() before trying to get data?"
return next(_iterator)

if id(user) in _results:
logging.warning("This user was already waiting for data. Strange.")

_results[id(user)] = AsyncResult()
runner = user.environment.runner
runner.send_message("synchronizer_request", {"user_id": id(user), "client_id": runner.client_id})
runner.send_message("distributor_request", {"user_id": id(user), "client_id": runner.client_id})
data = _results[id(user)].get()["payload"] # this waits for the reply
del _results[id(user)]
return data
2 changes: 1 addition & 1 deletion tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ commands =
grep -m 1 'Manual' output.txt
grep -m 1 'LCP' output.txt
bash -ec "! grep 'object has no attribute' output.txt"
bash -ec "(cd examples && PYTHONUNBUFFERED=1 locust -f synchronizer_ex.py --headless -t 5 -u 4 --processes 4) |& tee output.txt || true"
bash -ec "(cd examples && PYTHONUNBUFFERED=1 locust -f distributor_ex.py --headless -t 5 -u 4 --processes 4) |& tee output.txt || true"
grep -m 1 '2099010101-1111' output.txt
locust -f examples/jmeter_listener_example.py --headless -t 1
locust-compose up -d
Expand Down

0 comments on commit 6d977b8

Please sign in to comment.