Skip to content

Commit

Permalink
vdk-meta-jobs: fix busyloop bug
Browse files Browse the repository at this point in the history
The status calls were effectively made in a busy loop while jobs were
running. This would hammer the database, which is not a good idea
:)

Signed-off-by: Antoni Ivanov <[email protected]>
  • Loading branch information
antoniivanov committed Oct 24, 2022
1 parent f5537df commit 8084fe6
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
# SPDX-License-Identifier: Apache-2.0
import json
import logging
import os
import time
from typing import Dict

from taurus_datajob_api import ApiException
Expand All @@ -20,6 +22,9 @@ class TrackingDataJobExecutor:
def __init__(self, executor: IDataJobExecutor):
    """Create a tracker that delegates execution to *executor*.

    The minimum number of seconds between two status polls of the same
    job is read from the VDK_META_JOBS_TIME_BETWEEN_STATUS_CHECK_SECONDS
    environment variable (default: 40) so that status checks do not
    degenerate into a busy loop against the backing service.
    """
    self._executor = executor
    self._jobs_cache: Dict[str, TrackableJob] = {}
    configured_interval = os.environ.get(
        "VDK_META_JOBS_TIME_BETWEEN_STATUS_CHECK_SECONDS", "40"
    )
    self._time_between_status_check_seconds = int(configured_interval)

def register_job(self, job: TrackableJob):
if job.job_name in self._jobs_cache:
Expand All @@ -33,7 +38,7 @@ def start_job(self, job_name: str) -> None:
"""
:param job_name: the job to start and track
"""
job = self._get_job(job_name)
job = self.__get_job(job_name)
job.start_attempt += 1
execution_id = self._executor.start_job(job.job_name, job.team_name)
log.info(f"Starting new data job execution with id {execution_id}")
Expand All @@ -43,11 +48,11 @@ def start_job(self, job_name: str) -> None:
job.job_name, job.team_name, job.execution_id
)
log.info(
f"Started data job {job_name}:\n{self._get_printable_details(job.details)}"
f"Started data job {job_name}:\n{self.__get_printable_details(job.details)}"
)

def finalize_job(self, job_name):
job = self._get_job(job_name)
job = self.__get_job(job_name)
details = self._executor.details_job(
job.job_name, job.team_name, job.execution_id
)
Expand All @@ -72,20 +77,24 @@ def finalize_job(self, job_name):
)

@staticmethod
def __get_printable_details(details):
    """Render execution *details* as pretty-printed JSON for log output.

    The verbose "deployment" section is omitted to keep log lines short.

    :param details: dict of execution details as returned by the executor
    :return: indented JSON string of the remaining details

    The diff artifact (a stale pre-rename ``_get_printable_details``
    signature line) is removed. Works on a shallow copy so the caller's
    dict is not mutated, and tolerates a missing "deployment" key
    (``pop`` with a default instead of ``del``).
    """
    printable = dict(details)
    printable.pop("deployment", None)
    # default=str stringifies non-JSON-serializable values (e.g. datetimes)
    return json.dumps(printable, default=str, indent=2)

def __get_job(self, job_name) -> TrackableJob:
    """Look up a previously registered job by name.

    The stale pre-rename ``_get_job`` signature line (a diff artifact)
    is removed.

    :param job_name: the name used when the job was registered
    :return: the tracked job entry
    :raises IndexError: if the job has not been registered
    """
    job: TrackableJob = self._jobs_cache.get(job_name)
    if job is None:
        # NOTE(review): KeyError would be the more idiomatic exception for
        # a missing mapping entry, but callers may already catch
        # IndexError, so the type is kept unchanged.
        raise IndexError(
            f"The job {job_name} has not been registered. Use register_job first. "
        )
    return job

@staticmethod
def __is_job_submitted(job: TrackableJob):
    """A job counts as submitted once any status has been recorded for it."""
    has_recorded_status = job.status is not None
    return has_recorded_status

def status(self, job_name: str) -> str:
job = self._get_job(job_name)
job = self.__get_job(job_name)
if job.status in ACTIVE_JOB_STATUSES:
job.status = self._executor.status_job(
job.job_name, job.team_name, job.execution_id
Expand All @@ -98,14 +107,24 @@ def get_finished_job_names(self):
# TODO: optimize
# Do not call the status every time (use TTL caching)
# Do not call all status at the same time - stagger them in time
# Or use GraphQL API to get status at once (batch)
for job in self._jobs_cache.values():
if job.status is not None and job.status in ACTIVE_JOB_STATUSES:
if (
self.__is_job_submitted(job)
and job.status in ACTIVE_JOB_STATUSES
and time.time() - job.last_status_time
> self._time_between_status_check_seconds
):
job.last_status_time = time.time()
job.status = self.status(job.job_name)

for job in self._jobs_cache.values():
if job.status is not None and job.status not in ACTIVE_JOB_STATUSES:
if self.__is_job_submitted(job) and job.status not in ACTIVE_JOB_STATUSES:
finalized_jobs.append(job.job_name)
return finalized_jobs

def get_all_jobs(self):
    """Return a list snapshot of every registered job."""
    tracked = self._jobs_cache.values()
    return list(tracked)

def get_currently_running_jobs(self):
    """Return the tracked jobs whose status is still one of the active ones."""
    running = []
    for job in self._jobs_cache.values():
        if job.status in ACTIVE_JOB_STATUSES:
            running.append(job)
    return running
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,4 @@ class TrackableJob(meta_job.SingleJob):
execution_id: str = None
details: dict = None
start_attempt = 0
last_status_time = 0
98 changes: 82 additions & 16 deletions projects/vdk-plugins/vdk-meta-jobs/tests/test_meta_job.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# Copyright 2021 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import json
import os
import time
from unittest import mock

from click.testing import Result
Expand All @@ -25,18 +27,23 @@ def _prepare(httpserver: PluginHTTPServer, jobs=None):
team_name = "team-awesome"
if jobs is None:
jobs = [
("job1", [200], "succeeded"),
("job2", [200], "succeeded"),
("job3", [200], "succeeded"),
("job4", [200], "succeeded"),
("job1", [200], "succeeded", 0),
("job2", [200], "succeeded", 0),
("job3", [200], "succeeded", 0),
("job4", [200], "succeeded", 0),
]

for job_name, request_responses, job_status in jobs:
started_jobs = dict()

for job_name, request_responses, job_status, *execution_duration in jobs:
request_responses.reverse()
execution_duration = execution_duration[0] if execution_duration else 0

def handler(location, statuses):
def handler(location, statuses, job_name):
def _handler_fn(r: Request):
status = statuses[0] if len(statuses) == 1 else statuses.pop()
if status < 300:
started_jobs[job_name] = time.time()
return Response(status=status, headers=dict(Location=location))

return _handler_fn
Expand All @@ -48,22 +55,38 @@ def _handler_fn(r: Request):
handler(
f"/data-jobs/for-team/{team_name}/jobs/{job_name}/executions/{job_name}",
request_responses,
job_name,
)
)

execution: DataJobExecution = DataJobExecution(
id=job_name,
job_name=job_name,
logs_url="http://url",
deployment=DataJobDeployment(),
start_time="2021-09-24T14:14:03.922Z",
status=job_status,
message="foo",
)
def exec_handler(job_name, job_status, execution_duration):
def _handler_fn(r: Request):
actual_job_status = job_status
if time.time() < started_jobs.get(job_name, 0) + execution_duration:
actual_job_status = "running"
execution: DataJobExecution = DataJobExecution(
id=job_name,
job_name=job_name,
logs_url="http://url",
deployment=DataJobDeployment(),
start_time="2021-09-24T14:14:03.922Z",
status=actual_job_status,
message="foo",
)
response_data = json.dumps(execution.to_dict(), indent=4)
return Response(
response_data,
status=200,
headers=None,
content_type="application/json",
)

return _handler_fn

httpserver.expect_request(
uri=f"/data-jobs/for-team/{team_name}/jobs/{job_name}/executions/{job_name}",
method="GET",
).respond_with_json(execution.to_dict())
).respond_with_handler(exec_handler(job_name, job_status, execution_duration))

return rest_api_url

Expand Down Expand Up @@ -181,3 +204,46 @@ def test_meta_job_cannot_start_job(httpserver: PluginHTTPServer):
cli_assert_equal(1, result)
# no other request should be tried as the meta job fails
assert len(httpserver.log) == 1


def test_meta_job_long_running(httpserver: PluginHTTPServer):
    # Regression test for the status-poll busy loop: a job that keeps running
    # for a few seconds must not be polled more often than
    # VDK_META_JOBS_TIME_BETWEEN_STATUS_CHECK_SECONDS allows.
    jobs = [
        ("job1", [200], "succeeded", 3),  # execution duration is 3 seconds
        ("job2", [200], "succeeded"),
        ("job3", [200], "succeeded"),
        ("job4", [200], "succeeded"),
    ]
    api_url = _prepare(httpserver, jobs)

    with mock.patch.dict(
        os.environ,
        {
            "VDK_CONTROL_SERVICE_REST_API_URL": api_url,
            # the 5-second poll interval is deliberately longer than the
            # 3-second execution duration of job1 set above
            "VDK_META_JOBS_TIME_BETWEEN_STATUS_CHECK_SECONDS": "5",
        },
    ):
        # CliEntryBasedTestRunner (provided by vdk-test-utils) gives a way to
        # simulate a vdk command and mock large parts of it - e.g. pass our
        # own plugins
        runner = CliEntryBasedTestRunner(plugin_entry)

        result: Result = runner.invoke(
            ["run", jobs_path_from_caller_directory("meta-job")]
        )
        cli_assert_equal(0, result)
        job1_requests = [
            req
            for req, res in httpserver.log
            if req.method == "GET" and req.base_url.endswith("job1")
        ]
        # We expect 1 call during start, 1 call at finish, 1 call that returns
        # "running" and 1 that returns the final status - 4 in total.
        # NB: a test (verification) that requires this much knowledge of deep
        # implementation details is not a good idea, but we need to verify
        # that we are not hammering the API Server somehow ...
        assert len(job1_requests) == 4

        # Let's make sure nothing else generates more requests than expected.
        # If the implementation is changed the number below would likely change.
        # If the new count is not that big we can edit it here to pass the test;
        # if the new count is too big, we have an issue that needs investigation.
        assert len(httpserver.log) == 17

0 comments on commit 8084fe6

Please sign in to comment.