Skip to content

Commit

Permalink
vdk-meta-jobs: implement a limit on concurrent running jobs (#1681)
Browse files Browse the repository at this point in the history
What:
Add a check that verifies, for every job, whether the number of
concurrently running jobs in a Meta Jobs DAG exceeds a configurable
limit (15 by default). Jobs that would exceed the limit are delayed.

Why: We want to limit the number of concurrently running Meta Jobs in
order to avoid overloading the SC.

Tests: a unit test is introduced and all existing tests pass

Signed-off-by: Yoan Salambashev
[[email protected]](mailto:[email protected])

---------

Signed-off-by: Yoan Salambashev <[email protected]>
  • Loading branch information
yonitoo authored Mar 17, 2023
1 parent 96548f6 commit 7109a4b
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# Configuration keys for the meta-jobs plugin. Each constant holds the
# string key under which the corresponding setting is registered with the
# ConfigurationBuilder (see add_definitions) and later looked up via
# get_value in MetaPluginConfiguration.
META_JOBS_TIME_BETWEEN_STATUS_CHECK_SECONDS = (
    "META_JOBS_TIME_BETWEEN_STATUS_CHECK_SECONDS"
)
# Key for the cap on concurrently running jobs within a single DAG (default 15).
META_JOBS_MAX_CONCURRENT_RUNNING_JOBS = "META_JOBS_MAX_CONCURRENT_RUNNING_JOBS"


class MetaPluginConfiguration:
Expand All @@ -35,6 +36,9 @@ def meta_jobs_dag_execution_check_time_period_seconds(self):
def meta_jobs_time_between_status_check_seconds(self):
    """Return the configured META_JOBS_TIME_BETWEEN_STATUS_CHECK_SECONDS value."""
    key = META_JOBS_TIME_BETWEEN_STATUS_CHECK_SECONDS
    return self.__config.get_value(key)

def meta_jobs_max_concurrent_running_jobs(self):
    """Return the configured META_JOBS_MAX_CONCURRENT_RUNNING_JOBS value."""
    key = META_JOBS_MAX_CONCURRENT_RUNNING_JOBS
    return self.__config.get_value(key)


def add_definitions(config_builder: ConfigurationBuilder):
config_builder.add(
Expand Down Expand Up @@ -79,3 +83,13 @@ def add_definitions(config_builder: ConfigurationBuilder):
"consuming too many resources. It's advisable to use the default value and avoid changing it."
),
)
config_builder.add(
key=META_JOBS_MAX_CONCURRENT_RUNNING_JOBS,
default_value=15,
description=(
"This sets the maximum number of concurrent running jobs. When at full capacity, any ready-to-start job "
"would be delayed until a running job is completed. The limit is determined by this configuration option. "
"Setting an appropriate value helps to limit the generation of too many API calls or consuming too many "
"resources. It's advisable to use the default value for this variable."
),
)
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ def __init__(self, team_name: str, meta_config: MetaPluginConfiguration):
min_ready_time_seconds=meta_config.meta_jobs_delayed_jobs_min_delay_seconds(),
randomize_delay_seconds=meta_config.meta_jobs_delayed_jobs_randomized_added_delay_seconds(),
)
self._max_concurrent_running_jobs = (
meta_config.meta_jobs_max_concurrent_running_jobs()
)
self._finished_jobs = []
self._dag_execution_check_time_period_seconds = (
meta_config.meta_jobs_dag_execution_check_time_period_seconds()
Expand All @@ -55,7 +58,6 @@ def execute_dag(self):
for node in self._topological_sorter.get_ready():
self._start_job(node)
self._start_delayed_jobs()

finished_jobs = self._get_finished_jobs()
self._finalize_jobs(finished_jobs)
if not finished_jobs:
Expand All @@ -78,7 +80,10 @@ def _finalize_jobs(self, finalized_jobs):
self._finished_jobs.append(node)

def _start_delayed_jobs(self):
while True:
while (
len(self._job_executor.get_currently_running_jobs())
< self._max_concurrent_running_jobs
):
job = self._delayed_starting_jobs.dequeue()
if job is None:
break
Expand Down Expand Up @@ -107,7 +112,15 @@ def default_serialization(o: Any) -> Any:

def _start_job(self, node):
try:
self._job_executor.start_job(node)
curr_running_jobs = len(self._job_executor.get_currently_running_jobs())
if curr_running_jobs >= self._max_concurrent_running_jobs:
log.info(
"Starting job fail - too many concurrently running jobs. Currently running: "
f"{curr_running_jobs}, limit: {self._max_concurrent_running_jobs}. Will be re-tried later"
)
self._delayed_starting_jobs.enqueue(node)
else:
self._job_executor.start_job(node)
except ApiException as e:
if e.status == 409:
log.info(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[owner]
team = team-awesome
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Copyright 2021-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import logging

from vdk.api.job_input import IJobInput
from vdk.plugin.meta_jobs.meta_job_runner import MetaJobInput

log = logging.getLogger(__name__)


def run(job_input: IJobInput):
    """Data-job entry point: build a seven-job DAG in which job2..job7 all
    depend on job1, then execute it as a meta job.

    Used by the concurrent-running-jobs-limit test to produce more
    ready-to-start jobs than the configured limit allows at once.
    """
    log.info(f"Dummy arguments {job_input.get_arguments()}")

    # job1 is the sole root; every other job waits on it, so six jobs
    # become ready simultaneously once job1 finishes.
    jobs = [{"job_name": "job1", "depends_on": []}]
    jobs.extend(
        {"job_name": f"job{n}", "depends_on": ["job1"]} for n in range(2, 8)
    )
    MetaJobInput().run_meta_job(jobs)
3 changes: 3 additions & 0 deletions projects/vdk-plugins/vdk-meta-jobs/tests/test_meta_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ def meta_jobs_dag_execution_check_time_period_seconds(self):
def meta_jobs_time_between_status_check_seconds(self):
    """Test double: a fixed 40-second interval between status checks."""
    interval_seconds = 40
    return interval_seconds

def meta_jobs_max_concurrent_running_jobs(self):
    """Test double: a fixed cap of 15 concurrently running jobs."""
    concurrency_cap = 15
    return concurrency_cap


def test_execute_dag_happy_case():
job1 = dict(job_name="job1", depends_on=[])
Expand Down
44 changes: 44 additions & 0 deletions projects/vdk-plugins/vdk-meta-jobs/tests/test_meta_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -303,3 +303,47 @@ def test_meta_job_circular_dependency(httpserver: PluginHTTPServer):
# no other request should be tried as the meta job fails
assert isinstance(result.exception, UserCodeError)
assert len(httpserver.log) == 0


def test_meta_job_concurrent_running_jobs_limit(httpserver: PluginHTTPServer):
    """Run a 7-job DAG with the concurrency limit set to 2 and verify, from
    the recorded HTTP traffic, that at no point more than 2 executions were
    running at once and that every started job eventually succeeded.
    """
    # Single source of truth for the limit: drives both the env patch and
    # the assertion below. (The previous version re-read the env var with
    # os.getenv AFTER the mock.patch.dict context had exited, so it always
    # got the hard-coded fallback — correct only by coincidence.)
    max_concurrent_running_jobs = 2

    jobs = [("job" + str(i), [200], "succeeded", 1) for i in range(1, 8)]
    api_url = _prepare(httpserver, jobs)

    with mock.patch.dict(
        os.environ,
        {
            "VDK_CONTROL_SERVICE_REST_API_URL": api_url,
            "VDK_META_JOBS_MAX_CONCURRENT_RUNNING_JOBS": str(
                max_concurrent_running_jobs
            ),
            "VDK_META_JOBS_DELAYED_JOBS_MIN_DELAY_SECONDS": "1",
            "VDK_META_JOBS_DELAYED_JOBS_RANDOMIZED_ADDED_DELAY_SECONDS": "1",
            "VDK_META_JOBS_TIME_BETWEEN_STATUS_CHECK_SECONDS": "1",
        },
    ):
        # CliEntryBasedTestRunner (provided by vdk-test-utils) gives a way to
        # simulate a vdk command and mock large parts of it - e.g. pass our
        # own plugins.
        runner = CliEntryBasedTestRunner(plugin_entry)

        result: Result = runner.invoke(
            ["run", jobs_path_from_caller_directory("meta-job-exceed-limit")]
        )

    # Replay the recorded requests in order, tracking how many jobs were
    # running at any given time: a POST to .../executions starts a job, a
    # GET reporting status "succeeded" finishes it.
    running_jobs = set()
    for request, response in httpserver.log:
        if "executions" in request.path:
            if request.method == "POST":
                job_name = request.path.split("/jobs/")[1].split("/")[0]
                running_jobs.add(job_name)
                # The concurrency cap must never be exceeded.
                assert len(running_jobs) <= max_concurrent_running_jobs
            if request.method == "GET":
                execution = json.loads(response.response[0])
                if execution["status"] == "succeeded":
                    running_jobs.discard(execution["job_name"])
    cli_assert_equal(0, result)
    # Every started job finished successfully.
    assert len(running_jobs) == 0

0 comments on commit 7109a4b

Please sign in to comment.