Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

vdk-meta-jobs: implement a limit on starting jobs at once #1681

Merged
merged 55 commits into from
Mar 17, 2023
Merged
Show file tree
Hide file tree
Changes from 51 commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
9fcfed7
[vdk-plugins] vdk-meta-jobs: implement a limit on starting jobs at once
yonitoo Feb 24, 2023
5d9c9c4
[vdk-plugins] vdk-meta-jobs: fix some minor issues
yonitoo Feb 27, 2023
30b5b53
[vdk-plugins] vdk-meta-jobs: remove redundant code and fix validation
yonitoo Feb 27, 2023
442b33f
[vdk-plugins] vdk-meta-jobs: add toposort at the start of validation
yonitoo Feb 28, 2023
5742cde
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Feb 28, 2023
8960e23
vdk-meta-jobs: address some PR comments and fix minor issues
yonitoo Mar 1, 2023
b6752af
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Mar 1, 2023
72bdec0
vdk-meta-jobs: switch to the new approach with pre-start check and delay
yonitoo Mar 1, 2023
848e3e8
vdk-meta-jobs: fix a minor technical error
yonitoo Mar 1, 2023
aefa076
control-service: don't include needless service token (#1679)
antoniivanov Feb 28, 2023
0d65470
build(deps): Bump io.micrometer:micrometer-core from 1.10.3 to 1.10.4…
dependabot[bot] Feb 28, 2023
b424a0b
versatile-data-kit: auto-merge dependabot PRs
antoniivanov Feb 28, 2023
fedfd91
versatile-data-kit: update dependabot
antoniivanov Feb 28, 2023
75fb99f
build(deps): Bump net.bytebuddy:byte-buddy from 1.13.0 to 1.14.0 in /…
dependabot[bot] Feb 28, 2023
1bdda95
versatile-data-kit: pre-commit hook for (S)CSS/JS/TS/HTML formatting …
ivakoleva Mar 1, 2023
c84fd29
vdk-impala: Add optional parameter for staging table prefix (#1666)
sbuldeev Mar 1, 2023
dcee74b
frontend: gitlab ci variables for change locations (#1685)
ivakoleva Mar 1, 2023
93ba9da
vdk-trino: stabilize vdk-trino tests (#1677)
antoniivanov Mar 1, 2023
7ec5009
versatile-data-kit: update auto merge dependabot configuration
antoniivanov Mar 1, 2023
9750bed
versatile-data-kit: revert update auto merge accidental commit
antoniivanov Mar 1, 2023
b17a045
versatile-data-kit: dependabot auto-merge fix (#1688)
ivakoleva Mar 1, 2023
28678e5
vdk-jupyter: remove py-to-ts-interfaces because of build problems (#1…
duyguHsnHsn Mar 1, 2023
3327f3c
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Mar 1, 2023
6182878
Merge branch 'main' into person/ysalambashev/starting-meta-jobs-limit
antoniivanov Mar 1, 2023
6240eae
vdk-meta-jobs: address some review comments
yonitoo Mar 2, 2023
7aa2444
Merge branch 'main' into person/ysalambashev/starting-meta-jobs-limit
yonitoo Mar 2, 2023
9c2383e
vdk-meta-jobs: fix tet name and print statement
yonitoo Mar 2, 2023
a06f65c
vdk-meta-jobs: change unit test
yonitoo Mar 7, 2023
fef8778
Merge branch 'main' into person/ysalambashev/starting-meta-jobs-limit
yonitoo Mar 7, 2023
610e199
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Mar 7, 2023
6c954d9
vdk-meta-jobs: address review comments
yonitoo Mar 7, 2023
40d1ad1
vdk-meta-jobs: Address review comments
yonitoo Mar 8, 2023
c54bb26
Merge branch 'main' into person/ysalambashev/starting-meta-jobs-limit
yonitoo Mar 8, 2023
c60619f
vdk-meta-jobs: fix limit logic
yonitoo Mar 9, 2023
c9d3fdf
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Mar 9, 2023
6aca632
vdk-meta-jobs: fix the unit test for the concurrent running jobs limit
yonitoo Mar 10, 2023
bf4b7d6
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Mar 10, 2023
4656290
vdk-meta-jobs: remove the old unit test
yonitoo Mar 10, 2023
af62fba
Merge branch 'main' into person/ysalambashev/starting-meta-jobs-limit
yonitoo Mar 10, 2023
5315b5d
vdk-meta-jobs: extract the limit hit check out of the topo sorter loop
yonitoo Mar 10, 2023
59892d9
vdk-meta-jobs: remove test_meta_dag unused imports
yonitoo Mar 10, 2023
aafa831
vdk-meta-jobs: revert newlines
yonitoo Mar 10, 2023
4b0a0e0
vdk-meta-jobs: fix the bug with the delayed jobs
yonitoo Mar 14, 2023
d316e3c
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Mar 14, 2023
63a5f11
vdk-meta-jobs: try to fix the infinite loop in start_delayed_jobs()
yonitoo Mar 15, 2023
4f518f2
Merge branch 'main' into person/ysalambashev/starting-meta-jobs-limit
yonitoo Mar 15, 2023
33c3ff7
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Mar 15, 2023
d36e44a
vdk-meta-jobs: adapt to the meta config and lower time between status…
yonitoo Mar 16, 2023
78640bb
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Mar 16, 2023
44554b3
vdk-meta-jobs: remove unused public method
yonitoo Mar 16, 2023
a6f73ae
vdk-meta-jobs: simplify the approach
yonitoo Mar 16, 2023
e1032ba
vdk-meta-jobs: add meta_jobs_max_concurrent_running_jobs to dummy config
yonitoo Mar 16, 2023
4d4db62
Merge branch 'main' into person/ysalambashev/starting-meta-jobs-limit
yonitoo Mar 17, 2023
64ae49e
vdk-meta-jobs: remove redundant comments and fix codacy fail
yonitoo Mar 17, 2023
324dd33
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Mar 17, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
META_JOBS_TIME_BETWEEN_STATUS_CHECK_SECONDS = (
"META_JOBS_TIME_BETWEEN_STATUS_CHECK_SECONDS"
)
META_JOBS_MAX_CONCURRENT_RUNNING_JOBS = "META_JOBS_MAX_CONCURRENT_RUNNING_JOBS"


class MetaPluginConfiguration:
Expand All @@ -35,6 +36,9 @@ def meta_jobs_dag_execution_check_time_period_seconds(self):
def meta_jobs_time_between_status_check_seconds(self):
return self.__config.get_value(META_JOBS_TIME_BETWEEN_STATUS_CHECK_SECONDS)

def meta_jobs_max_concurrent_running_jobs(self):
return self.__config.get_value(META_JOBS_MAX_CONCURRENT_RUNNING_JOBS)


def add_definitions(config_builder: ConfigurationBuilder):
config_builder.add(
Expand Down Expand Up @@ -79,3 +83,13 @@ def add_definitions(config_builder: ConfigurationBuilder):
"consuming too many resources. It's advisable to use the default value and avoid changing it."
),
)
config_builder.add(
key=META_JOBS_MAX_CONCURRENT_RUNNING_JOBS,
default_value=15,
description=(
"This sets the maximum number of concurrent running jobs. When at full capacity, any ready-to-start job "
"would be delayed until a running job is completed. The limit is determined by this configuration option. "
"Setting an appropriate value helps to limit the generation of too many API calls or consuming too many "
"resources. It's advisable to use the default value for this variable."
),
)
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ def __init__(self, team_name: str, meta_config: MetaPluginConfiguration):
min_ready_time_seconds=meta_config.meta_jobs_delayed_jobs_min_delay_seconds(),
randomize_delay_seconds=meta_config.meta_jobs_delayed_jobs_randomized_added_delay_seconds(),
)
self._max_concurrent_running_jobs = (
meta_config.meta_jobs_max_concurrent_running_jobs()
)
self._finished_jobs = []
self._dag_execution_check_time_period_seconds = (
meta_config.meta_jobs_dag_execution_check_time_period_seconds()
Expand All @@ -55,7 +58,6 @@ def execute_dag(self):
for node in self._topological_sorter.get_ready():
self._start_job(node)
self._start_delayed_jobs()

finished_jobs = self._get_finished_jobs()
self._finalize_jobs(finished_jobs)
if not finished_jobs:
Expand All @@ -78,7 +80,10 @@ def _finalize_jobs(self, finalized_jobs):
self._finished_jobs.append(node)

def _start_delayed_jobs(self):
while True:
while (
len(self._job_executor.get_currently_running_jobs())
< self._max_concurrent_running_jobs
):
job = self._delayed_starting_jobs.dequeue()
if job is None:
break
Expand Down Expand Up @@ -107,7 +112,15 @@ def default_serialization(o: Any) -> Any:

def _start_job(self, node):
try:
self._job_executor.start_job(node)
curr_running_jobs = len(self._job_executor.get_currently_running_jobs())
if curr_running_jobs >= self._max_concurrent_running_jobs:
log.info(
"Starting job fail - too many concurrently running jobs. Currently running: "
f"{curr_running_jobs}, limit: {self._max_concurrent_running_jobs}. Will be re-tried later"
)
self._delayed_starting_jobs.enqueue(node)
else:
self._job_executor.start_job(node)
except ApiException as e:
if e.status == 409:
log.info(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[owner]
team = team-awesome
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Copyright 2021-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import logging

from vdk.api.job_input import IJobInput
from vdk.plugin.meta_jobs.meta_job_runner import MetaJobInput

log = logging.getLogger(__name__)


def run(job_input: IJobInput):
log.info(f"Dummy arguments {job_input.get_arguments()}")

jobs = [
{"job_name": f"job{i}", "depends_on": [] if i == 1 else ["job1"]}
for i in range(1, 8)
]
MetaJobInput().run_meta_job(jobs)
50 changes: 50 additions & 0 deletions projects/vdk-plugins/vdk-meta-jobs/tests/test_meta_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -303,3 +303,53 @@ def test_meta_job_circular_dependency(httpserver: PluginHTTPServer):
# no other request should be tried as the meta job fails
assert isinstance(result.exception, UserCodeError)
assert len(httpserver.log) == 0


def test_meta_job_concurrent_running_jobs_limit(httpserver: PluginHTTPServer):
jobs = [("job" + str(i), [200], "succeeded", 1) for i in range(1, 8)]
api_url = _prepare(httpserver, jobs)

with mock.patch.dict(
os.environ,
{
"VDK_CONTROL_SERVICE_REST_API_URL": api_url,
"VDK_META_JOBS_MAX_CONCURRENT_RUNNING_JOBS": "2",
"VDK_META_JOBS_DELAYED_JOBS_MIN_DELAY_SECONDS": "1",
"VDK_META_JOBS_DELAYED_JOBS_RANDOMIZED_ADDED_DELAY_SECONDS": "1",
"VDK_META_JOBS_TIME_BETWEEN_STATUS_CHECK_SECONDS": "1",
},
):
# CliEntryBasedTestRunner (provided by vdk-test-utils) gives a way to simulate vdk command
# and mock large parts of it - e.g passed our own plugins
runner = CliEntryBasedTestRunner(plugin_entry)

result: Result = runner.invoke(
["run", jobs_path_from_caller_directory("meta-job-exceed-limit")]
)

expected_max_running_jobs = int(
os.getenv("VDK_META_JOBS_MAX_CONCURRENT_RUNNING_JOBS", "2")
)
# keep track of the number of running jobs at any given time
running_jobs = set()
# track the latest request in order to reset the number of running jobs when such finish
for request, response in httpserver.log:
if "executions" in request.path:
if request.method == "POST":
running_jobs.add(execution["job_name"])
assert (
len(running_jobs) <= expected_max_running_jobs
) # assert that max concurrent running jobs is
# not exceeded
if request.method == "GET":
execution = json.loads(response.response[0])
if execution["status"] == "succeeded":
running_jobs.discard(
execution["job_name"]
) # back-to-back GET reqs means the jobs have been
# finalized successfully

cli_assert_equal(0, result)

# assert that all the jobs finished successfully
assert len(running_jobs) == 0