Skip to content

Commit

Permalink
Merge pull request #22 from lsst-sqre/tickets/DM-33513
Browse files Browse the repository at this point in the history
[DM-33513] Replace the backend with one using lsst.image_cutout_backend
  • Loading branch information
rra authored Feb 9, 2022
2 parents dd5fd8c + de21f52 commit 1980cd3
Show file tree
Hide file tree
Showing 30 changed files with 1,506 additions and 475 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ jobs:
tox-${{ matrix.python }}-${{ hashFiles('requirements/*.txt') }}-
- name: Run tox
run: tox -e docker,coverage-report,typing
run: tox -e py,coverage-report,typing

build:
runs-on: ubuntu-latest
Expand Down
22 changes: 22 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,28 @@
Change log
##########

0.2.0 (2022-02-09)
==================

This is the initial production candidate.
Another release will be forthcoming to clean up some remaining issues, but this version contains the core functionality and uses a proper backend.

The database schema of this version is incompatible with 0.1.0.
The database must be wiped and recreated during the upgrade.

- Use ``lsst.image_cutout_backend`` as the backend instead of ``pipetask`` without conversion of coordinates to pixels.
- Data IDs are now Butler UUIDs instead of colon-separated tuples.
- Support POLYGON and CIRCLE stencils and stop supporting POS RANGE, matching the capabilities of the new backend.
- Use a separate S3 bucket to store the output rather than a Butler collection.
Eliminate use of Butler in the frontend, in favor of using that S3 bucket directly.
This eliminated the ``CUTOUT_BUTLER_COLLECTION`` configuration setting and adds new ``CUTOUT_STORAGE_URL`` and ``CUTOUT_TMPDIR`` configuration settings.
- Use a different method of generating signed S3 result URLs that works correctly with workload identity in a GKE cluster.
This adds a new ``CUTOUT_SERVICE_ACCOUNT`` configuration setting specifying the service account to use for URL signing.
The workload identity the service runs as must have the ``roles/iam.serviceAccountTokenCreator`` role so that it can create signed URLs.
- Add new ``--reset`` flag to ``vo-cutouts init`` to wipe the existing database.
- Stop using a FastAPI subapp.
This was causing problems for error handling, leading to exceptions thrown in the UWS handlers to turn into 500 errors with no logged exception and no error details.

0.1.0 (2021-11-11)
==================

Expand Down
9 changes: 3 additions & 6 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
.PHONY: update-deps
update-deps:
pip install --upgrade pip-tools pip setuptools
# Hashes are disabled until lsst.daf.butler is on PyPI.
# pip-compile --upgrade --build-isolation --generate-hashes --output-file requirements/main.txt requirements/main.in
# pip-compile --upgrade --build-isolation --generate-hashes --output-file requirements/dev.txt requirements/dev.in
pip-compile --upgrade --build-isolation --output-file requirements/main.txt requirements/main.in
pip-compile --upgrade --build-isolation --output-file requirements/dev.txt requirements/dev.in
pip install --upgrade pip-tools 'pip<22' setuptools
pip-compile --upgrade --build-isolation --generate-hashes --output-file requirements/main.txt requirements/main.in
pip-compile --upgrade --build-isolation --generate-hashes --output-file requirements/dev.txt requirements/dev.in

.PHONY: init
init:
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ known_first_party = ["vocutouts", "tests"]
skip = ["docs/conf.py"]

[tool.pytest.ini_options]
asyncio_mode = "strict"
python_files = [
"tests/*.py",
"tests/*/*.py"
Expand Down
447 changes: 399 additions & 48 deletions requirements/dev.txt

Large diffs are not rendered by default.

17 changes: 1 addition & 16 deletions requirements/main.in
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,10 @@ astropy
asyncpg
click
dramatiq[redis]
google-auth
google-cloud-storage
jinja2
psycopg2
safir
sqlalchemy[asyncio]
structlog

# Copied from requirements.txt for lsst.daf.butler.
boto3>=1.13
botocore>=1.15
click>7.0
httpx
pydantic
pyyaml>=5.1

# These are required by lsst.utils
psutil>=5.7

# Not yet available on PyPI.
git+https://github.com/lsst/daf_butler@main#egg=daf-butler
git+https://github.com/lsst/sphgeom@main#egg=lsst_sphgeom
git+https://github.com/lsst/utils@main#egg=lsst_utils
794 changes: 685 additions & 109 deletions requirements/main.txt

Large diffs are not rendered by default.

35 changes: 26 additions & 9 deletions src/vocutouts/actors.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,28 +48,45 @@
from typing import Any, Dict, List

__all__ = [
"cutout_range",
"cutout",
"job_completed",
"job_failed",
"job_started",
]


@dramatiq.actor(queue_name="cutout", max_retries=1)
def cutout_range(
def cutout(
job_id: str,
data_id: Dict[str, str],
ra_min: float,
ra_max: float,
dec_min: float,
dec_max: float,
) -> List[Dict[str, Any]]:
"""Stub for a range cutout.
dataset_ids: List[str],
stencils: List[Dict[str, Any]],
) -> List[Dict[str, str]]:
"""Stub for a circle cutout.
This is only a stub, existing to define the actor in the Dramatiq broker
used by the API frontend. The actual implementation is in
:py:mod:`vocutouts.workers` and must have the same signature.
Parameters
----------
job_id : `str`
The UWS job ID, used as the key for storing results.
dataset_ids : List[`str`]
The data objects on which to perform cutouts. These are opaque
identifiers passed as-is to the backend. The user will normally
discover them via some service such as ObsTAP.
stencils : List[Dict[`str`, Any]]
Serialized stencils for the cutouts to perform. These are
JSON-serializable (a requirement for Dramatiq) representations of the
`~vocutouts.models.stencils.Stencil` objects corresponding to the
user's request.
Returns
-------
result : List[Dict[`str`, `str`]]
The results of the job. This must be a list of dict representations
of `~vocutouts.uws.models.JobResult` objects.
Notes
-----
For the time being, retrying cutouts is disabled by setting
Expand Down
7 changes: 5 additions & 2 deletions src/vocutouts/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,11 @@ def run(port: int) -> None:


@main.command()
@click.option(
"--reset", is_flag=True, help="Delete all existing database data."
)
@coroutine
async def init() -> None:
async def init(reset: bool) -> None:
"""Initialize the database storage."""
logger = structlog.get_logger(config.logger_name)
await initialize_database(config.uws_config(), logger)
await initialize_database(config.uws_config(), logger, reset)
22 changes: 20 additions & 2 deletions src/vocutouts/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,19 @@ class Configuration:
"""Configuration for vocutouts."""

butler_repository: str = os.getenv("CUTOUT_BUTLER_REPOSITORY", "")
"""The Butler repository to use for results.
"""The Butler repository holding source images for cutouts.
Set with the ``CUTOUT_BUTLER_REPOSITORY`` environment variable. Setting
this is mandatory.
"""

storage_url: str = os.getenv("CUTOUT_STORAGE_URL", "")
"""The root URL to use to store cutout results.
This must be an ``s3`` URL pointing to a Google Cloud Storage bucket that
is writable by the backend and readable by the frontend.
"""

database_url: str = os.getenv("CUTOUT_DATABASE_URL", "")
"""The URL for the UWS job database.
Expand All @@ -38,6 +45,14 @@ class Configuration:
Set with the ``CUTOUT_DATABASE_PASSWORD`` environment variable.
"""

signing_service_account: str = os.getenv("CUTOUT_SERVICE_ACCOUNT", "")
"""Email of service account to use for signed URLs.
The default credentials that the application frontend runs with must have
the ``roles/iam.serviceAccountTokenCreator`` role on the service account
with this email.
"""

execution_duration: int = int(os.getenv("CUTOUT_TIMEOUT", "600"))
"""The timeout for a single cutout job.
Expand Down Expand Up @@ -72,6 +87,9 @@ class Configuration:
one minute.
"""

tmpdir: str = os.getenv("CUTOUT_TMPDIR", "/tmp")
"""Temporary directory to use for cutouts before uploading them to GCS."""

name: str = os.getenv("SAFIR_NAME", "cutout")
"""The application's name, which doubles as the root HTTP endpoint path.
Expand Down Expand Up @@ -99,13 +117,13 @@ class Configuration:
def uws_config(self) -> UWSConfig:
"""Convert to configuration for the UWS subsystem."""
return UWSConfig(
butler_repository=self.butler_repository,
execution_duration=self.execution_duration,
lifetime=self.lifetime,
database_url=self.database_url,
database_password=self.database_password,
redis_host=self.redis_host,
redis_password=self.redis_password,
signing_service_account=self.signing_service_account,
)


Expand Down
38 changes: 36 additions & 2 deletions src/vocutouts/handlers/external.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@

from fastapi import APIRouter, Depends, Form, Query, Request, Response
from fastapi.responses import PlainTextResponse, RedirectResponse
from safir.dependencies.logger import logger_dependency
from safir.metadata import get_metadata
from structlog.stdlib import BoundLogger

from ..config import config
from ..dependencies.auth import auth_dependency
Expand Down Expand Up @@ -118,6 +120,7 @@ async def _sync_request(
user: str,
runid: Optional[str],
uws_factory: UWSFactory,
logger: BoundLogger,
) -> Response:
"""Process a sync request.
Expand All @@ -126,23 +129,42 @@ async def _sync_request(
# Create the job, start it, and wait for it to complete.
job_service = uws_factory.create_job_service()
job = await job_service.create(user, run_id=runid, params=params)
if runid:
logger = logger.bind(run_id=runid)
logger.info(
"Created job",
user=user,
job_id=job.job_id,
params=[p.to_dict() for p in params],
)
await job_service.start(user, job.job_id)
logger.info("Started job", user=user, job_id=job.job_id)
job = await job_service.get(
user, job.job_id, wait=config.sync_timeout, wait_for_completion=True
)

# Check for error states.
if job.phase not in (ExecutionPhase.COMPLETED, ExecutionPhase.ERROR):
logger.warning("Job timed out", user=user, job_id=job.job_id)
return PlainTextResponse(
f"Error Cutout did not complete in {config.sync_timeout}s",
status_code=400,
)
if job.error:
logger.warning(
"Job failed",
user=user,
job_id=job.job_id,
error_code=job.error.error_code.value,
error=job.error.message,
error_detail=job.error.detail,
)
response = f"{job.error.error_code.value} {job.error.message}\n"
if job.error.detail:
response += f"\n{job.error.detail}"
return PlainTextResponse(response, status_code=400)
if not job.results:
logger.warning("Job returned no results", user=user, job_id=job.job_id)
return PlainTextResponse(
"Error Job did not return any results", status_code=400
)
Expand Down Expand Up @@ -225,12 +247,13 @@ async def get_sync(
),
user: str = Depends(auth_dependency),
uws_factory: UWSFactory = Depends(uws_dependency),
logger: BoundLogger = Depends(logger_dependency),
) -> Response:
params = [
JobParameter(parameter_id=k.lower(), value=v, is_post=False)
for k, v in request.query_params.items()
]
return await _sync_request(params, user, runid, uws_factory)
return await _sync_request(params, user, runid, uws_factory, logger)


@external_router.post(
Expand Down Expand Up @@ -306,13 +329,14 @@ async def post_sync(
params: List[JobParameter] = Depends(uws_post_params_dependency),
user: str = Depends(auth_dependency),
uws_factory: UWSFactory = Depends(uws_dependency),
logger: BoundLogger = Depends(logger_dependency),
) -> Response:
runid = None
for param in params:
if param.parameter_id == "runid":
runid = param.value
params = [p for p in params if p.parameter_id != "runid"]
return await _sync_request(params, user, runid, uws_factory)
return await _sync_request(params, user, runid, uws_factory, logger)


@uws_router.post(
Expand Down Expand Up @@ -380,6 +404,7 @@ async def create_job(
params: List[JobParameter] = Depends(uws_post_params_dependency),
user: str = Depends(auth_dependency),
uws_factory: UWSFactory = Depends(uws_dependency),
logger: BoundLogger = Depends(logger_dependency),
) -> str:
runid = None
for param in params:
Expand All @@ -390,8 +415,17 @@ async def create_job(
# Create the job and optionally start it.
job_service = uws_factory.create_job_service()
job = await job_service.create(user, run_id=runid, params=params)
if runid:
logger = logger.bind(run_id=runid)
logger.info(
"Created job",
user=user,
job_id=job.job_id,
params=[p.to_dict() for p in params],
)
if phase == "RUN":
await job_service.start(user, job.job_id)
logger.info("Started job", user=user, job_id=job.job_id)

# Redirect to the new job.
return request.url_for("get_job", job_id=job.job_id)
Expand Down
28 changes: 14 additions & 14 deletions src/vocutouts/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from safir.logging import configure_logging
from safir.middleware.x_forwarded import XForwardedMiddleware

from .actors import cutout_range
from .actors import cutout
from .config import config
from .handlers.external import external_router
from .handlers.internal import internal_router
Expand All @@ -33,34 +33,34 @@
name=config.logger_name,
)

app = FastAPI()
"""The main FastAPI application for vo-cutouts."""

# Define the external routes in a subapp so that it will serve its own OpenAPI
# interface definition and documentation URLs under the external URL.
_subapp = FastAPI(
app = FastAPI(
title="vo-cutouts",
description=metadata("vo-cutouts").get("Summary", ""),
version=metadata("vo-cutouts").get("Version", "0.0.0"),
openapi_url=f"/{config.name}/openapi.json",
docs_url=f"/{config.name}/docs",
redoc_url=f"/{config.name}/redoc",
)
_subapp.include_router(
external_router, responses={401: {"description": "Unauthenticated"}}
)
"""The main FastAPI application for vo-cutouts."""

# Attach the internal routes and subapp to the main application.
# Attach the routers.
app.include_router(internal_router)
app.mount(f"/{config.name}", _subapp)
app.include_router(
external_router,
prefix=f"/{config.name}",
responses={401: {"description": "Unauthenticated"}},
)


@app.on_event("startup")
async def startup_event() -> None:
app.add_middleware(XForwardedMiddleware)
app.add_middleware(CaseInsensitiveQueryMiddleware)
logger = structlog.get_logger(config.logger_name)
install_error_handlers(_subapp)
install_error_handlers(app)
await uws_dependency.initialize(
config=config.uws_config(),
policy=ImageCutoutPolicy(cutout_range),
policy=ImageCutoutPolicy(cutout, logger),
logger=logger,
)

Expand Down
Loading

0 comments on commit 1980cd3

Please sign in to comment.