From 8084fe6e3a47d65bbaa33343cac74db7bfde197d Mon Sep 17 00:00:00 2001
From: Antoni Ivanov
Date: Thu, 20 Oct 2022 00:55:17 +0300
Subject: [PATCH] vdk-meta-jobs: fix busyloop bug

The status calls were effectively made in a busy loop while jobs were
running. This hammers the database, which is not a good idea :)

Signed-off-by: Antoni Ivanov
---
 .../meta_jobs/cached_data_job_executor.py | 35 +++++--
 .../src/vdk/plugin/meta_jobs/meta.py      |  1 +
 .../vdk-meta-jobs/tests/test_meta_job.py  | 98 ++++++++++++++++---
 3 files changed, 110 insertions(+), 24 deletions(-)

diff --git a/projects/vdk-plugins/vdk-meta-jobs/src/vdk/plugin/meta_jobs/cached_data_job_executor.py b/projects/vdk-plugins/vdk-meta-jobs/src/vdk/plugin/meta_jobs/cached_data_job_executor.py
index 06310655ef..ba8a6fccf0 100644
--- a/projects/vdk-plugins/vdk-meta-jobs/src/vdk/plugin/meta_jobs/cached_data_job_executor.py
+++ b/projects/vdk-plugins/vdk-meta-jobs/src/vdk/plugin/meta_jobs/cached_data_job_executor.py
@@ -2,6 +2,8 @@
 # SPDX-License-Identifier: Apache-2.0
 import json
 import logging
+import os
+import time
 from typing import Dict
 
 from taurus_datajob_api import ApiException
@@ -20,6 +22,9 @@ class TrackingDataJobExecutor:
     def __init__(self, executor: IDataJobExecutor):
         self._executor = executor
         self._jobs_cache: Dict[str, TrackableJob] = dict()
+        self._time_between_status_check_seconds = int(
+            os.environ.get("VDK_META_JOBS_TIME_BETWEEN_STATUS_CHECK_SECONDS", "40")
+        )
 
     def register_job(self, job: TrackableJob):
         if job.job_name in self._jobs_cache:
@@ -33,7 +38,7 @@ def start_job(self, job_name: str) -> None:
         """
         :param job_name: the job to start and track
         """
-        job = self._get_job(job_name)
+        job = self.__get_job(job_name)
         job.start_attempt += 1
         execution_id = self._executor.start_job(job.job_name, job.team_name)
         log.info(f"Starting new data job execution with id {execution_id}")
@@ -43,11 +48,11 @@ def start_job(self, job_name: str) -> None:
             job.job_name, job.team_name, job.execution_id
         )
         log.info(
-            f"Started data job {job_name}:\n{self._get_printable_details(job.details)}"
+            f"Started data job {job_name}:\n{self.__get_printable_details(job.details)}"
         )
 
     def finalize_job(self, job_name):
-        job = self._get_job(job_name)
+        job = self.__get_job(job_name)
         details = self._executor.details_job(
             job.job_name, job.team_name, job.execution_id
         )
@@ -72,11 +77,11 @@ def finalize_job(self, job_name):
         )
 
     @staticmethod
-    def _get_printable_details(details):
+    def __get_printable_details(details):
         del details["deployment"]
         return json.dumps(details, default=lambda o: str(o), indent=2)
 
-    def _get_job(self, job_name) -> TrackableJob:
+    def __get_job(self, job_name) -> TrackableJob:
         job: TrackableJob = self._jobs_cache.get(job_name)
         if job is None:
             raise IndexError(
@@ -84,8 +89,12 @@ def _get_job(self, job_name) -> TrackableJob:
             )
         return job
 
+    @staticmethod
+    def __is_job_submitted(job: TrackableJob):
+        return job.status is not None
+
     def status(self, job_name: str) -> str:
-        job = self._get_job(job_name)
+        job = self.__get_job(job_name)
         if job.status in ACTIVE_JOB_STATUSES:
             job.status = self._executor.status_job(
                 job.job_name, job.team_name, job.execution_id
@@ -98,14 +107,24 @@ def get_finished_job_names(self):
         # TODO: optimize
         # Do not call the status every time (use TTL caching)
         # Do not call all status at the same time - stagger them in time
+        # Or use the GraphQL API to get all statuses at once (batch)
         for job in self._jobs_cache.values():
-            if job.status is not None and job.status in ACTIVE_JOB_STATUSES:
+            if (
+                self.__is_job_submitted(job)
+                and job.status in ACTIVE_JOB_STATUSES
+                and time.time() - job.last_status_time
+                > self._time_between_status_check_seconds
+            ):
+                job.last_status_time = time.time()
                 job.status = self.status(job.job_name)
 
         for job in self._jobs_cache.values():
-            if job.status is not None and job.status not in ACTIVE_JOB_STATUSES:
+            if self.__is_job_submitted(job) and job.status not in ACTIVE_JOB_STATUSES:
                 finalized_jobs.append(job.job_name)
         return finalized_jobs
 
     def get_all_jobs(self):
         return list(self._jobs_cache.values())
+
+    def get_currently_running_jobs(self):
+        return [j for j in self._jobs_cache.values() if j.status in ACTIVE_JOB_STATUSES]
diff --git a/projects/vdk-plugins/vdk-meta-jobs/src/vdk/plugin/meta_jobs/meta.py b/projects/vdk-plugins/vdk-meta-jobs/src/vdk/plugin/meta_jobs/meta.py
index 3d3171e647..d9ad18cbe4 100644
--- a/projects/vdk-plugins/vdk-meta-jobs/src/vdk/plugin/meta_jobs/meta.py
+++ b/projects/vdk-plugins/vdk-meta-jobs/src/vdk/plugin/meta_jobs/meta.py
@@ -46,3 +46,4 @@ class TrackableJob(meta_job.SingleJob):
     execution_id: str = None
     details: dict = None
     start_attempt = 0
+    last_status_time = 0
diff --git a/projects/vdk-plugins/vdk-meta-jobs/tests/test_meta_job.py b/projects/vdk-plugins/vdk-meta-jobs/tests/test_meta_job.py
index 1f4ade9ddd..67429e05a5 100644
--- a/projects/vdk-plugins/vdk-meta-jobs/tests/test_meta_job.py
+++ b/projects/vdk-plugins/vdk-meta-jobs/tests/test_meta_job.py
@@ -1,6 +1,8 @@
 # Copyright 2021 VMware, Inc.
 # SPDX-License-Identifier: Apache-2.0
+import json
 import os
+import time
 from unittest import mock
 
 from click.testing import Result
@@ -25,18 +27,23 @@ def _prepare(httpserver: PluginHTTPServer, jobs=None):
     team_name = "team-awesome"
     if jobs is None:
         jobs = [
-            ("job1", [200], "succeeded"),
-            ("job2", [200], "succeeded"),
-            ("job3", [200], "succeeded"),
-            ("job4", [200], "succeeded"),
+            ("job1", [200], "succeeded", 0),
+            ("job2", [200], "succeeded", 0),
+            ("job3", [200], "succeeded", 0),
+            ("job4", [200], "succeeded", 0),
         ]
 
-    for job_name, request_responses, job_status in jobs:
+    started_jobs = dict()
+
+    for job_name, request_responses, job_status, *execution_duration in jobs:
         request_responses.reverse()
+        execution_duration = execution_duration[0] if execution_duration else 0
 
-        def handler(location, statuses):
+        def handler(location, statuses, job_name):
             def _handler_fn(r: Request):
                 status = statuses[0] if len(statuses) == 1 else statuses.pop()
+                if status < 300:
+                    started_jobs[job_name] = time.time()
                 return Response(status=status, headers=dict(Location=location))
 
             return _handler_fn
@@ -48,22 +55,38 @@ def _handler_fn(r: Request):
             handler(
                 f"/data-jobs/for-team/{team_name}/jobs/{job_name}/executions/{job_name}",
                 request_responses,
+                job_name,
             )
         )
 
-        execution: DataJobExecution = DataJobExecution(
-            id=job_name,
-            job_name=job_name,
-            logs_url="http://url",
-            deployment=DataJobDeployment(),
-            start_time="2021-09-24T14:14:03.922Z",
-            status=job_status,
-            message="foo",
-        )
+        def exec_handler(job_name, job_status, execution_duration):
+            def _handler_fn(r: Request):
+                actual_job_status = job_status
+                if time.time() < started_jobs.get(job_name, 0) + execution_duration:
+                    actual_job_status = "running"
+                execution: DataJobExecution = DataJobExecution(
+                    id=job_name,
+                    job_name=job_name,
+                    logs_url="http://url",
+                    deployment=DataJobDeployment(),
+                    start_time="2021-09-24T14:14:03.922Z",
+                    status=actual_job_status,
+                    message="foo",
+                )
+                response_data = json.dumps(execution.to_dict(), indent=4)
+                return Response(
+                    response_data,
+                    status=200,
+                    headers=None,
content_type="application/json", + ) + + return _handler_fn + httpserver.expect_request( uri=f"/data-jobs/for-team/{team_name}/jobs/{job_name}/executions/{job_name}", method="GET", - ).respond_with_json(execution.to_dict()) + ).respond_with_handler(exec_handler(job_name, job_status, execution_duration)) return rest_api_url @@ -181,3 +204,46 @@ def test_meta_job_cannot_start_job(httpserver: PluginHTTPServer): cli_assert_equal(1, result) # no other request should be tried as the meta job fails assert len(httpserver.log) == 1 + + +def test_meta_job_long_running(httpserver: PluginHTTPServer): + jobs = [ + ("job1", [200], "succeeded", 3), # execution duration is 3 seconds + ("job2", [200], "succeeded"), + ("job3", [200], "succeeded"), + ("job4", [200], "succeeded"), + ] + api_url = _prepare(httpserver, jobs) + + with mock.patch.dict( + os.environ, + { + "VDK_CONTROL_SERVICE_REST_API_URL": api_url, + # we set 5 seconds more than execution duration of 3 set above + "VDK_META_JOBS_TIME_BETWEEN_STATUS_CHECK_SECONDS": "5", + }, + ): + # CliEntryBasedTestRunner (provided by vdk-test-utils) gives a away to simulate vdk command + # and mock large parts of it - e.g passed our own plugins + runner = CliEntryBasedTestRunner(plugin_entry) + + result: Result = runner.invoke( + ["run", jobs_path_from_caller_directory("meta-job")] + ) + cli_assert_equal(0, result) + job1_requests = [ + req + for req, res in httpserver.log + if req.method == "GET" and req.base_url.endswith("job1") + ] + # We have 1 call during start, 1 call at finish and 1 call that returns running and 1 that returns the final + # status. For total of 4 + # NB: test (verification) that requires that deep implementation details knowledge is + # not a good idea but we need to verify that we are not hammering the API Server somehow ... + assert len(job1_requests) == 4 + + # let's make sure something else is not generating more requests then expected + # if implementation is changed the number below would likely change. + # If the new count is not that big we can edit it here to pass the test, + # if the new count is too big, we have an issue that need to be investigated. + assert len(httpserver.log) == 17