Skip to content

Commit

Permalink
Merge pull request #179 from lsst-sqre/tickets/DM-44763
Browse files Browse the repository at this point in the history
DM-44763: Move logging to the service layer
  • Loading branch information
rra authored Jun 14, 2024
2 parents a5f8a7d + 4a95b13 commit a8812b8
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 52 deletions.
71 changes: 24 additions & 47 deletions src/vocutouts/uws/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ async def get_job_list(
summary="Job details",
)
async def get_job(
*,
job_id: str,
*,
request: Request,
wait: Annotated[
int | None,
Expand Down Expand Up @@ -135,16 +135,14 @@ async def get_job(
summary="Delete a job",
)
async def delete_job(
*,
job_id: str,
*,
request: Request,
user: Annotated[str, Depends(auth_dependency)],
uws_factory: Annotated[UWSFactory, Depends(uws_dependency)],
logger: Annotated[BoundLogger, Depends(auth_logger_dependency)],
) -> str:
job_service = uws_factory.create_job_service()
await job_service.delete(user, job_id)
logger.info("Deleted job", job_id=job_id)
return str(request.url_for("get_job_list"))


Expand All @@ -158,8 +156,8 @@ async def delete_job(
summary="Delete a job",
)
async def delete_job_via_post(
*,
job_id: str,
*,
request: Request,
action: Annotated[
Literal["DELETE"] | None,
Expand All @@ -173,7 +171,6 @@ async def delete_job_via_post(
],
user: Annotated[str, Depends(auth_dependency)],
uws_factory: Annotated[UWSFactory, Depends(uws_dependency)],
logger: Annotated[BoundLogger, Depends(auth_logger_dependency)],
) -> str:
# Work around the obnoxious requirement for case-insensitive parameters,
# which is also why the action parameter is declared as optional (but is
Expand All @@ -191,7 +188,6 @@ async def delete_job_via_post(
# Do the actual deletion.
job_service = uws_factory.create_job_service()
await job_service.delete(user, job_id)
logger.info("Deleted job", job_id=job_id)
return str(request.url_for("get_job_list"))


Expand All @@ -202,6 +198,7 @@ async def delete_job_via_post(
)
async def get_job_destruction(
job_id: str,
*,
user: Annotated[str, Depends(auth_dependency)],
uws_factory: Annotated[UWSFactory, Depends(uws_dependency)],
) -> str:
Expand All @@ -217,8 +214,8 @@ async def get_job_destruction(
summary="Change job destruction time",
)
async def post_job_destruction(
*,
job_id: str,
*,
request: Request,
destruction: Annotated[
datetime | None,
Expand All @@ -233,7 +230,6 @@ async def post_job_destruction(
],
user: Annotated[str, Depends(auth_dependency)],
uws_factory: Annotated[UWSFactory, Depends(uws_dependency)],
logger: Annotated[BoundLogger, Depends(auth_logger_dependency)],
) -> str:
# Work around the obnoxious requirement for case-insensitive parameters.
for param in params:
Expand All @@ -247,20 +243,9 @@ async def post_job_destruction(
if not destruction:
raise ParameterError("No new destruction time given")

# Update the destruction time. Note that the policy layer may modify the
# destruction time, so the time set may not match the input.
# update_destruction returns the actual time set, or None if the time was
# not changed.
# Update the destruction time.
job_service = uws_factory.create_job_service()
new_destruction = await job_service.update_destruction(
user, job_id, destruction
)
if new_destruction:
logger.info(
"Changed job destruction time",
job_id=job_id,
destruction=isodatetime(new_destruction),
)
await job_service.update_destruction(user, job_id, destruction)
return str(request.url_for("get_job", job_id=job_id))


Expand All @@ -273,8 +258,8 @@ async def post_job_destruction(
summary="Job error",
)
async def get_job_error(
*,
job_id: str,
*,
request: Request,
user: Annotated[str, Depends(auth_dependency)],
uws_factory: Annotated[UWSFactory, Depends(uws_dependency)],
Expand All @@ -294,6 +279,7 @@ async def get_job_error(
)
async def get_job_execution_duration(
job_id: str,
*,
user: Annotated[str, Depends(auth_dependency)],
uws_factory: Annotated[UWSFactory, Depends(uws_dependency)],
) -> str:
Expand All @@ -309,8 +295,8 @@ async def get_job_execution_duration(
summary="Change job execution duration",
)
async def post_job_execution_duration(
*,
job_id: str,
*,
request: Request,
executionduration: Annotated[
int | None,
Expand All @@ -325,7 +311,6 @@ async def post_job_execution_duration(
],
user: Annotated[str, Depends(auth_dependency)],
uws_factory: Annotated[UWSFactory, Depends(uws_dependency)],
logger: Annotated[BoundLogger, Depends(auth_logger_dependency)],
) -> str:
# Work around the obnoxious requirement for case-insensitive parameters.
for param in params:
Expand All @@ -346,15 +331,8 @@ async def post_job_execution_duration(
# update_execution_duration returns the new execution duration set, or
# None if it was not changed.
job_service = uws_factory.create_job_service()
new_executionduration = await job_service.update_execution_duration(
user, job_id, timedelta(seconds=executionduration)
)
if new_executionduration is not None:
logger.info(
"Changed job execution duration",
job_id=job_id,
duration=f"{new_executionduration}s",
)
duration = timedelta(seconds=executionduration)
await job_service.update_execution_duration(user, job_id, duration)
return str(request.url_for("get_job", job_id=job_id))


Expand All @@ -365,6 +343,7 @@ async def post_job_execution_duration(
)
async def get_job_owner(
job_id: str,
*,
user: Annotated[str, Depends(auth_dependency)],
uws_factory: Annotated[UWSFactory, Depends(uws_dependency)],
) -> str:
Expand All @@ -379,8 +358,8 @@ async def get_job_owner(
summary="Job parameters",
)
async def get_job_parameters(
*,
job_id: str,
*,
request: Request,
user: Annotated[str, Depends(auth_dependency)],
uws_factory: Annotated[UWSFactory, Depends(uws_dependency)],
Expand All @@ -398,6 +377,7 @@ async def get_job_parameters(
)
async def get_job_phase(
job_id: str,
*,
user: Annotated[str, Depends(auth_dependency)],
uws_factory: Annotated[UWSFactory, Depends(uws_dependency)],
) -> str:
Expand All @@ -413,8 +393,8 @@ async def get_job_phase(
summary="Start or abort job",
)
async def post_job_phase(
*,
job_id: str,
*,
request: Request,
phase: Annotated[
Literal["RUN", "ABORT"] | None,
Expand Down Expand Up @@ -442,14 +422,14 @@ async def post_job_phase(
if not phase:
raise ParameterError("No new phase given")

# Dramatiq doesn't support aborting jobs, so currently neither do we.
# We cannot abort arq jobs because we're using sync workers, and Python
# doesn't have any way to cancel them.
if phase == "ABORT":
raise PermissionDeniedError("Aborting jobs is not supported")

# The only remaining case is starting the job.
job_service = uws_factory.create_job_service()
metadata = await job_service.start(user, job_id, access_token)
logger.info("Started job", job_id=job_id, arq_job_id=metadata.id)
await job_service.start(user, job_id, access_token)
return str(request.url_for("get_job", job_id=job_id))


Expand All @@ -460,6 +440,7 @@ async def post_job_phase(
)
async def get_job_quote(
job_id: str,
*,
user: Annotated[str, Depends(auth_dependency)],
uws_factory: Annotated[UWSFactory, Depends(uws_dependency)],
) -> str:
Expand All @@ -479,8 +460,8 @@ async def get_job_quote(
summary="Job results",
)
async def get_job_results(
*,
job_id: str,
*,
request: Request,
user: Annotated[str, Depends(auth_dependency)],
uws_factory: Annotated[UWSFactory, Depends(uws_dependency)],
Expand Down Expand Up @@ -512,19 +493,15 @@ def install_async_post_handler(
)
async def create_job(
*,
parameters: Annotated[list[UWSJobParameter], Depends(dependency)],
request: Request,
phase: Annotated[
Literal["RUN"] | None, Query(title="Immediately start job")
] = None,
parameters: Annotated[list[UWSJobParameter], Depends(dependency)],
runid: Annotated[str | None, Depends(runid_post_dependency)],
params: Annotated[
list[UWSJobParameter], Depends(uws_post_params_dependency)
],
request: Request,
user: Annotated[str, Depends(auth_dependency)],
token: Annotated[str, Depends(auth_delegated_token_dependency)],
uws_factory: Annotated[UWSFactory, Depends(uws_dependency)],
logger: Annotated[BoundLogger, Depends(auth_logger_dependency)],
) -> str:
job_service = uws_factory.create_job_service()
job = await job_service.create(user, run_id=runid, params=parameters)
Expand Down Expand Up @@ -587,7 +564,6 @@ def install_sync_get_handler(
)
async def get_sync(
*,
params: Annotated[list[UWSJobParameter], Depends(dependency)],
runid: Annotated[
str | None,
Query(
Expand All @@ -599,6 +575,7 @@ async def get_sync(
),
),
] = None,
params: Annotated[list[UWSJobParameter], Depends(dependency)],
user: Annotated[str, Depends(auth_dependency)],
token: Annotated[str, Depends(auth_delegated_token_dependency)],
uws_factory: Annotated[UWSFactory, Depends(uws_dependency)],
Expand Down
25 changes: 20 additions & 5 deletions src/vocutouts/uws/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from datetime import datetime, timedelta

from safir.arq import ArqQueue, JobMetadata
from safir.datetime import current_datetime
from safir.datetime import current_datetime, isodatetime
from structlog.stdlib import BoundLogger

from .config import ParametersModel, UWSConfig
Expand Down Expand Up @@ -136,7 +136,8 @@ async def delete(
job = await self._storage.get(job_id)
if job.owner != user:
raise PermissionDeniedError(f"Access to job {job_id} denied")
return await self._storage.delete(job_id)
await self._storage.delete(job_id)
self._logger.info("Deleted job", user=user, job_id=job_id)

async def get(
self,
Expand Down Expand Up @@ -283,12 +284,10 @@ async def run_sync(
job = await self.create(user, params, run_id=runid)
params_model = self._validate_parameters(params)
logger = self._build_logger_for_job(job, params_model)
logger.info("Created job")

# Start the job and wait for it to complete.
metadata = await self.start(user, job.job_id, token)
logger = logger.bind(arq_job_id=metadata.id)
logger.info("Started job")
job = await self.get(
user,
job.job_id,
Expand Down Expand Up @@ -396,6 +395,12 @@ async def update_destruction(
if destruction == job.destruction_time:
return None
await self._storage.update_destruction(job_id, destruction)
self._logger.info(
"Changed job destruction time",
user=user,
job_id=job_id,
destruction=isodatetime(destruction),
)
return destruction

async def update_execution_duration(
Expand Down Expand Up @@ -442,6 +447,16 @@ async def update_execution_duration(
if duration == job.execution_duration:
return None
await self._storage.update_execution_duration(job_id, duration)
if duration.total_seconds() > 0:
duration_str = f"{duration.total_seconds()}s"
else:
duration_str = "unlimited"
self._logger.info(
"Changed job execution duration",
user=user,
job_id=job_id,
duration=duration_str,
)
return duration

def _build_logger_for_job(
Expand All @@ -461,7 +476,7 @@ def _build_logger_for_job(
BoundLogger
Logger with more bound metadata.
"""
logger = self._logger.bind(job_id=job.job_id)
logger = self._logger.bind(user=job.owner, job_id=job.job_id)
if job.run_id:
logger = logger.bind(run_id=job.run_id)
if params:
Expand Down

0 comments on commit a8812b8

Please sign in to comment.