From 4a95b132b8345e78b26ad57513220d1c0abf66e1 Mon Sep 17 00:00:00 2001 From: Russ Allbery Date: Thu, 13 Jun 2024 20:46:57 -0700 Subject: [PATCH] Move logging to the service layer Unify all of the logging into the UWS service layer and move it out of the handlers. Ensure that the logging includes the username. --- src/vocutouts/uws/handlers.py | 71 ++++++++++++----------------------- src/vocutouts/uws/service.py | 25 +++++++++--- 2 files changed, 44 insertions(+), 52 deletions(-) diff --git a/src/vocutouts/uws/handlers.py b/src/vocutouts/uws/handlers.py index 6280098..ff6ede4 100644 --- a/src/vocutouts/uws/handlers.py +++ b/src/vocutouts/uws/handlers.py @@ -92,8 +92,8 @@ async def get_job_list( summary="Job details", ) async def get_job( - *, job_id: str, + *, request: Request, wait: Annotated[ int | None, @@ -135,16 +135,14 @@ async def get_job( summary="Delete a job", ) async def delete_job( - *, job_id: str, + *, request: Request, user: Annotated[str, Depends(auth_dependency)], uws_factory: Annotated[UWSFactory, Depends(uws_dependency)], - logger: Annotated[BoundLogger, Depends(auth_logger_dependency)], ) -> str: job_service = uws_factory.create_job_service() await job_service.delete(user, job_id) - logger.info("Deleted job", job_id=job_id) return str(request.url_for("get_job_list")) @@ -158,8 +156,8 @@ async def delete_job( summary="Delete a job", ) async def delete_job_via_post( - *, job_id: str, + *, request: Request, action: Annotated[ Literal["DELETE"] | None, @@ -173,7 +171,6 @@ async def delete_job_via_post( ], user: Annotated[str, Depends(auth_dependency)], uws_factory: Annotated[UWSFactory, Depends(uws_dependency)], - logger: Annotated[BoundLogger, Depends(auth_logger_dependency)], ) -> str: # Work around the obnoxious requirement for case-insensitive parameters, # which is also why the action parameter is declared as optional (but is @@ -191,7 +188,6 @@ async def delete_job_via_post( # Do the actual deletion. job_service = uws_factory.create_job_service() await job_service.delete(user, job_id) - logger.info("Deleted job", job_id=job_id) return str(request.url_for("get_job_list")) @@ -202,6 +198,7 @@ async def delete_job_via_post( ) async def get_job_destruction( job_id: str, + *, user: Annotated[str, Depends(auth_dependency)], uws_factory: Annotated[UWSFactory, Depends(uws_dependency)], ) -> str: @@ -217,8 +214,8 @@ async def get_job_destruction( summary="Change job destruction time", ) async def post_job_destruction( - *, job_id: str, + *, request: Request, destruction: Annotated[ datetime | None, @@ -233,7 +230,6 @@ async def post_job_destruction( ], user: Annotated[str, Depends(auth_dependency)], uws_factory: Annotated[UWSFactory, Depends(uws_dependency)], - logger: Annotated[BoundLogger, Depends(auth_logger_dependency)], ) -> str: # Work around the obnoxious requirement for case-insensitive parameters. for param in params: @@ -247,20 +243,9 @@ async def post_job_destruction( if not destruction: raise ParameterError("No new destruction time given") - # Update the destruction time. Note that the policy layer may modify the - # destruction time, so the time set may not match the input. - # update_destruction returns the actual time set, or None if the time was - # not changed. + # Update the destruction time. job_service = uws_factory.create_job_service() - new_destruction = await job_service.update_destruction( - user, job_id, destruction - ) - if new_destruction: - logger.info( - "Changed job destruction time", - job_id=job_id, - destruction=isodatetime(new_destruction), - ) + await job_service.update_destruction(user, job_id, destruction) return str(request.url_for("get_job", job_id=job_id)) @@ -273,8 +258,8 @@ async def post_job_destruction( summary="Job error", ) async def get_job_error( - *, job_id: str, + *, request: Request, user: Annotated[str, Depends(auth_dependency)], uws_factory: Annotated[UWSFactory, Depends(uws_dependency)], @@ -294,6 +279,7 @@ async def get_job_error( ) async def get_job_execution_duration( job_id: str, + *, user: Annotated[str, Depends(auth_dependency)], uws_factory: Annotated[UWSFactory, Depends(uws_dependency)], ) -> str: @@ -309,8 +295,8 @@ async def get_job_execution_duration( summary="Change job execution duration", ) async def post_job_execution_duration( - *, job_id: str, + *, request: Request, executionduration: Annotated[ int | None, @@ -325,7 +311,6 @@ async def post_job_execution_duration( ], user: Annotated[str, Depends(auth_dependency)], uws_factory: Annotated[UWSFactory, Depends(uws_dependency)], - logger: Annotated[BoundLogger, Depends(auth_logger_dependency)], ) -> str: # Work around the obnoxious requirement for case-insensitive parameters. for param in params: @@ -346,15 +331,8 @@ async def post_job_execution_duration( # update_execution_duration returns the new execution duration set, or # None if it was not changed. job_service = uws_factory.create_job_service() - new_executionduration = await job_service.update_execution_duration( - user, job_id, timedelta(seconds=executionduration) - ) - if new_executionduration is not None: - logger.info( - "Changed job execution duration", - job_id=job_id, - duration=f"{new_executionduration}s", - ) + duration = timedelta(seconds=executionduration) + await job_service.update_execution_duration(user, job_id, duration) return str(request.url_for("get_job", job_id=job_id)) @@ -365,6 +343,7 @@ async def post_job_execution_duration( ) async def get_job_owner( job_id: str, + *, user: Annotated[str, Depends(auth_dependency)], uws_factory: Annotated[UWSFactory, Depends(uws_dependency)], ) -> str: @@ -379,8 +358,8 @@ async def get_job_owner( summary="Job parameters", ) async def get_job_parameters( - *, job_id: str, + *, request: Request, user: Annotated[str, Depends(auth_dependency)], uws_factory: Annotated[UWSFactory, Depends(uws_dependency)], @@ -398,6 +377,7 @@ async def get_job_parameters( ) async def get_job_phase( job_id: str, + *, user: Annotated[str, Depends(auth_dependency)], uws_factory: Annotated[UWSFactory, Depends(uws_dependency)], ) -> str: @@ -413,8 +393,8 @@ async def get_job_phase( summary="Start or abort job", ) async def post_job_phase( - *, job_id: str, + *, request: Request, phase: Annotated[ Literal["RUN", "ABORT"] | None, @@ -442,14 +422,14 @@ async def post_job_phase( if not phase: raise ParameterError("No new phase given") - # Dramatiq doesn't support aborting jobs, so currently neither do we. + # We cannot abort arq jobs because we're using sync workers, and Python + # doesn't have any way to cancel them. if phase == "ABORT": raise PermissionDeniedError("Aborting jobs is not supported") # The only remaining case is starting the job. job_service = uws_factory.create_job_service() - metadata = await job_service.start(user, job_id, access_token) - logger.info("Started job", job_id=job_id, arq_job_id=metadata.id) + await job_service.start(user, job_id, access_token) return str(request.url_for("get_job", job_id=job_id)) @@ -460,6 +440,7 @@ async def post_job_phase( ) async def get_job_quote( job_id: str, + *, user: Annotated[str, Depends(auth_dependency)], uws_factory: Annotated[UWSFactory, Depends(uws_dependency)], ) -> str: @@ -479,8 +460,8 @@ async def get_job_quote( summary="Job results", ) async def get_job_results( - *, job_id: str, + *, request: Request, user: Annotated[str, Depends(auth_dependency)], uws_factory: Annotated[UWSFactory, Depends(uws_dependency)], @@ -512,19 +493,15 @@ def install_async_post_handler( ) async def create_job( *, - parameters: Annotated[list[UWSJobParameter], Depends(dependency)], + request: Request, phase: Annotated[ Literal["RUN"] | None, Query(title="Immediately start job") ] = None, + parameters: Annotated[list[UWSJobParameter], Depends(dependency)], runid: Annotated[str | None, Depends(runid_post_dependency)], - params: Annotated[ - list[UWSJobParameter], Depends(uws_post_params_dependency) - ], - request: Request, user: Annotated[str, Depends(auth_dependency)], token: Annotated[str, Depends(auth_delegated_token_dependency)], uws_factory: Annotated[UWSFactory, Depends(uws_dependency)], - logger: Annotated[BoundLogger, Depends(auth_logger_dependency)], ) -> str: job_service = uws_factory.create_job_service() job = await job_service.create(user, run_id=runid, params=parameters) @@ -587,7 +564,6 @@ def install_sync_get_handler( ) async def get_sync( *, - params: Annotated[list[UWSJobParameter], Depends(dependency)], runid: Annotated[ str | None, Query( @@ -599,6 +575,7 @@ async def get_sync( ), ), ] = None, + params: Annotated[list[UWSJobParameter], Depends(dependency)], user: Annotated[str, Depends(auth_dependency)], token: Annotated[str, Depends(auth_delegated_token_dependency)], uws_factory: Annotated[UWSFactory, Depends(uws_dependency)], diff --git a/src/vocutouts/uws/service.py b/src/vocutouts/uws/service.py index 6170b9c..2872aca 100644 --- a/src/vocutouts/uws/service.py +++ b/src/vocutouts/uws/service.py @@ -6,7 +6,7 @@ from datetime import datetime, timedelta from safir.arq import ArqQueue, JobMetadata -from safir.datetime import current_datetime +from safir.datetime import current_datetime, isodatetime from structlog.stdlib import BoundLogger from .config import ParametersModel, UWSConfig @@ -136,7 +136,8 @@ async def delete( job = await self._storage.get(job_id) if job.owner != user: raise PermissionDeniedError(f"Access to job {job_id} denied") - return await self._storage.delete(job_id) + await self._storage.delete(job_id) + self._logger.info("Deleted job", user=user, job_id=job_id) async def get( self, @@ -283,12 +284,10 @@ async def run_sync( job = await self.create(user, params, run_id=runid) params_model = self._validate_parameters(params) logger = self._build_logger_for_job(job, params_model) - logger.info("Created job") # Start the job and wait for it to complete. metadata = await self.start(user, job.job_id, token) logger = logger.bind(arq_job_id=metadata.id) - logger.info("Started job") job = await self.get( user, job.job_id, @@ -396,6 +395,12 @@ async def update_destruction( if destruction == job.destruction_time: return None await self._storage.update_destruction(job_id, destruction) + self._logger.info( + "Changed job destruction time", + user=user, + job_id=job_id, + destruction=isodatetime(destruction), + ) return destruction async def update_execution_duration( @@ -442,6 +447,16 @@ async def update_execution_duration( if duration == job.execution_duration: return None await self._storage.update_execution_duration(job_id, duration) + if duration.total_seconds() > 0: + duration_str = f"{duration.total_seconds()}s" + else: + duration_str = "unlimited" + self._logger.info( + "Changed job execution duration", + user=user, + job_id=job_id, + duration=duration_str, + ) return duration def _build_logger_for_job( @@ -461,7 +476,7 @@ def _build_logger_for_job( BoundLogger Logger with more bound metadata. """ - logger = self._logger.bind(job_id=job.job_id) + logger = self._logger.bind(user=job.owner, job_id=job.job_id) if job.run_id: logger = logger.bind(run_id=job.run_id) if params: