Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-44763: Move logging to the service layer #179

Merged
merged 1 commit into from
Jun 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 24 additions & 47 deletions src/vocutouts/uws/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ async def get_job_list(
summary="Job details",
)
async def get_job(
*,
job_id: str,
*,
request: Request,
wait: Annotated[
int | None,
Expand Down Expand Up @@ -135,16 +135,14 @@ async def get_job(
summary="Delete a job",
)
async def delete_job(
*,
job_id: str,
*,
request: Request,
user: Annotated[str, Depends(auth_dependency)],
uws_factory: Annotated[UWSFactory, Depends(uws_dependency)],
logger: Annotated[BoundLogger, Depends(auth_logger_dependency)],
) -> str:
job_service = uws_factory.create_job_service()
await job_service.delete(user, job_id)
logger.info("Deleted job", job_id=job_id)
return str(request.url_for("get_job_list"))


Expand All @@ -158,8 +156,8 @@ async def delete_job(
summary="Delete a job",
)
async def delete_job_via_post(
*,
job_id: str,
*,
request: Request,
action: Annotated[
Literal["DELETE"] | None,
Expand All @@ -173,7 +171,6 @@ async def delete_job_via_post(
],
user: Annotated[str, Depends(auth_dependency)],
uws_factory: Annotated[UWSFactory, Depends(uws_dependency)],
logger: Annotated[BoundLogger, Depends(auth_logger_dependency)],
) -> str:
# Work around the obnoxious requirement for case-insensitive parameters,
# which is also why the action parameter is declared as optional (but is
Expand All @@ -191,7 +188,6 @@ async def delete_job_via_post(
# Do the actual deletion.
job_service = uws_factory.create_job_service()
await job_service.delete(user, job_id)
logger.info("Deleted job", job_id=job_id)
return str(request.url_for("get_job_list"))


Expand All @@ -202,6 +198,7 @@ async def delete_job_via_post(
)
async def get_job_destruction(
job_id: str,
*,
user: Annotated[str, Depends(auth_dependency)],
uws_factory: Annotated[UWSFactory, Depends(uws_dependency)],
) -> str:
Expand All @@ -217,8 +214,8 @@ async def get_job_destruction(
summary="Change job destruction time",
)
async def post_job_destruction(
*,
job_id: str,
*,
request: Request,
destruction: Annotated[
datetime | None,
Expand All @@ -233,7 +230,6 @@ async def post_job_destruction(
],
user: Annotated[str, Depends(auth_dependency)],
uws_factory: Annotated[UWSFactory, Depends(uws_dependency)],
logger: Annotated[BoundLogger, Depends(auth_logger_dependency)],
) -> str:
# Work around the obnoxious requirement for case-insensitive parameters.
for param in params:
Expand All @@ -247,20 +243,9 @@ async def post_job_destruction(
if not destruction:
raise ParameterError("No new destruction time given")

# Update the destruction time. Note that the policy layer may modify the
# destruction time, so the time set may not match the input.
# update_destruction returns the actual time set, or None if the time was
# not changed.
# Update the destruction time.
job_service = uws_factory.create_job_service()
new_destruction = await job_service.update_destruction(
user, job_id, destruction
)
if new_destruction:
logger.info(
"Changed job destruction time",
job_id=job_id,
destruction=isodatetime(new_destruction),
)
await job_service.update_destruction(user, job_id, destruction)
return str(request.url_for("get_job", job_id=job_id))


Expand All @@ -273,8 +258,8 @@ async def post_job_destruction(
summary="Job error",
)
async def get_job_error(
*,
job_id: str,
*,
request: Request,
user: Annotated[str, Depends(auth_dependency)],
uws_factory: Annotated[UWSFactory, Depends(uws_dependency)],
Expand All @@ -294,6 +279,7 @@ async def get_job_error(
)
async def get_job_execution_duration(
job_id: str,
*,
user: Annotated[str, Depends(auth_dependency)],
uws_factory: Annotated[UWSFactory, Depends(uws_dependency)],
) -> str:
Expand All @@ -309,8 +295,8 @@ async def get_job_execution_duration(
summary="Change job execution duration",
)
async def post_job_execution_duration(
*,
job_id: str,
*,
request: Request,
executionduration: Annotated[
int | None,
Expand All @@ -325,7 +311,6 @@ async def post_job_execution_duration(
],
user: Annotated[str, Depends(auth_dependency)],
uws_factory: Annotated[UWSFactory, Depends(uws_dependency)],
logger: Annotated[BoundLogger, Depends(auth_logger_dependency)],
) -> str:
# Work around the obnoxious requirement for case-insensitive parameters.
for param in params:
Expand All @@ -346,15 +331,8 @@ async def post_job_execution_duration(
# update_execution_duration returns the new execution duration set, or
# None if it was not changed.
job_service = uws_factory.create_job_service()
new_executionduration = await job_service.update_execution_duration(
user, job_id, timedelta(seconds=executionduration)
)
if new_executionduration is not None:
logger.info(
"Changed job execution duration",
job_id=job_id,
duration=f"{new_executionduration}s",
)
duration = timedelta(seconds=executionduration)
await job_service.update_execution_duration(user, job_id, duration)
return str(request.url_for("get_job", job_id=job_id))


Expand All @@ -365,6 +343,7 @@ async def post_job_execution_duration(
)
async def get_job_owner(
job_id: str,
*,
user: Annotated[str, Depends(auth_dependency)],
uws_factory: Annotated[UWSFactory, Depends(uws_dependency)],
) -> str:
Expand All @@ -379,8 +358,8 @@ async def get_job_owner(
summary="Job parameters",
)
async def get_job_parameters(
*,
job_id: str,
*,
request: Request,
user: Annotated[str, Depends(auth_dependency)],
uws_factory: Annotated[UWSFactory, Depends(uws_dependency)],
Expand All @@ -398,6 +377,7 @@ async def get_job_parameters(
)
async def get_job_phase(
job_id: str,
*,
user: Annotated[str, Depends(auth_dependency)],
uws_factory: Annotated[UWSFactory, Depends(uws_dependency)],
) -> str:
Expand All @@ -413,8 +393,8 @@ async def get_job_phase(
summary="Start or abort job",
)
async def post_job_phase(
*,
job_id: str,
*,
request: Request,
phase: Annotated[
Literal["RUN", "ABORT"] | None,
Expand Down Expand Up @@ -442,14 +422,14 @@ async def post_job_phase(
if not phase:
raise ParameterError("No new phase given")

# Dramatiq doesn't support aborting jobs, so currently neither do we.
# We cannot abort arq jobs because we're using sync workers, and Python
# doesn't have any way to cancel them.
if phase == "ABORT":
raise PermissionDeniedError("Aborting jobs is not supported")

# The only remaining case is starting the job.
job_service = uws_factory.create_job_service()
metadata = await job_service.start(user, job_id, access_token)
logger.info("Started job", job_id=job_id, arq_job_id=metadata.id)
await job_service.start(user, job_id, access_token)
return str(request.url_for("get_job", job_id=job_id))


Expand All @@ -460,6 +440,7 @@ async def post_job_phase(
)
async def get_job_quote(
job_id: str,
*,
user: Annotated[str, Depends(auth_dependency)],
uws_factory: Annotated[UWSFactory, Depends(uws_dependency)],
) -> str:
Expand All @@ -479,8 +460,8 @@ async def get_job_quote(
summary="Job results",
)
async def get_job_results(
*,
job_id: str,
*,
request: Request,
user: Annotated[str, Depends(auth_dependency)],
uws_factory: Annotated[UWSFactory, Depends(uws_dependency)],
Expand Down Expand Up @@ -512,19 +493,15 @@ def install_async_post_handler(
)
async def create_job(
*,
parameters: Annotated[list[UWSJobParameter], Depends(dependency)],
request: Request,
phase: Annotated[
Literal["RUN"] | None, Query(title="Immediately start job")
] = None,
parameters: Annotated[list[UWSJobParameter], Depends(dependency)],
runid: Annotated[str | None, Depends(runid_post_dependency)],
params: Annotated[
list[UWSJobParameter], Depends(uws_post_params_dependency)
],
request: Request,
user: Annotated[str, Depends(auth_dependency)],
token: Annotated[str, Depends(auth_delegated_token_dependency)],
uws_factory: Annotated[UWSFactory, Depends(uws_dependency)],
logger: Annotated[BoundLogger, Depends(auth_logger_dependency)],
) -> str:
job_service = uws_factory.create_job_service()
job = await job_service.create(user, run_id=runid, params=parameters)
Expand Down Expand Up @@ -587,7 +564,6 @@ def install_sync_get_handler(
)
async def get_sync(
*,
params: Annotated[list[UWSJobParameter], Depends(dependency)],
runid: Annotated[
str | None,
Query(
Expand All @@ -599,6 +575,7 @@ async def get_sync(
),
),
] = None,
params: Annotated[list[UWSJobParameter], Depends(dependency)],
user: Annotated[str, Depends(auth_dependency)],
token: Annotated[str, Depends(auth_delegated_token_dependency)],
uws_factory: Annotated[UWSFactory, Depends(uws_dependency)],
Expand Down
25 changes: 20 additions & 5 deletions src/vocutouts/uws/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from datetime import datetime, timedelta

from safir.arq import ArqQueue, JobMetadata
from safir.datetime import current_datetime
from safir.datetime import current_datetime, isodatetime
from structlog.stdlib import BoundLogger

from .config import ParametersModel, UWSConfig
Expand Down Expand Up @@ -136,7 +136,8 @@ async def delete(
job = await self._storage.get(job_id)
if job.owner != user:
raise PermissionDeniedError(f"Access to job {job_id} denied")
return await self._storage.delete(job_id)
await self._storage.delete(job_id)
self._logger.info("Deleted job", user=user, job_id=job_id)

async def get(
self,
Expand Down Expand Up @@ -283,12 +284,10 @@ async def run_sync(
job = await self.create(user, params, run_id=runid)
params_model = self._validate_parameters(params)
logger = self._build_logger_for_job(job, params_model)
logger.info("Created job")

# Start the job and wait for it to complete.
metadata = await self.start(user, job.job_id, token)
logger = logger.bind(arq_job_id=metadata.id)
logger.info("Started job")
job = await self.get(
user,
job.job_id,
Expand Down Expand Up @@ -396,6 +395,12 @@ async def update_destruction(
if destruction == job.destruction_time:
return None
await self._storage.update_destruction(job_id, destruction)
self._logger.info(
"Changed job destruction time",
user=user,
job_id=job_id,
destruction=isodatetime(destruction),
)
return destruction

async def update_execution_duration(
Expand Down Expand Up @@ -442,6 +447,16 @@ async def update_execution_duration(
if duration == job.execution_duration:
return None
await self._storage.update_execution_duration(job_id, duration)
if duration.total_seconds() > 0:
duration_str = f"{duration.total_seconds()}s"
else:
duration_str = "unlimited"
self._logger.info(
"Changed job execution duration",
user=user,
job_id=job_id,
duration=duration_str,
)
return duration

def _build_logger_for_job(
Expand All @@ -461,7 +476,7 @@ def _build_logger_for_job(
BoundLogger
Logger with more bound metadata.
"""
logger = self._logger.bind(job_id=job.job_id)
logger = self._logger.bind(user=job.owner, job_id=job.job_id)
if job.run_id:
logger = logger.bind(run_id=job.run_id)
if params:
Expand Down