Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create an ES client per simulated client instead of per worker. #1516

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 6 additions & 7 deletions esrally/driver/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -1662,14 +1662,10 @@ def es_clients(all_hosts, all_client_options):
es[cluster_name] = client.EsClientFactory(cluster_hosts, all_client_options[cluster_name]).create_async()
return es

# Properly size the internal connection pool to match the number of expected clients but allow the user
# to override it if needed.
client_count = len(self.task_allocations)
es = es_clients(self.cfg.opts("client", "hosts").all_hosts, self.cfg.opts("client", "options").with_max_connections(client_count))

self.logger.info("Task assertions enabled: %s", str(self.assertions_enabled))
runner.enable_assertions(self.assertions_enabled)

clients = []
aws = []
# A parameter source should only be created once per task - it is partitioned later on per client.
params_per_task = {}
Expand All @@ -1679,6 +1675,8 @@ def es_clients(all_hosts, all_client_options):
param_source = track.operation_parameters(self.track, task)
params_per_task[task] = param_source
schedule = schedule_for(task_allocation, params_per_task[task])
es = es_clients(self.cfg.opts("client", "hosts").all_hosts, self.cfg.opts("client", "options"))
clients.append(es)
async_executor = AsyncExecutor(
client_id, task, schedule, es, self.sampler, self.cancel, self.complete, task.error_behavior(self.abort_on_error)
)
Expand All @@ -1693,8 +1691,9 @@ def es_clients(all_hosts, all_client_options):
await asyncio.get_event_loop().shutdown_asyncgens()
shutdown_asyncgens_end = time.perf_counter()
self.logger.info("Total time to shutdown asyncgens: %f seconds.", (shutdown_asyncgens_end - run_end))
for e in es.values():
await e.transport.close()
for c in clients:
for es in c.values():
await es.close()
transport_close_end = time.perf_counter()
self.logger.info("Total time to close transports: %f seconds.", (shutdown_asyncgens_end - transport_close_end))

Expand Down