ERA5T Support - Cloud Run Job Migration #95

Open · wants to merge 5 commits into base: main · Changes from 4 commits

2 changes: 1 addition & 1 deletion Dockerfile
@@ -11,7 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
ARG py_version=3.9
ARG py_version=3.8
FROM apache/beam_python${py_version}_sdk:2.40.0 as beam_sdk
FROM continuumio/miniconda3:latest

59 changes: 17 additions & 42 deletions README.md
@@ -746,7 +746,6 @@ This feature works in 4 parts.
1. Acquiring raw data from CDS, facilitated by [`weather-dl`](https://weather-tools.readthedocs.io/en/latest/weather_dl/README.html) tool.
2. Splitting raw data using [`weather-sp`](https://weather-tools.readthedocs.io/en/latest/weather_sp/README.html).
3. Ingest this split data into a Zarr file.
4. [**WIP**] Ingest [`AR`](gs://gcp-public-data-arco-era5/ar/full_37-1h-0p25deg-chunk-1.zarr-v3) data into BigQuery with the assistance of the [`weather-mv`](https://weather-tools.readthedocs.io/en/latest/weather_mv/README.html).

#### How to Run.
1. Set up a Cloud project with sufficient permissions to use cloud storage (such as [GCS](https://cloud.google.com/storage)) and a Beam runner (such as [Dataflow](https://cloud.google.com/dataflow)).
@@ -756,26 +755,18 @@ This feature works in 4 parts.
3. Add all of the `Copernicus` licenses into the [secret-manager](https://cloud.google.com/secret-manager) with a value like this: `{"api_url": "URL", "api_key": "KEY"}`
> NOTE: for every API_KEY there must be a unique secret key.
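For reference, a minimal sketch of creating one such secret with the Secret Manager Python client; the project ID and secret ID below are placeholders, and the `google-cloud-secret-manager` package is assumed to be installed:
```python
# Sketch: store one Copernicus CDS license as a Secret Manager secret.
# The project ID and secret ID are placeholders; one secret per API key.
import json
from google.cloud import secretmanager

client = secretmanager.SecretManagerServiceClient()
project_id = "PROJECT_ID"
secret_id = "era5-cds-license-1"

# Create the (empty) secret with automatic replication.
secret = client.create_secret(
    request={
        "parent": f"projects/{project_id}",
        "secret_id": secret_id,
        "secret": {"replication": {"automatic": {}}},
    }
)

# Add the license payload as the first version.
payload = json.dumps({"api_url": "URL", "api_key": "KEY"}).encode("utf-8")
version = client.add_secret_version(
    request={"parent": secret.name, "payload": {"data": payload}}
)
print(version.name)  # e.g. projects/.../secrets/era5-cds-license-1/versions/1
```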

4. Update all of these variables in the [docker-file](data_automate/Dockerfile).
4. Update all of these variables in [constants.py](deployment/constants.py).
* `PROJECT`
* `REGION`
* `BUCKET`
* `MANIFEST_LOCATION`
* `API_KEY_*`
* In case of multiple API keys, each API_KEY must follow the format `API_KEY_*`, where * is a numeric value, e.g. 1, 2.
* The API_KEY_* value is the resource name of a [secret-manager key](https://cloud.google.com/secret-manager) and looks like this: ```projects/PROJECT_NAME/secrets/SECRET_KEY_NAME/versions/1```
* `WEATHER_TOOLS_SDK_CONTAINER_IMAGE`
* Is made using this [dockerfile](https://github.com/google/weather-tools/blob/setuptools/Dockerfile) and is stored in a docker registry.
* `ARCO_ERA5_SDK_CONTAINER_IMAGE`
* `BQ_TABLES_LIST`
* `REGION_LIST`

> * In case of multiple API keys, each API_KEY must follow the format `API_KEY_*`, where * is a numeric value, e.g. 1, 2.
> * The API_KEY_* value is the resource name of a [secret-manager key](https://cloud.google.com/secret-manager) and looks like this: ```projects/PROJECT_NAME/secrets/SECRET_KEY_NAME/versions/1```
> * `BQ_TABLES_LIST` is the list of BigQuery tables into which data is ingested, and its value looks like this:
```'["PROJECT.DATASET.TABLE1", "PROJECT.DATASET.TABLE2", ..., "PROJECT.DATASET.TABLE6"]'```.
> * `REGION_LIST` is the list of GCP regions in which the ingestion jobs will run, e.g.:
```'["us-east1", "us-west4", ..., "us-west2"]'```.
> * The size of `BQ_TABLES_LIST` and `REGION_LIST` must be **6**, as 6 Zarr files are processed in the current pipeline. BigQuery ingestion corresponds to the `ZARR_FILES_LIST` of [raw-to-zarr-to-bq.py](/arco-era5/src/raw-to-zarr-to-bq.py), so add table names to `BQ_TABLES_LIST` accordingly.
> * `WEATHER_TOOLS_SDK_CONTAINER_IMAGE` is made using this [dockerfile](https://github.com/google/weather-tools/blob/setuptools/Dockerfile) and is stored in a docker registry.
> * `ARCO_ERA5_SDK_CONTAINER_IMAGE` is made using this [dockerfile](https://github.com/google-research/arco-era5/blob/main/Dockerfile) and is stored in a registry.
* Is made using this [dockerfile](https://github.com/google-research/arco-era5/blob/main/Dockerfile) and is stored in a registry.
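
To illustrate what an `API_KEY_*` resource name resolves to at runtime, here is a hedged sketch of reading the stored CDS license back with the Secret Manager client; the resource name is a placeholder and the caller's service account is assumed to have access to the secret version:
```python
# Sketch: resolve an API_KEY_* resource name into the stored CDS license JSON.
import json
from google.cloud import secretmanager

client = secretmanager.SecretManagerServiceClient()
name = "projects/PROJECT_NAME/secrets/SECRET_KEY_NAME/versions/1"  # an API_KEY_* value
response = client.access_secret_version(request={"name": name})
license_info = json.loads(response.payload.data.decode("utf-8"))
print(license_info["api_url"], license_info["api_key"])
```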


5. Create docker image.
@@ -787,37 +778,21 @@
```
export REPO=<repo> # e.g. arco-era5-raw-to-zarr-to-bq
gcloud builds submit . --tag "gcr.io/$PROJECT_ID/$REPO:latest"
```

7. Create a VM using the above created docker-image.
6. Run the script to create the Cloud Run jobs: [create_job.py](deployment/create_job.py)
```
export ZONE=<zone> eg: us-central1-a
export SERVICE_ACCOUNT=<service account> # Let's keep this as Compute Engine Default Service Account
export IMAGE_PATH=<container-image-path> # The above created image-path

gcloud compute instances create-with-container arco-era5-raw-to-zarr-to-bq \
    --project=$PROJECT_ID \
--zone=$ZONE \
--machine-type=n2-standard-4 \
--network-interface=network-tier=PREMIUM,subnet=default \
--maintenance-policy=MIGRATE \
--provisioning-model=STANDARD \
--service-account=$SERVICE_ACCOUNT \
--scopes=https://www.googleapis.com/auth/cloud-platform \
--image=projects/cos-cloud/global/images/cos-stable-109-17800-0-45 \
--boot-disk-size=200GB \
--boot-disk-type=pd-balanced \
--boot-disk-device-name=arco-era5-raw-to-zarr-to-bq \
--container-image=$IMAGE_PATH \
--container-restart-policy=on-failure \
--container-tty \
--no-shielded-secure-boot \
--shielded-vtpm \
--shielded-integrity-monitoring \
--labels=goog-ec-src=vm_add-gcloud,container-vm=cos-stable-109-17800-0-45 \
--metadata-from-file=startup-script=start-up.sh
python deployment/create_job.py
```

8. Once the VM is created, the script will execute on the `7th day of every month`, as this is the default set in the [cron-file](data_automate/cron-file). You can also see the logs after connecting to the VM through SSH.
> Logs will be written to this file: `/var/log/cron.log`.
> It is better to SSH in 5-10 minutes after VM creation.
7. There will be 5 different Cloud Run jobs (a sketch for triggering one manually follows this list):
- `arco-era5-zarr-ingestion` - For Zarr data ingestion.
- `arco-era5t-daily-executor` - Triggers daily to process era5t-daily data.
- `arco-era5t-monthly-executor` - Triggers monthly to process era5t-monthly data.
- `arco-era5-sanity` - Sanity job to validate ERA5 vs ERA5T data and replace it in case of differences.
- `arco-era5-executor` - Triggers every month to run a sanity job for every available Zarr store.
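
Each of these jobs can also be triggered on demand for testing; a minimal sketch using the same `google-cloud-run` client as [create_job.py](deployment/create_job.py), with project, region, and job ID as placeholders:
```python
# Sketch: execute one of the created Cloud Run jobs on demand.
from google.cloud import run_v2

client = run_v2.JobsClient()
job_name = "projects/PROJECT_ID/locations/REGION/jobs/arco-era5t-daily-executor"

operation = client.run_job(name=job_name)
execution = operation.result()  # blocks until the execution finishes
print(execution.name)
```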
8. Set up Cloud Schedulers to trigger the above jobs at the specified frequencies (a sketch follows this list).
- `arco-era5t-daily-executor` - Schedule a daily trigger for `era5t-daily` data.
- `arco-era5t-monthly-executor` - Schedule a monthly trigger for `era5t-monthly` data.
- `arco-era5-executor` - Schedule a monthly trigger to run the ERA5 vs ERA5T sanity check for the `current_month - 3` month.
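
A minimal sketch of one such scheduler with the Cloud Scheduler Python client, assuming the scheduler calls the Cloud Run Jobs `:run` endpoint with an OAuth token; the URI pattern, cron schedule, and service-account email below are placeholders/assumptions:
```python
# Sketch: schedule a daily 02:00 UTC trigger for the arco-era5t-daily-executor job.
# The ":run" URI pattern, schedule, and service-account email are assumptions.
from google.cloud import scheduler_v1

client = scheduler_v1.CloudSchedulerClient()
parent = "projects/PROJECT_ID/locations/REGION"

job = scheduler_v1.Job(
    name=f"{parent}/jobs/arco-era5t-daily-trigger",
    schedule="0 2 * * *",  # daily at 02:00
    time_zone="Etc/UTC",
    http_target=scheduler_v1.HttpTarget(
        uri=(
            "https://REGION-run.googleapis.com/apis/run.googleapis.com/v1/"
            "namespaces/PROJECT_ID/jobs/arco-era5t-daily-executor:run"
        ),
        http_method=scheduler_v1.HttpMethod.POST,
        oauth_token=scheduler_v1.OAuthToken(
            service_account_email="SERVICE_ACCOUNT@PROJECT_ID.iam.gserviceaccount.com"
        ),
    ),
)
client.create_job(parent=parent, job=job)
```
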
### Making the dataset "High Resolution" & beyond...

This phase of the project is under active development! If you would like to lend a hand in any way, please check out our
41 changes: 41 additions & 0 deletions deployment/Dockerfile
@@ -0,0 +1,41 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================

FROM continuumio/miniconda3:latest

# Add the mamba solver for faster builds
RUN conda install -n base conda-libmamba-solver
RUN conda config --set solver libmamba

# Clone the weather-tools and create conda env using environment.yml of weather-tools.
ARG weather_tools_git_rev=main
RUN git clone https://github.com/google/weather-tools.git /weather
WORKDIR /weather
RUN git checkout "${weather_tools_git_rev}"
RUN rm -r /weather/weather_*/test_data/
RUN conda env create -f environment.yml --debug

# Activate the conda env and update the PATH
ARG CONDA_ENV_NAME=weather-tools
RUN echo "source activate ${CONDA_ENV_NAME}" >> ~/.bashrc
ENV PATH /opt/conda/envs/${CONDA_ENV_NAME}/bin:$PATH
RUN pip install -e .

# (TODO): Replace branch with main before merge.
ARG arco_era5_git_rev=era5t-support
> **Collaborator:** Update the branch name before merging this PR.

RUN git clone https://github.com/google-research/arco-era5.git /arco-era5
WORKDIR /arco-era5
RUN git checkout "${arco_era5_git_rev}"
RUN pip install -e .
68 changes: 68 additions & 0 deletions deployment/constants.py
@@ -0,0 +1,68 @@
PROJECT = "<PROJECT_ID>"
REGION = "<REGION>"
BUCKET = "<BUCKET_NAME>"
API_KEY_1 = "projects/PROJECT/secrets/SECRET_NAME/versions/1"
API_KEY_2 = "projects/PROJECT/secrets/SECRET_NAME/versions/1"
API_KEY_3 = "projects/PROJECT/secrets/SECRET_NAME/versions/1"
API_KEY_4 = "projects/PROJECT/secrets/SECRET_NAME/versions/1"
WEATHER_TOOLS_SDK_CONTAINER_IMAGE = "WEATHER_TOOLS_SDK_CONTAINER_IMAGE"
ARCO_ERA5_SDK_CONTAINER_IMAGE = "ARCO_ERA5_SDK_CONTAINER_IMAGE"
MANIFEST_LOCATION = "fs://manifest?projectId=PROJECT_ID"
TEMP_PATH_FOR_RAW_DATA = "<BUCKET_PATH>"
ROOT_PATH = "<BUCKET_PATH>"

TASK_COUNT = 1
MAX_RETRIES = 1

CONTAINER_NAME = "arco-era5-container"
CLOUD_RUN_CONTAINER_IMAGE = "CLOUD_RUN_CONTAINER_IMAGE"
CONTAINER_COMMAND = ["python"]
CONTAINER_CPU_LIMIT = "8000m"

INGESTION_JOB_ID = "arco-era5-zarr-ingestion"
SANITY_JOB_ID = "arco-era5-sanity"

EXECUTOR_JOB_ID = "arco-era5-executor"
DAILY_EXECUTOR_JOB_ID = "arco-era5t-daily-executor"
MONTHLY_EXECUTOR_JOB_ID = "arco-era5t-monthly-executor"

EXECUTOR_JOB_CONTAINER_ARGS = [
    "src/raw-to-zarr-to-bq.py"
]
DAILY_EXECUTOR_JOB_CONTAINER_ARGS = [
    "src/raw-to-zarr-to-bq.py", "--mode", "daily"
]
MONTHLY_EXECUTOR_JOB_CONTAINER_ARGS = [
    "src/raw-to-zarr-to-bq.py", "--mode", "monthly"
]

JOB_CONTAINER_ENV_VARIABLES = [
    {"name": "PROJECT", "value": PROJECT},
    {"name": "REGION", "value": REGION},
    {"name": "BUCKET", "value": BUCKET},
    {"name": "WEATHER_TOOLS_SDK_CONTAINER_IMAGE", "value": WEATHER_TOOLS_SDK_CONTAINER_IMAGE},
    {"name": "MANIFEST_LOCATION", "value": MANIFEST_LOCATION},
    {"name": "PYTHON_PATH", "value": "python"},
    {"name": "TEMP_PATH_FOR_RAW_DATA", "value": TEMP_PATH_FOR_RAW_DATA},
    {"name": "INGESTION_JOB_ID", "value": INGESTION_JOB_ID},
    {"name": "SANITY_JOB_ID", "value": SANITY_JOB_ID},
    {"name": "ROOT_PATH", "value": ROOT_PATH},
    {"name": "ARCO_ERA5_SDK_CONTAINER_IMAGE", "value": ARCO_ERA5_SDK_CONTAINER_IMAGE}
]

ERA5_API_KEYS = [
    {"name": "API_KEY_1", "value": API_KEY_3},
    {"name": "API_KEY_2", "value": API_KEY_4}
]

ERA5T_API_KEYS = [
    {"name": "API_KEY_1", "value": API_KEY_1},
    {"name": "API_KEY_2", "value": API_KEY_2}
]

CONTAINER_MEMORY_LIMIT = "34G"

# Timeout for a day
TIMEOUT_SECONDS = 86400
111 changes: 111 additions & 0 deletions deployment/create_job.py
@@ -0,0 +1,111 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from google.cloud import run_v2
import constants


def create_job(job_id: str, template: run_v2.ExecutionTemplate):
    # Create a client
    client = run_v2.JobsClient()

    # Initialize request argument(s)
    job = run_v2.Job()
    job.template = template

    request = run_v2.CreateJobRequest(
        parent=f"projects/{constants.PROJECT}/locations/{constants.REGION}",
        job=job,
        job_id=job_id,
    )

    # Make the request
    operation = client.create_job(request=request)

    print("Waiting for operation to complete...")

    response = operation.result()

    # Handle the response
    print(response)


def era5_job_creator(
    job_id: str,
    timeout_sec: int,
    container_args: list = None,
    container_env: list = None,
    container_memory_limit: str = None,
    max_retries: int = 0
):
    # Create an ExecutionTemplate
    template = run_v2.ExecutionTemplate()
    template.task_count = constants.TASK_COUNT

    template.template.timeout = {"seconds": timeout_sec}
    template.template.max_retries = max_retries

    # Set container details
    container = run_v2.Container()
    container.name = constants.CONTAINER_NAME
    container.image = constants.CLOUD_RUN_CONTAINER_IMAGE
    container.command = constants.CONTAINER_COMMAND
    container.args = container_args

    # Set environment variables
    container.env = container_env

    # Set resource limits
    container.resources.limits = {
        "cpu": constants.CONTAINER_CPU_LIMIT,
        "memory": container_memory_limit,
    }

    # Add the container to the template
    template.template.containers.append(container)

    create_job(job_id, template)


if __name__ == "__main__":
    era5_job_creator(
        job_id=constants.SANITY_JOB_ID,
        timeout_sec=constants.TIMEOUT_SECONDS,
        container_memory_limit=constants.CONTAINER_MEMORY_LIMIT,
    )
    era5_job_creator(
        job_id=constants.INGESTION_JOB_ID,
        timeout_sec=constants.TIMEOUT_SECONDS,
        container_memory_limit=constants.CONTAINER_MEMORY_LIMIT,
    )
    era5_job_creator(
        job_id=constants.DAILY_EXECUTOR_JOB_ID,
        timeout_sec=constants.TIMEOUT_SECONDS,
        container_args=constants.DAILY_EXECUTOR_JOB_CONTAINER_ARGS,
        container_env=constants.JOB_CONTAINER_ENV_VARIABLES + constants.ERA5T_API_KEYS,
        container_memory_limit=constants.CONTAINER_MEMORY_LIMIT,
    )
    era5_job_creator(
        job_id=constants.MONTHLY_EXECUTOR_JOB_ID,
        timeout_sec=constants.TIMEOUT_SECONDS,
        container_args=constants.MONTHLY_EXECUTOR_JOB_CONTAINER_ARGS,
        container_env=constants.JOB_CONTAINER_ENV_VARIABLES + constants.ERA5T_API_KEYS,
        container_memory_limit=constants.CONTAINER_MEMORY_LIMIT,
    )
    era5_job_creator(
        job_id=constants.EXECUTOR_JOB_ID,
        timeout_sec=constants.TIMEOUT_SECONDS,
        container_args=constants.EXECUTOR_JOB_CONTAINER_ARGS,
        container_env=constants.JOB_CONTAINER_ENV_VARIABLES + constants.ERA5_API_KEYS,
        container_memory_limit=constants.CONTAINER_MEMORY_LIMIT,
    )
3 changes: 2 additions & 1 deletion environment.yml
@@ -15,8 +15,9 @@ name: era5
channels:
- conda-forge
dependencies:
- python=3.9.17
- python=3.8.19
> **Collaborator:** Using Python 3.8 is concerning to me. Python 3.8 is very old, already at end of life. This means it no longer receives security updates: https://devguide.python.org/versions/
> Even Python 3.9 will soon be at end of life. We should strive to use newer versions of Python which are actively maintained.

> **Collaborator (Author):** Agreed @shoyer. The plan is to remove this environment.yml and the Docker setup entirely from the repository, as most of the code works without them on Dataflow. We will also upgrade to a newer Python version and the latest Zarr version (v3) in the future.

- metview-batch==5.17.0
- pip:
- cython==0.29.34
- setuptools==70.3.0
- metview
15 files renamed without changes.
3 changes: 2 additions & 1 deletion setup.py
@@ -129,7 +129,8 @@ def run(self):
'immutabledict',
'xarray-beam',
'scipy',
'fsspec==2023.4.0'
'fsspec==2023.4.0',
'google-cloud-run'
],
tests_require=['pytest'],
cmdclass={
9 changes: 6 additions & 3 deletions src/arco_era5/__init__.py
@@ -11,9 +11,10 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .constant import variables_full_names, zarr_files
from .constant import ARCO_ERA5_ZARR_FILES, variables_full_names, zarr_files
from .data_availability import check_data_availability
from .ingest_data_in_zarr import ingest_data_in_zarr_dataflow_job
from .download import raw_data_download_dataflow_job, data_splitting_dataflow_job
from .ingest_data_in_zarr import perform_data_operations
from .pangeo import run, parse_args
from .resize_zarr import resize_zarr_target, update_zarr_metadata
from .source_data import (
@@ -33,6 +34,7 @@
LoadTemporalDataForDateDoFn,
_read_nc_dataset
)
from .sanity import generate_raw_paths, OpenLocal, run_sanity_job, update_zarr
from .update_ar import UpdateSlice as ARUpdateSlice
from .update_co import GenerateOffset, UpdateSlice as COUpdateSlice, generate_input_paths
from .update_config_files import (
@@ -49,5 +51,6 @@
replace_non_alphanumeric_with_hyphen,
subprocess_run,
convert_to_date,
parse_arguments_raw_to_zarr_to_bq
parse_arguments_raw_to_zarr_to_bq,
ExecTypes
)
9 changes: 9 additions & 0 deletions src/arco_era5/constant.py
@@ -30,3 +30,12 @@
zarr_files = {'ml_wind': 'gs://gcp-public-data-arco-era5/co/model-level-wind.zarr-v2/',
              'ml_moisture': 'gs://gcp-public-data-arco-era5/co/model-level-moisture.zarr-v2/',
              'sl_surface': 'gs://gcp-public-data-arco-era5/co/single-level-surface.zarr-v2/'}

ARCO_ERA5_ZARR_FILES = {
    "ar": "gs://gcp-public-data-arco-era5/ar/full_37-1h-0p25deg-chunk-1.zarr-v3",
    "ml_moisture": "gs://gcp-public-data-arco-era5/co/model-level-moisture.zarr-v2",
    "ml_wind": "gs://gcp-public-data-arco-era5/co/model-level-wind.zarr-v2",
    "sl_forecast": "gs://gcp-public-data-arco-era5/co/single-level-forecast.zarr-v2",
    "sl_reanalysis": "gs://gcp-public-data-arco-era5/co/single-level-reanalysis.zarr-v2",
    "sl_surface": "gs://gcp-public-data-arco-era5/co/single-level-surface.zarr-v2"
}
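
For context, each of these entries is a public Zarr store that can be opened lazily; a minimal sketch with xarray, assuming `xarray`, `zarr`, and `gcsfs` are installed and that consolidated metadata is available:
```python
# Sketch: lazily open one of the public ARCO-ERA5 Zarr stores listed above.
import xarray as xr

ds = xr.open_zarr(
    "gs://gcp-public-data-arco-era5/ar/full_37-1h-0p25deg-chunk-1.zarr-v3",
    chunks=None,  # avoid creating dask chunks up front
    consolidated=True,  # assumption: consolidated metadata is present
)
print(ds)
```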