diff --git a/changelog.d/20240613_185120_rra_DM_44763.md b/changelog.d/20240613_185120_rra_DM_44763.md
new file mode 100644
index 0000000..6860976
--- /dev/null
+++ b/changelog.d/20240613_185120_rra_DM_44763.md
@@ -0,0 +1,4 @@
+### Backwards-incompatible changes
+
+- Cancelling or aborting jobs is not supported by the combination of arq and sync worker functions. Properly reflect this in job metadata by forcing execution duration to 0 to indicate that no limit is applied. Substantially increase the default arq job timeout since the timeout will be ineffective anyway.
+- Drop the `CUTOUT_TIMEOUT` configuration option since we have no way of enforcing a timeout on jobs.
diff --git a/src/vocutouts/config.py b/src/vocutouts/config.py
index 6b884ea..f23935d 100644
--- a/src/vocutouts/config.py
+++ b/src/vocutouts/config.py
@@ -104,11 +104,12 @@ class Config(BaseSettings):
)
sync_timeout: timedelta = Field(
- timedelta(minutes=1), title="Timeout for sync requests"
- )
-
- timeout: timedelta = Field(
- timedelta(minutes=10), title="Timeout for cutout jobs"
+ timedelta(minutes=1),
+ title="Timeout for sync requests",
+ description=(
+ "The job will continue running as an async job beyond this"
+ " timeout since cancellation of jobs is not currently supported."
+ ),
)
tmpdir: Path = Field(Path("/tmp"), title="Temporary directory for workers")
@@ -178,7 +179,7 @@ def _validate_arq_queue_url(cls, v: RedisDsn) -> RedisDsn:
)
return v
- @field_validator("lifetime", "sync_timeout", "timeout", mode="before")
+ @field_validator("lifetime", "sync_timeout", mode="before")
@classmethod
def _parse_as_seconds(cls, v: int | str | timedelta) -> int | timedelta:
"""Convert timedelta strings so they are parsed as seconds."""
@@ -213,7 +214,6 @@ def uws_config(self) -> UWSConfig:
return UWSConfig(
arq_mode=self.arq_mode,
arq_redis_settings=self.arq_redis_settings,
- execution_duration=self.timeout,
lifetime=self.lifetime,
parameters_type=CutoutParameters,
signing_service_account=self.service_account,
diff --git a/src/vocutouts/uws/app.py b/src/vocutouts/uws/app.py
index b47c5cf..fa74d34 100644
--- a/src/vocutouts/uws/app.py
+++ b/src/vocutouts/uws/app.py
@@ -2,7 +2,6 @@
from __future__ import annotations
-from datetime import timedelta
from typing import Any
from fastapi import APIRouter, FastAPI, Request
@@ -13,7 +12,7 @@
from . import schema
from .config import UWSConfig
-from .constants import UWS_QUEUE_NAME
+from .constants import UWS_DATABASE_TIMEOUT, UWS_QUEUE_NAME
from .exceptions import UWSError
from .handlers import (
install_async_post_handler,
@@ -95,10 +94,13 @@ async def startup(ctx: dict[Any, Any]) -> None:
async def shutdown(ctx: dict[Any, Any]) -> None:
await close_uws_worker_context(ctx)
+ # Running 10 jobs simultaneously is the arq default as of arq 0.26.0
+ # and seems reasonable for database workers.
return WorkerSettings(
functions=[uws_job_started, uws_job_completed],
redis_settings=self._config.arq_redis_settings,
- job_timeout=timedelta(seconds=30),
+ job_timeout=UWS_DATABASE_TIMEOUT,
+ max_jobs=10,
queue_name=UWS_QUEUE_NAME,
on_startup=startup,
on_shutdown=shutdown,
diff --git a/src/vocutouts/uws/config.py b/src/vocutouts/uws/config.py
index 5797a1e..3f6567b 100644
--- a/src/vocutouts/uws/config.py
+++ b/src/vocutouts/uws/config.py
@@ -91,13 +91,6 @@ class encapsulates the configuration of the UWS component that may vary by
database_url: str
"""URL for the metadata database."""
- execution_duration: timedelta
- """Maximum execution time in seconds.
-
- Jobs that run longer than this length of time will be automatically
- aborted.
- """
-
lifetime: timedelta
"""The lifetime of jobs.
@@ -127,6 +120,15 @@ class encapsulates the configuration of the UWS component that may vary by
database_password: SecretStr | None = None
"""Password for the database."""
+ execution_duration: timedelta = timedelta(seconds=0)
+ """Maximum execution time in seconds.
+
+ Jobs that run longer than this length of time should be automatically
+ aborted. However, currently the backend does not support cancelling jobs,
+ and therefore the only correct value is 0, which indicates that the
+ execution duration of the job is unlimited.
+ """
+
slack_webhook: SecretStr | None = None
"""Slack incoming webhook for reporting errors."""
@@ -167,8 +169,9 @@ class encapsulates the configuration of the UWS component that may vary by
If provided, called with the requested execution duration and the current
job record and should return the new execution duration time. Otherwise,
- any execution duration time shorter than the configured maximum timeout
- will be allowed.
+ the execution duration may not be changed. Note that the current backend
+ does not support cancelling jobs and therefore does not support execution
+ duration values other than 0.
"""
wait_timeout: timedelta = timedelta(minutes=1)
diff --git a/src/vocutouts/uws/constants.py b/src/vocutouts/uws/constants.py
index 9d7ddf5..cec101b 100644
--- a/src/vocutouts/uws/constants.py
+++ b/src/vocutouts/uws/constants.py
@@ -6,11 +6,15 @@
__all__ = [
"JOB_RESULT_TIMEOUT",
+ "UWS_DATABASE_TIMEOUT",
"UWS_QUEUE_NAME",
]
JOB_RESULT_TIMEOUT = timedelta(seconds=5)
"""How long to poll arq for job results before giving up."""
+UWS_DATABASE_TIMEOUT = timedelta(seconds=30)
+"""Timeout on workers that update the UWS database."""
+
UWS_QUEUE_NAME = "uws:queue"
"""Name of the arq queue for internal UWS messages."""
diff --git a/src/vocutouts/uws/service.py b/src/vocutouts/uws/service.py
index 411f013..6170b9c 100644
--- a/src/vocutouts/uws/service.py
+++ b/src/vocutouts/uws/service.py
@@ -430,12 +430,15 @@ async def update_execution_duration(
if job.owner != user:
raise PermissionDeniedError(f"Access to job {job_id} denied")
- # Validate the new value.
+ # Validate the new value. Only support changes to execution duration
+ # if a validator is set, which is a signal that the application
+ # supports cancellation of jobs. The current implementation does not
+ # support cancelling jobs and therefore cannot enforce a timeout, so
+ # an execution duration of 0 is currently the only correct value.
if validator := self._config.validate_execution_duration:
duration = validator(duration, job)
- elif duration > self._config.execution_duration:
- duration = self._config.execution_duration
-
+ else:
+ return None
if duration == job.execution_duration:
return None
await self._storage.update_execution_duration(job_id, duration)
diff --git a/src/vocutouts/uws/uwsworker.py b/src/vocutouts/uws/uwsworker.py
index b79155c..386316e 100644
--- a/src/vocutouts/uws/uwsworker.py
+++ b/src/vocutouts/uws/uwsworker.py
@@ -88,6 +88,9 @@ class returned by other functions.
job_timeout: SecondsTimedelta
"""Maximum timeout for all jobs."""
+ max_jobs: int
+ """Maximum number of jobs that can be run at one time."""
+
queue_name: str = default_queue_name
"""Name of arq queue to listen to for jobs."""
@@ -97,9 +100,6 @@ class returned by other functions.
on_shutdown: StartupShutdown | None = None
"""Coroutine to run on shutdown."""
- allow_abort_jobs: bool = False
- """Whether to allow jobs to be aborted."""
-
@dataclass
class WorkerJobInfo:
@@ -115,7 +115,11 @@ class WorkerJobInfo:
"""Delegated Gafaelfawr token to act on behalf of the user."""
timeout: timedelta
- """Maximum execution time for the job."""
+ """Maximum execution time for the job.
+
+ Currently, this is ignored, since the backend workers do not support
+ cancellation.
+ """
run_id: str | None = None
"""User-supplied run ID, if any."""
@@ -164,6 +168,27 @@ def build_worker(
UWS worker configuration.
logger
Logger to use for messages.
+
+ Notes
+ -----
+ Timeouts and aborting jobs unfortunately are not supported due to
+ limitations in `concurrent.futures.ThreadPoolExecutor`. Once a thread has
+ been started, there is no way to stop it until it completes on its own.
+ Therefore, no job timeout is set or supported, and the timeout set on the
+ job (which comes from executionduration) is ignored.
+
+ Fixing this appears to be difficult since Python's `threading.Thread`
+ simply does not support cancellation. It would probably require rebuilding
+ the worker model on top of processes and killing those processes on
+ timeout. That would pose problems for cleanup of any temporary resources
+ created by the process such as temporary files, since Python cleanup code
+ would not be run.
+
+ The best fix would be for backend code to be rewritten to be async, so
+ await would become a cancellation point (although this still may not be
+ enough for compute-heavy code that doesn't use await frequently). However,
+ the Rubin pipelines code is all sync, so async worker support has not yet
+ been added due to lack of demand.
"""
async def startup(ctx: dict[Any, Any]) -> None:
@@ -218,11 +243,19 @@ async def run(
finally:
await arq.enqueue("job_completed", info.job_id)
+ # Job timeouts are not actually supported since we have no way of stopping
+ # the sync worker. A timeout will just leave the previous worker running
+ # and will block all future jobs. Set it to an extremely long value, since
+ # it can't be disabled entirely.
+ #
+ # Since the worker is running sync jobs, run one job per pod since they
+ # will be serialized anyway and no parallelism is possible. If async
+ # worker support is added, consider making this configurable.
return WorkerSettings(
functions=[func(run, name=worker.__qualname__)],
redis_settings=config.arq_redis_settings,
- job_timeout=config.timeout,
+ job_timeout=3600,
+ max_jobs=1,
on_startup=startup,
on_shutdown=shutdown,
- allow_abort_jobs=True,
)
diff --git a/tests/handlers/async_test.py b/tests/handlers/async_test.py
index e905a8f..79886ae 100644
--- a/tests/handlers/async_test.py
+++ b/tests/handlers/async_test.py
@@ -26,7 +26,7 @@
someone
PENDING
[DATE]
- 600
+ 0
[DATE]
1:2:band:value
@@ -50,7 +50,7 @@
[DATE]
[DATE]
[DATE]
- 600
+ 0
[DATE]
1:2:band:value
diff --git a/tests/support/uws.py b/tests/support/uws.py
index fb23b75..2d79111 100644
--- a/tests/support/uws.py
+++ b/tests/support/uws.py
@@ -67,7 +67,6 @@ def build_uws_config() -> UWSConfig:
async_post_dependency=_post_dependency,
database_url=database_url,
database_password=SecretStr(os.environ["POSTGRES_PASSWORD"]),
- execution_duration=timedelta(minutes=10),
lifetime=timedelta(days=1),
parameters_type=SimpleParameters,
signing_service_account="signer@example.com",
diff --git a/tests/uws/job_api_test.py b/tests/uws/job_api_test.py
index 5c0b096..7db1323 100644
--- a/tests/uws/job_api_test.py
+++ b/tests/uws/job_api_test.py
@@ -33,7 +33,7 @@
user
{}
{}
- {}
+ 0
{}
Jane
@@ -56,7 +56,7 @@
{}
{}
{}
- 600
+ 0
{}
Jane
@@ -120,7 +120,6 @@ async def test_job_run(
"1",
"PENDING",
isodatetime(job.creation_time),
- "600",
isodatetime(job.creation_time + timedelta(seconds=24 * 60 * 60)),
)
@@ -155,7 +154,6 @@ async def test_job_run(
"1",
"QUEUED",
isodatetime(job.creation_time),
- "600",
isodatetime(job.creation_time + timedelta(seconds=24 * 60 * 60)),
)
await runner.mark_in_progress("user", "1")
@@ -239,7 +237,6 @@ async def test_job_api(
"1",
"PENDING",
isodatetime(job.creation_time),
- "600",
isodatetime(destruction_time),
)
@@ -257,7 +254,7 @@ async def test_job_api(
)
assert r.status_code == 200
assert r.headers["Content-Type"] == "text/plain; charset=utf-8"
- assert r.text == "600"
+ assert r.text == "0"
r = await client.get(
"/test/jobs/1/owner", headers={"X-Auth-Request-User": "user"}
@@ -297,6 +294,7 @@ async def test_job_api(
assert r.status_code == 303
assert r.headers["Location"] == "https://example.com/test/jobs/1"
+ # Changing the execution duration is not supported.
r = await client.post(
"/test/jobs/1/executionduration",
headers={"X-Auth-Request-User": "user"},
@@ -315,7 +313,6 @@ async def test_job_api(
"1",
"PENDING",
isodatetime(job.creation_time),
- "300",
isodatetime(now),
)
@@ -346,7 +343,6 @@ async def test_job_api(
"2",
"PENDING",
isodatetime(job.creation_time),
- "600",
isodatetime(job.destruction_time),
)
r = await client.post(
diff --git a/tests/uws/job_error_test.py b/tests/uws/job_error_test.py
index 82c362f..3d94e22 100644
--- a/tests/uws/job_error_test.py
+++ b/tests/uws/job_error_test.py
@@ -27,7 +27,7 @@
{}
{}
{}
- 600
+ 0
{}
Sarah
diff --git a/tests/uws/long_polling_test.py b/tests/uws/long_polling_test.py
index 72d2574..44b96fc 100644
--- a/tests/uws/long_polling_test.py
+++ b/tests/uws/long_polling_test.py
@@ -26,7 +26,7 @@
user
{}
{}
- 600
+ 0
{}
Naomi
@@ -47,7 +47,7 @@
EXECUTING
{}
{}
- 600
+ 0
{}
Naomi
@@ -69,7 +69,7 @@
{}
{}
{}
- 600
+ 0
{}
Naomi
diff --git a/tests/uws/workers_test.py b/tests/uws/workers_test.py
index 8f8ce59..a742459 100644
--- a/tests/uws/workers_test.py
+++ b/tests/uws/workers_test.py
@@ -69,7 +69,6 @@ def hello(
assert settings.functions[0].name == hello.__qualname__
assert settings.redis_settings == uws_config.arq_redis_settings
assert settings.queue_name == default_queue_name
- assert settings.allow_abort_jobs
assert settings.on_startup
assert settings.on_shutdown
@@ -150,7 +149,6 @@ async def test_build_uws_worker(
assert callable(job_completed)
assert settings.redis_settings == uws_config.arq_redis_settings
assert settings.queue_name == UWS_QUEUE_NAME
- assert not settings.allow_abort_jobs
assert settings.on_startup
assert settings.on_shutdown