Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-44763: Reflect the lack of job cancellation support #174

Merged
merged 1 commit into from
Jun 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions changelog.d/20240613_185120_rra_DM_44763.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
### Backwards-incompatible changes

- Cancelling or aborting jobs is not supported by the combination of arq and sync worker functions. Properly reflect this in job metadata by forcing execution duration to 0 to indicate that no limit is applied. Substantially increase the default arq job timeout since the timeout will be ineffective anyway.
- Drop the `CUTOUT_TIMEOUT` configuration option since we have no way of enforcing a timeout on jobs.
14 changes: 7 additions & 7 deletions src/vocutouts/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,12 @@ class Config(BaseSettings):
)

sync_timeout: timedelta = Field(
timedelta(minutes=1), title="Timeout for sync requests"
)

timeout: timedelta = Field(
timedelta(minutes=10), title="Timeout for cutout jobs"
timedelta(minutes=1),
title="Timeout for sync requests",
description=(
"The job will continue running as an async job beyond this"
" timeout since cancellation of jobs is not currently supported."
),
)

tmpdir: Path = Field(Path("/tmp"), title="Temporary directory for workers")
Expand Down Expand Up @@ -178,7 +179,7 @@ def _validate_arq_queue_url(cls, v: RedisDsn) -> RedisDsn:
)
return v

@field_validator("lifetime", "sync_timeout", "timeout", mode="before")
@field_validator("lifetime", "sync_timeout", mode="before")
@classmethod
def _parse_as_seconds(cls, v: int | str | timedelta) -> int | timedelta:
"""Convert timedelta strings so they are parsed as seconds."""
Expand Down Expand Up @@ -213,7 +214,6 @@ def uws_config(self) -> UWSConfig:
return UWSConfig(
arq_mode=self.arq_mode,
arq_redis_settings=self.arq_redis_settings,
execution_duration=self.timeout,
lifetime=self.lifetime,
parameters_type=CutoutParameters,
signing_service_account=self.service_account,
Expand Down
8 changes: 5 additions & 3 deletions src/vocutouts/uws/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

from __future__ import annotations

from datetime import timedelta
from typing import Any

from fastapi import APIRouter, FastAPI, Request
Expand All @@ -13,7 +12,7 @@

from . import schema
from .config import UWSConfig
from .constants import UWS_QUEUE_NAME
from .constants import UWS_DATABASE_TIMEOUT, UWS_QUEUE_NAME
from .exceptions import UWSError
from .handlers import (
install_async_post_handler,
Expand Down Expand Up @@ -95,10 +94,13 @@ async def startup(ctx: dict[Any, Any]) -> None:
async def shutdown(ctx: dict[Any, Any]) -> None:
await close_uws_worker_context(ctx)

# Running 10 jobs simultaneously is the arq default as of arq 0.26.0
# and seems reasonable for database workers.
return WorkerSettings(
functions=[uws_job_started, uws_job_completed],
redis_settings=self._config.arq_redis_settings,
job_timeout=timedelta(seconds=30),
job_timeout=UWS_DATABASE_TIMEOUT,
max_jobs=10,
queue_name=UWS_QUEUE_NAME,
on_startup=startup,
on_shutdown=shutdown,
Expand Down
21 changes: 12 additions & 9 deletions src/vocutouts/uws/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,6 @@ class encapsulates the configuration of the UWS component that may vary by
database_url: str
"""URL for the metadata database."""

execution_duration: timedelta
"""Maximum execution time in seconds.

Jobs that run longer than this length of time will be automatically
aborted.
"""

lifetime: timedelta
"""The lifetime of jobs.

Expand Down Expand Up @@ -127,6 +120,15 @@ class encapsulates the configuration of the UWS component that may vary by
database_password: SecretStr | None = None
"""Password for the database."""

execution_duration: timedelta = timedelta(seconds=0)
"""Maximum execution time in seconds.

Jobs that run longer than this length of time should be automatically
aborted. However, currently the backend does not support cancelling jobs,
and therefore the only correct value is 0, which indicates that the
execution duration of the job is unlimited.
"""

slack_webhook: SecretStr | None = None
"""Slack incoming webhook for reporting errors."""

Expand Down Expand Up @@ -167,8 +169,9 @@ class encapsulates the configuration of the UWS component that may vary by

If provided, called with the requested execution duration and the current
job record and should return the new execution duration time. Otherwise,
any execution duration time shorter than the configured maximum timeout
will be allowed.
the execution duration may not be changed. Note that the current backend
does not support cancelling jobs and therefore does not support execution
duration values other than 0.
"""

wait_timeout: timedelta = timedelta(minutes=1)
Expand Down
4 changes: 4 additions & 0 deletions src/vocutouts/uws/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,15 @@

__all__ = [
"JOB_RESULT_TIMEOUT",
"UWS_DATABASE_TIMEOUT",
"UWS_QUEUE_NAME",
]

JOB_RESULT_TIMEOUT = timedelta(seconds=5)
"""How long to poll arq for job results before giving up."""

UWS_DATABASE_TIMEOUT = timedelta(seconds=30)
"""Timeout on workers that update the UWS database."""

UWS_QUEUE_NAME = "uws:queue"
"""Name of the arq queue for internal UWS messages."""
11 changes: 7 additions & 4 deletions src/vocutouts/uws/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -430,12 +430,15 @@ async def update_execution_duration(
if job.owner != user:
raise PermissionDeniedError(f"Access to job {job_id} denied")

# Validate the new value.
# Validate the new value. Only support changes to execution duration
# if a validator is set, which is a signal that the application
# supports cancellation of jobs. The current implementation does not
# support cancelling jobs and therefore cannot enforce a timeout, so
# an execution duration of 0 is currently the only correct value.
if validator := self._config.validate_execution_duration:
duration = validator(duration, job)
elif duration > self._config.execution_duration:
duration = self._config.execution_duration

else:
return None
if duration == job.execution_duration:
return None
await self._storage.update_execution_duration(job_id, duration)
Expand Down
45 changes: 39 additions & 6 deletions src/vocutouts/uws/uwsworker.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ class returned by other functions.
job_timeout: SecondsTimedelta
"""Maximum timeout for all jobs."""

max_jobs: int
"""Maximum number of jobs that can be run at one time."""

queue_name: str = default_queue_name
"""Name of arq queue to listen to for jobs."""

Expand All @@ -97,9 +100,6 @@ class returned by other functions.
on_shutdown: StartupShutdown | None = None
"""Coroutine to run on shutdown."""

allow_abort_jobs: bool = False
"""Whether to allow jobs to be aborted."""


@dataclass
class WorkerJobInfo:
Expand All @@ -115,7 +115,11 @@ class WorkerJobInfo:
"""Delegated Gafaelfawr token to act on behalf of the user."""

timeout: timedelta
"""Maximum execution time for the job."""
"""Maximum execution time for the job.

Currently, this is ignored, since the backend workers do not support
cancellation.
"""

run_id: str | None = None
"""User-supplied run ID, if any."""
Expand Down Expand Up @@ -164,6 +168,27 @@ def build_worker(
UWS worker configuration.
logger
Logger to use for messages.

Notes
-----
Timeouts and aborting jobs unfortunately are not supported due to
limitations in `concurrent.futures.ThreadPoolExecutor`. Once a thread has
been started, there is no way to stop it until it completes on its own.
Therefore, no job timeout is set or supported, and the timeout set on the
job (which comes from executionduration) is ignored.

Fixing this appears to be difficult since Python's `threading.Thread`
simply does not support cancellation. It would probably require rebuilding
the worker model on top of processes and killing those processes on
timeout. That would pose problems for cleanup of any temporary resources
created by the process such as temporary files, since Python cleanup code
would not be run.

The best fix would be for backend code to be rewritten to be async, so
await would become a cancellation point (although this still may not be
enough for compute-heavy code that doesn't use await frequently). However,
the Rubin pipelines code is all sync, so async worker support has not yet
been added due to lack of demand.
"""

async def startup(ctx: dict[Any, Any]) -> None:
Expand Down Expand Up @@ -218,11 +243,19 @@ async def run(
finally:
await arq.enqueue("job_completed", info.job_id)

# Job timeouts are not actually supported since we have no way of stopping
# the sync worker. A timeout will just leave the previous worker running
# and will block all future jobs. Set it to an extremely long value, since
# it can't be disabled entirely.
#
# Since the worker is running sync jobs, run one job per pod since they
# will be serialized anyway and no parallelism is possible. If async
# worker support is added, consider making this configurable.
return WorkerSettings(
functions=[func(run, name=worker.__qualname__)],
redis_settings=config.arq_redis_settings,
job_timeout=config.timeout,
job_timeout=3600,
max_jobs=1,
on_startup=startup,
on_shutdown=shutdown,
allow_abort_jobs=True,
)
4 changes: 2 additions & 2 deletions tests/handlers/async_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
<uws:ownerId>someone</uws:ownerId>
<uws:phase>PENDING</uws:phase>
<uws:creationTime>[DATE]</uws:creationTime>
<uws:executionDuration>600</uws:executionDuration>
<uws:executionDuration>0</uws:executionDuration>
<uws:destruction>[DATE]</uws:destruction>
<uws:parameters>
<uws:parameter id="id" isPost="true">1:2:band:value</uws:parameter>
Expand All @@ -50,7 +50,7 @@
<uws:creationTime>[DATE]</uws:creationTime>
<uws:startTime>[DATE]</uws:startTime>
<uws:endTime>[DATE]</uws:endTime>
<uws:executionDuration>600</uws:executionDuration>
<uws:executionDuration>0</uws:executionDuration>
<uws:destruction>[DATE]</uws:destruction>
<uws:parameters>
<uws:parameter id="id" isPost="true">1:2:band:value</uws:parameter>
Expand Down
1 change: 0 additions & 1 deletion tests/support/uws.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ def build_uws_config() -> UWSConfig:
async_post_dependency=_post_dependency,
database_url=database_url,
database_password=SecretStr(os.environ["POSTGRES_PASSWORD"]),
execution_duration=timedelta(minutes=10),
lifetime=timedelta(days=1),
parameters_type=SimpleParameters,
signing_service_account="[email protected]",
Expand Down
12 changes: 4 additions & 8 deletions tests/uws/job_api_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
<uws:ownerId>user</uws:ownerId>
<uws:phase>{}</uws:phase>
<uws:creationTime>{}</uws:creationTime>
<uws:executionDuration>{}</uws:executionDuration>
<uws:executionDuration>0</uws:executionDuration>
<uws:destruction>{}</uws:destruction>
<uws:parameters>
<uws:parameter id="name" isPost="true">Jane</uws:parameter>
Expand All @@ -56,7 +56,7 @@
<uws:creationTime>{}</uws:creationTime>
<uws:startTime>{}</uws:startTime>
<uws:endTime>{}</uws:endTime>
<uws:executionDuration>600</uws:executionDuration>
<uws:executionDuration>0</uws:executionDuration>
<uws:destruction>{}</uws:destruction>
<uws:parameters>
<uws:parameter id="name" isPost="true">Jane</uws:parameter>
Expand Down Expand Up @@ -120,7 +120,6 @@ async def test_job_run(
"1",
"PENDING",
isodatetime(job.creation_time),
"600",
isodatetime(job.creation_time + timedelta(seconds=24 * 60 * 60)),
)

Expand Down Expand Up @@ -155,7 +154,6 @@ async def test_job_run(
"1",
"QUEUED",
isodatetime(job.creation_time),
"600",
isodatetime(job.creation_time + timedelta(seconds=24 * 60 * 60)),
)
await runner.mark_in_progress("user", "1")
Expand Down Expand Up @@ -239,7 +237,6 @@ async def test_job_api(
"1",
"PENDING",
isodatetime(job.creation_time),
"600",
isodatetime(destruction_time),
)

Expand All @@ -257,7 +254,7 @@ async def test_job_api(
)
assert r.status_code == 200
assert r.headers["Content-Type"] == "text/plain; charset=utf-8"
assert r.text == "600"
assert r.text == "0"

r = await client.get(
"/test/jobs/1/owner", headers={"X-Auth-Request-User": "user"}
Expand Down Expand Up @@ -297,6 +294,7 @@ async def test_job_api(
assert r.status_code == 303
assert r.headers["Location"] == "https://example.com/test/jobs/1"

# Changing the execution duration is not supported.
r = await client.post(
"/test/jobs/1/executionduration",
headers={"X-Auth-Request-User": "user"},
Expand All @@ -315,7 +313,6 @@ async def test_job_api(
"1",
"PENDING",
isodatetime(job.creation_time),
"300",
isodatetime(now),
)

Expand Down Expand Up @@ -346,7 +343,6 @@ async def test_job_api(
"2",
"PENDING",
isodatetime(job.creation_time),
"600",
isodatetime(job.destruction_time),
)
r = await client.post(
Expand Down
2 changes: 1 addition & 1 deletion tests/uws/job_error_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
<uws:creationTime>{}</uws:creationTime>
<uws:startTime>{}</uws:startTime>
<uws:endTime>{}</uws:endTime>
<uws:executionDuration>600</uws:executionDuration>
<uws:executionDuration>0</uws:executionDuration>
<uws:destruction>{}</uws:destruction>
<uws:parameters>
<uws:parameter id="name">Sarah</uws:parameter>
Expand Down
6 changes: 3 additions & 3 deletions tests/uws/long_polling_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
<uws:ownerId>user</uws:ownerId>
<uws:phase>{}</uws:phase>
<uws:creationTime>{}</uws:creationTime>
<uws:executionDuration>600</uws:executionDuration>
<uws:executionDuration>0</uws:executionDuration>
<uws:destruction>{}</uws:destruction>
<uws:parameters>
<uws:parameter id="name">Naomi</uws:parameter>
Expand All @@ -47,7 +47,7 @@
<uws:phase>EXECUTING</uws:phase>
<uws:creationTime>{}</uws:creationTime>
<uws:startTime>{}</uws:startTime>
<uws:executionDuration>600</uws:executionDuration>
<uws:executionDuration>0</uws:executionDuration>
<uws:destruction>{}</uws:destruction>
<uws:parameters>
<uws:parameter id="name">Naomi</uws:parameter>
Expand All @@ -69,7 +69,7 @@
<uws:creationTime>{}</uws:creationTime>
<uws:startTime>{}</uws:startTime>
<uws:endTime>{}</uws:endTime>
<uws:executionDuration>600</uws:executionDuration>
<uws:executionDuration>0</uws:executionDuration>
<uws:destruction>{}</uws:destruction>
<uws:parameters>
<uws:parameter id="name">Naomi</uws:parameter>
Expand Down
2 changes: 0 additions & 2 deletions tests/uws/workers_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ def hello(
assert settings.functions[0].name == hello.__qualname__
assert settings.redis_settings == uws_config.arq_redis_settings
assert settings.queue_name == default_queue_name
assert settings.allow_abort_jobs
assert settings.on_startup
assert settings.on_shutdown

Expand Down Expand Up @@ -150,7 +149,6 @@ async def test_build_uws_worker(
assert callable(job_completed)
assert settings.redis_settings == uws_config.arq_redis_settings
assert settings.queue_name == UWS_QUEUE_NAME
assert not settings.allow_abort_jobs
assert settings.on_startup
assert settings.on_shutdown

Expand Down