Skip to content

Commit

Permalink
Move logging to the service layer
Browse files Browse the repository at this point in the history
Unify all of the logging into the UWS service layer and move it out
of the handlers. Ensure that the logging includes the username.
  • Loading branch information
rra committed Jun 14, 2024
1 parent a5f8a7d commit 4a95b13
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 52 deletions.
71 changes: 24 additions & 47 deletions src/vocutouts/uws/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ async def get_job_list(
summary="Job details",
)
async def get_job(
*,
job_id: str,
*,
request: Request,
wait: Annotated[
int | None,
Expand Down Expand Up @@ -135,16 +135,14 @@ async def get_job(
summary="Delete a job",
)
async def delete_job(
*,
job_id: str,
*,
request: Request,
user: Annotated[str, Depends(auth_dependency)],
uws_factory: Annotated[UWSFactory, Depends(uws_dependency)],
logger: Annotated[BoundLogger, Depends(auth_logger_dependency)],
) -> str:
job_service = uws_factory.create_job_service()
await job_service.delete(user, job_id)
logger.info("Deleted job", job_id=job_id)
return str(request.url_for("get_job_list"))


Expand All @@ -158,8 +156,8 @@ async def delete_job(
summary="Delete a job",
)
async def delete_job_via_post(
*,
job_id: str,
*,
request: Request,
action: Annotated[
Literal["DELETE"] | None,
Expand All @@ -173,7 +171,6 @@ async def delete_job_via_post(
],
user: Annotated[str, Depends(auth_dependency)],
uws_factory: Annotated[UWSFactory, Depends(uws_dependency)],
logger: Annotated[BoundLogger, Depends(auth_logger_dependency)],
) -> str:
# Work around the obnoxious requirement for case-insensitive parameters,
# which is also why the action parameter is declared as optional (but is
Expand All @@ -191,7 +188,6 @@ async def delete_job_via_post(
# Do the actual deletion.
job_service = uws_factory.create_job_service()
await job_service.delete(user, job_id)
logger.info("Deleted job", job_id=job_id)
return str(request.url_for("get_job_list"))


Expand All @@ -202,6 +198,7 @@ async def delete_job_via_post(
)
async def get_job_destruction(
job_id: str,
*,
user: Annotated[str, Depends(auth_dependency)],
uws_factory: Annotated[UWSFactory, Depends(uws_dependency)],
) -> str:
Expand All @@ -217,8 +214,8 @@ async def get_job_destruction(
summary="Change job destruction time",
)
async def post_job_destruction(
*,
job_id: str,
*,
request: Request,
destruction: Annotated[
datetime | None,
Expand All @@ -233,7 +230,6 @@ async def post_job_destruction(
],
user: Annotated[str, Depends(auth_dependency)],
uws_factory: Annotated[UWSFactory, Depends(uws_dependency)],
logger: Annotated[BoundLogger, Depends(auth_logger_dependency)],
) -> str:
# Work around the obnoxious requirement for case-insensitive parameters.
for param in params:
Expand All @@ -247,20 +243,9 @@ async def post_job_destruction(
if not destruction:
raise ParameterError("No new destruction time given")

# Update the destruction time. Note that the policy layer may modify the
# destruction time, so the time set may not match the input.
# update_destruction returns the actual time set, or None if the time was
# not changed.
# Update the destruction time.
job_service = uws_factory.create_job_service()
new_destruction = await job_service.update_destruction(
user, job_id, destruction
)
if new_destruction:
logger.info(
"Changed job destruction time",
job_id=job_id,
destruction=isodatetime(new_destruction),
)
await job_service.update_destruction(user, job_id, destruction)
return str(request.url_for("get_job", job_id=job_id))


Expand All @@ -273,8 +258,8 @@ async def post_job_destruction(
summary="Job error",
)
async def get_job_error(
*,
job_id: str,
*,
request: Request,
user: Annotated[str, Depends(auth_dependency)],
uws_factory: Annotated[UWSFactory, Depends(uws_dependency)],
Expand All @@ -294,6 +279,7 @@ async def get_job_error(
)
async def get_job_execution_duration(
job_id: str,
*,
user: Annotated[str, Depends(auth_dependency)],
uws_factory: Annotated[UWSFactory, Depends(uws_dependency)],
) -> str:
Expand All @@ -309,8 +295,8 @@ async def get_job_execution_duration(
summary="Change job execution duration",
)
async def post_job_execution_duration(
*,
job_id: str,
*,
request: Request,
executionduration: Annotated[
int | None,
Expand All @@ -325,7 +311,6 @@ async def post_job_execution_duration(
],
user: Annotated[str, Depends(auth_dependency)],
uws_factory: Annotated[UWSFactory, Depends(uws_dependency)],
logger: Annotated[BoundLogger, Depends(auth_logger_dependency)],
) -> str:
# Work around the obnoxious requirement for case-insensitive parameters.
for param in params:
Expand All @@ -346,15 +331,8 @@ async def post_job_execution_duration(
# update_execution_duration returns the new execution duration set, or
# None if it was not changed.
job_service = uws_factory.create_job_service()
new_executionduration = await job_service.update_execution_duration(
user, job_id, timedelta(seconds=executionduration)
)
if new_executionduration is not None:
logger.info(
"Changed job execution duration",
job_id=job_id,
duration=f"{new_executionduration}s",
)
duration = timedelta(seconds=executionduration)
await job_service.update_execution_duration(user, job_id, duration)
return str(request.url_for("get_job", job_id=job_id))


Expand All @@ -365,6 +343,7 @@ async def post_job_execution_duration(
)
async def get_job_owner(
job_id: str,
*,
user: Annotated[str, Depends(auth_dependency)],
uws_factory: Annotated[UWSFactory, Depends(uws_dependency)],
) -> str:
Expand All @@ -379,8 +358,8 @@ async def get_job_owner(
summary="Job parameters",
)
async def get_job_parameters(
*,
job_id: str,
*,
request: Request,
user: Annotated[str, Depends(auth_dependency)],
uws_factory: Annotated[UWSFactory, Depends(uws_dependency)],
Expand All @@ -398,6 +377,7 @@ async def get_job_parameters(
)
async def get_job_phase(
job_id: str,
*,
user: Annotated[str, Depends(auth_dependency)],
uws_factory: Annotated[UWSFactory, Depends(uws_dependency)],
) -> str:
Expand All @@ -413,8 +393,8 @@ async def get_job_phase(
summary="Start or abort job",
)
async def post_job_phase(
*,
job_id: str,
*,
request: Request,
phase: Annotated[
Literal["RUN", "ABORT"] | None,
Expand Down Expand Up @@ -442,14 +422,14 @@ async def post_job_phase(
if not phase:
raise ParameterError("No new phase given")

# Dramatiq doesn't support aborting jobs, so currently neither do we.
# We cannot abort arq jobs because we're using sync workers, and Python
# doesn't have any way to cancel them.
if phase == "ABORT":
raise PermissionDeniedError("Aborting jobs is not supported")

# The only remaining case is starting the job.
job_service = uws_factory.create_job_service()
metadata = await job_service.start(user, job_id, access_token)
logger.info("Started job", job_id=job_id, arq_job_id=metadata.id)
await job_service.start(user, job_id, access_token)
return str(request.url_for("get_job", job_id=job_id))


Expand All @@ -460,6 +440,7 @@ async def post_job_phase(
)
async def get_job_quote(
job_id: str,
*,
user: Annotated[str, Depends(auth_dependency)],
uws_factory: Annotated[UWSFactory, Depends(uws_dependency)],
) -> str:
Expand All @@ -479,8 +460,8 @@ async def get_job_quote(
summary="Job results",
)
async def get_job_results(
*,
job_id: str,
*,
request: Request,
user: Annotated[str, Depends(auth_dependency)],
uws_factory: Annotated[UWSFactory, Depends(uws_dependency)],
Expand Down Expand Up @@ -512,19 +493,15 @@ def install_async_post_handler(
)
async def create_job(
*,
parameters: Annotated[list[UWSJobParameter], Depends(dependency)],
request: Request,
phase: Annotated[
Literal["RUN"] | None, Query(title="Immediately start job")
] = None,
parameters: Annotated[list[UWSJobParameter], Depends(dependency)],
runid: Annotated[str | None, Depends(runid_post_dependency)],
params: Annotated[
list[UWSJobParameter], Depends(uws_post_params_dependency)
],
request: Request,
user: Annotated[str, Depends(auth_dependency)],
token: Annotated[str, Depends(auth_delegated_token_dependency)],
uws_factory: Annotated[UWSFactory, Depends(uws_dependency)],
logger: Annotated[BoundLogger, Depends(auth_logger_dependency)],
) -> str:
job_service = uws_factory.create_job_service()
job = await job_service.create(user, run_id=runid, params=parameters)
Expand Down Expand Up @@ -587,7 +564,6 @@ def install_sync_get_handler(
)
async def get_sync(
*,
params: Annotated[list[UWSJobParameter], Depends(dependency)],
runid: Annotated[
str | None,
Query(
Expand All @@ -599,6 +575,7 @@ async def get_sync(
),
),
] = None,
params: Annotated[list[UWSJobParameter], Depends(dependency)],
user: Annotated[str, Depends(auth_dependency)],
token: Annotated[str, Depends(auth_delegated_token_dependency)],
uws_factory: Annotated[UWSFactory, Depends(uws_dependency)],
Expand Down
25 changes: 20 additions & 5 deletions src/vocutouts/uws/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from datetime import datetime, timedelta

from safir.arq import ArqQueue, JobMetadata
from safir.datetime import current_datetime
from safir.datetime import current_datetime, isodatetime
from structlog.stdlib import BoundLogger

from .config import ParametersModel, UWSConfig
Expand Down Expand Up @@ -136,7 +136,8 @@ async def delete(
job = await self._storage.get(job_id)
if job.owner != user:
raise PermissionDeniedError(f"Access to job {job_id} denied")
return await self._storage.delete(job_id)
await self._storage.delete(job_id)
self._logger.info("Deleted job", user=user, job_id=job_id)

async def get(
self,
Expand Down Expand Up @@ -283,12 +284,10 @@ async def run_sync(
job = await self.create(user, params, run_id=runid)
params_model = self._validate_parameters(params)
logger = self._build_logger_for_job(job, params_model)
logger.info("Created job")

# Start the job and wait for it to complete.
metadata = await self.start(user, job.job_id, token)
logger = logger.bind(arq_job_id=metadata.id)
logger.info("Started job")
job = await self.get(
user,
job.job_id,
Expand Down Expand Up @@ -396,6 +395,12 @@ async def update_destruction(
if destruction == job.destruction_time:
return None
await self._storage.update_destruction(job_id, destruction)
self._logger.info(
"Changed job destruction time",
user=user,
job_id=job_id,
destruction=isodatetime(destruction),
)
return destruction

async def update_execution_duration(
Expand Down Expand Up @@ -442,6 +447,16 @@ async def update_execution_duration(
if duration == job.execution_duration:
return None
await self._storage.update_execution_duration(job_id, duration)
if duration.total_seconds() > 0:
duration_str = f"{duration.total_seconds()}s"
else:
duration_str = "unlimited"
self._logger.info(
"Changed job execution duration",
user=user,
job_id=job_id,
duration=duration_str,
)
return duration

def _build_logger_for_job(
Expand All @@ -461,7 +476,7 @@ def _build_logger_for_job(
BoundLogger
Logger with more bound metadata.
"""
logger = self._logger.bind(job_id=job.job_id)
logger = self._logger.bind(user=job.owner, job_id=job.job_id)
if job.run_id:
logger = logger.bind(run_id=job.run_id)
if params:
Expand Down

0 comments on commit 4a95b13

Please sign in to comment.