Skip to content

Commit

Permalink
Pass access tokens down to job dispatch
Browse files Browse the repository at this point in the history
Retrieve delegated access tokens from Gafaelfawr headers, and pass them down to the job dispatch layer.  This will be used to pass the access token to the backend for use by the Butler client.
  • Loading branch information
dhirving committed Jan 26, 2024
1 parent 6e89cdd commit af49300
Show file tree
Hide file tree
Showing 7 changed files with 38 additions and 11 deletions.
17 changes: 13 additions & 4 deletions src/vocutouts/handlers/external.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from fastapi import APIRouter, Depends, Form, Query, Request, Response
from fastapi.responses import PlainTextResponse, RedirectResponse
from safir.dependencies.gafaelfawr import (
auth_delegated_token_dependency,
auth_dependency,
auth_logger_dependency,
)
Expand Down Expand Up @@ -123,6 +124,7 @@ async def _sync_request(
runid: str | None,
uws_factory: UWSFactory,
logger: BoundLogger,
access_token: str,
) -> Response:
"""Process a sync request.
Expand All @@ -136,7 +138,7 @@ async def _sync_request(
logger.info(
"Created job", job_id=job.job_id, params=[p.to_dict() for p in params]
)
await job_service.start(user, job.job_id)
await job_service.start(user, job.job_id, access_token)
logger.info("Started job", job_id=job.job_id)
job = await job_service.get(
user, job.job_id, wait=config.sync_timeout, wait_for_completion=True
Expand Down Expand Up @@ -244,14 +246,17 @@ async def get_sync(
),
),
user: str = Depends(auth_dependency),
access_token: str = Depends(auth_delegated_token_dependency),
uws_factory: UWSFactory = Depends(uws_dependency),
logger: BoundLogger = Depends(auth_logger_dependency),
) -> Response:
params = [
JobParameter(parameter_id=k.lower(), value=v, is_post=False)
for k, v in request.query_params.items()
]
return await _sync_request(params, user, runid, uws_factory, logger)
return await _sync_request(
params, user, runid, uws_factory, logger, access_token
)


@external_router.post(
Expand Down Expand Up @@ -326,6 +331,7 @@ async def post_sync(
),
params: list[JobParameter] = Depends(uws_post_params_dependency),
user: str = Depends(auth_dependency),
access_token: str = Depends(auth_delegated_token_dependency),
uws_factory: UWSFactory = Depends(uws_dependency),
logger: BoundLogger = Depends(auth_logger_dependency),
) -> Response:
Expand All @@ -334,7 +340,9 @@ async def post_sync(
if param.parameter_id == "runid":
runid = param.value
params = [p for p in params if p.parameter_id != "runid"]
return await _sync_request(params, user, runid, uws_factory, logger)
return await _sync_request(
params, user, runid, uws_factory, logger, access_token
)


@uws_router.post(
Expand Down Expand Up @@ -401,6 +409,7 @@ async def create_job(
),
params: list[JobParameter] = Depends(uws_post_params_dependency),
user: str = Depends(auth_dependency),
access_token: str = Depends(auth_delegated_token_dependency),
uws_factory: UWSFactory = Depends(uws_dependency),
logger: BoundLogger = Depends(auth_logger_dependency),
) -> str:
Expand All @@ -419,7 +428,7 @@ async def create_job(
"Created job", job_id=job.job_id, params=[p.to_dict() for p in params]
)
if phase == "RUN":
await job_service.start(user, job.job_id)
await job_service.start(user, job.job_id, access_token)
logger.info("Started job", job_id=job.job_id)

# Redirect to the new job.
Expand Down
5 changes: 4 additions & 1 deletion src/vocutouts/policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,16 @@ def __init__(self, actor: Actor, logger: BoundLogger) -> None:
self._actor = actor
self._logger = logger

def dispatch(self, job: Job) -> Message:
def dispatch(self, job: Job, access_token: str) -> Message:
"""Dispatch a cutout request to the backend.
Parameters
----------
job
The submitted job description.
access_token
Gafaelfawr access token used to authenticate to Butler server
in the backend.
Returns
-------
Expand Down
4 changes: 3 additions & 1 deletion src/vocutouts/uws/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from fastapi import APIRouter, Depends, Form, Query, Request, Response
from fastapi.responses import PlainTextResponse, RedirectResponse
from safir.dependencies.gafaelfawr import (
auth_delegated_token_dependency,
auth_dependency,
auth_logger_dependency,
)
Expand Down Expand Up @@ -390,6 +391,7 @@ async def post_job_phase(
),
params: list[JobParameter] = Depends(uws_post_params_dependency),
user: str = Depends(auth_dependency),
access_token: str = Depends(auth_delegated_token_dependency),
uws_factory: UWSFactory = Depends(uws_dependency),
logger: BoundLogger = Depends(auth_logger_dependency),
) -> str:
Expand All @@ -410,7 +412,7 @@ async def post_job_phase(

# The only remaining case is starting the job.
job_service = uws_factory.create_job_service()
await job_service.start(user, job_id)
await job_service.start(user, job_id, access_token)
logger.info("Started job", job_id=job_id)
return str(request.url_for("get_job", job_id=job_id))

Expand Down
5 changes: 4 additions & 1 deletion src/vocutouts/uws/policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class UWSPolicy(ABC):
"""

@abstractmethod
def dispatch(self, job: Job) -> Message:
def dispatch(self, job: Job, access_token: str) -> Message:
"""Dispatch a job to a backend worker.
This method is responsible for converting UWS job parameters to the
Expand All @@ -38,6 +38,9 @@ def dispatch(self, job: Job) -> Message:
----------
job
The job to start.
access_token
Gafaelfawr access token used to authenticate to services used
by the backend on the user's behalf.
Returns
-------
Expand Down
9 changes: 7 additions & 2 deletions src/vocutouts/uws/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,9 @@ async def list_jobs(
user, phases=phases, after=after, count=count
)

async def start(self, user: str, job_id: str) -> Message:
async def start(
self, user: str, job_id: str, access_token: str
) -> Message:
"""Start execution of a job.
Parameters
Expand All @@ -252,6 +254,9 @@ async def start(self, user: str, job_id: str) -> Message:
User on behalf of whom this operation is performed.
job_id
Identifier of the job to start.
access_token
Gafaelfawr access token used to authenticate to services used
by the backend on the user's behalf.
Returns
-------
Expand All @@ -269,7 +274,7 @@ async def start(self, user: str, job_id: str) -> Message:
raise PermissionDeniedError(f"Access to job {job_id} denied")
if job.phase not in (ExecutionPhase.PENDING, ExecutionPhase.HELD):
raise InvalidPhaseError("Cannot start job in phase {job.phase}")
message = self._policy.dispatch(job)
message = self._policy.dispatch(job, access_token)
await self._storage.mark_queued(job_id, message.message_id)
return message

Expand Down
7 changes: 6 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,12 @@ async def app() -> AsyncIterator[FastAPI]:
@pytest_asyncio.fixture
async def client(app: FastAPI) -> AsyncIterator[AsyncClient]:
"""Return an ``httpx.AsyncClient`` configured to talk to the test app."""
async with AsyncClient(app=app, base_url="https://example.com/") as client:
async with AsyncClient(
app=app,
base_url="https://example.com/",
# Mock the Gafaelfawr delegated token header.
headers={"x-auth-request-token": "sometoken"},
) as client:
yield client


Expand Down
2 changes: 1 addition & 1 deletion tests/support/uws.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ def __init__(self, actor: Actor) -> None:
super().__init__()
self.actor = actor

def dispatch(self, job: Job) -> Message:
def dispatch(self, job: Job, access_token: str) -> Message:
return self.actor.send_with_options(
args=(job.job_id,),
on_success=job_completed,
Expand Down

0 comments on commit af49300

Please sign in to comment.