From f63fc58a1911e9f50a6cc82d57cec70848bbfcad Mon Sep 17 00:00:00 2001 From: Darshan Date: Mon, 17 Feb 2025 11:20:33 +0000 Subject: [PATCH 1/5] Initial Commit For ERA5T Support --- Dockerfile | 2 +- README.md | 55 ++---- deployment/Dockerfile | 40 ++++ deployment/create_job.py | 111 +++++++++++ environment.yml | 3 +- raw/{ => daily}/era5_ml_dve.cfg | 0 raw/{ => daily}/era5_ml_o3q.cfg | 0 raw/{ => daily}/era5_ml_qrqs.cfg | 0 raw/{ => daily}/era5_ml_tw.cfg | 0 raw/{ => daily}/era5_pl_hourly.cfg | 0 raw/{ => daily}/era5_sl_hourly.cfg | 0 raw/{ => monthly}/era5_ml_lnsp.cfg | 0 raw/{ => monthly}/era5_ml_zs.cfg | 0 raw/{ => monthly}/era5_sfc.cfg | 0 raw/{ => monthly}/era5_sfc_cape.cfg | 0 raw/{ => monthly}/era5_sfc_cisst.cfg | 0 raw/{ => monthly}/era5_sfc_pcp.cfg | 0 raw/{ => monthly}/era5_sfc_rad.cfg | 0 raw/{ => monthly}/era5_sfc_soil.cfg | 0 raw/{ => monthly}/era5_sfc_tcol.cfg | 0 setup.py | 3 +- src/arco_era5/__init__.py | 9 +- src/arco_era5/constant.py | 9 + src/arco_era5/data_availability.py | 51 ++--- src/arco_era5/download.py | 74 +++++++ src/arco_era5/ingest_data_in_zarr.py | 102 +++++++--- src/arco_era5/resize_zarr.py | 18 +- src/arco_era5/sanity.py | 155 +++++++++++++++ src/arco_era5/update_co.py | 65 ++----- src/arco_era5/update_config_files.py | 133 +++++++++---- src/arco_era5/update_config_files_test.py | 10 +- src/arco_era5/utils.py | 67 +++++++ src/era5-sanity.py | 70 +++++++ src/raw-to-zarr-to-bq.py | 223 +++++++--------------- src/update-ar-data.py | 4 +- src/update-co-data.py | 4 +- 36 files changed, 854 insertions(+), 354 deletions(-) create mode 100644 deployment/Dockerfile create mode 100644 deployment/create_job.py rename raw/{ => daily}/era5_ml_dve.cfg (100%) rename raw/{ => daily}/era5_ml_o3q.cfg (100%) rename raw/{ => daily}/era5_ml_qrqs.cfg (100%) rename raw/{ => daily}/era5_ml_tw.cfg (100%) rename raw/{ => daily}/era5_pl_hourly.cfg (100%) rename raw/{ => daily}/era5_sl_hourly.cfg (100%) rename raw/{ => monthly}/era5_ml_lnsp.cfg (100%) rename raw/{ => monthly}/era5_ml_zs.cfg (100%) rename raw/{ => monthly}/era5_sfc.cfg (100%) rename raw/{ => monthly}/era5_sfc_cape.cfg (100%) rename raw/{ => monthly}/era5_sfc_cisst.cfg (100%) rename raw/{ => monthly}/era5_sfc_pcp.cfg (100%) rename raw/{ => monthly}/era5_sfc_rad.cfg (100%) rename raw/{ => monthly}/era5_sfc_soil.cfg (100%) rename raw/{ => monthly}/era5_sfc_tcol.cfg (100%) create mode 100644 src/arco_era5/download.py create mode 100644 src/arco_era5/sanity.py create mode 100644 src/era5-sanity.py diff --git a/Dockerfile b/Dockerfile index ea9bf19..859daea 100644 --- a/Dockerfile +++ b/Dockerfile @@ -11,7 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -ARG py_version=3.9 +ARG py_version=3.8 FROM apache/beam_python${py_version}_sdk:2.40.0 as beam_sdk FROM continuumio/miniconda3:latest diff --git a/README.md b/README.md index 90714f9..5d3d0d8 100644 --- a/README.md +++ b/README.md @@ -746,7 +746,6 @@ This feature is works in 4 parts. 1. Acquiring raw data from CDS, facilitated by [`weather-dl`](https://weather-tools.readthedocs.io/en/latest/weather_dl/README.html) tool. 2. Splitting raw data using [`weather-sp`](https://weather-tools.readthedocs.io/en/latest/weather_sp/README.html). 3. Ingest this splitted data into a zarr file. - 4. 
[**WIP**] Ingest [`AR`](gs://gcp-public-data-arco-era5/ar/full_37-1h-0p25deg-chunk-1.zarr-v3) data into BigQuery with the assistance of the [`weather-mv`](https://weather-tools.readthedocs.io/en/latest/weather_mv/README.html). #### How to Run. 1. Set up a Cloud project with sufficient permissions to use cloud storage (such as [GCS](https://cloud.google.com/storage)) and a Beam runner (such as [Dataflow](https://cloud.google.com/dataflow)). @@ -756,26 +755,18 @@ This feature is works in 4 parts. 3. Add the all `Copernicus` licenses into the [secret-manager](https://cloud.google.com/secret-manager) with value likes this: {"api_url": "URL", "api_key": "KEY"} > NOTE: for every API_KEY there must be unique secret-key. -4. Update all of these variable in [docker-file](data_automate/Dockerfile). +4. Update all of these variable in [docker-file](deployment/Dockerfile). * `PROJECT` * `REGION` * `BUCKET` * `MANIFEST_LOCATION` * `API_KEY_*` + * In case of multiple API keys, API_KEY must follow this format: `API_KEY_*`. here * can be numeric value i.e. 1, 2. + * API_KEY_* value is the resource name of [secret-manager key](https://cloud.google.com/secret-manager) and it's value looks like this :: ```projects/PROJECT_NAME/secrets/SECRET_KEY_NAME/versions/1``` * `WEATHER_TOOLS_SDK_CONTAINER_IMAGE` + * Is made using this [dockerfile](https://github.com/google/weather-tools/blob/setuptools/Dockerfile) and is stored in a docker registry. * `ARCO_ERA5_SDK_CONTAINER_IMAGE` - * `BQ_TABLES_LIST` - * `REGION_LIST` - - > * In case of multiple API keys, API_KEY must follow this format: `API_KEY_*`. here * can be numeric value i.e. 1, 2. - > * API_KEY_* value is the resource name of [secret-manager key](https://cloud.google.com/secret-manager) and it's value looks like this :: ```projects/PROJECT_NAME/secrets/SECRET_KEY_NAME/versions/1``` - > * `BQ_TABLES_LIST` is list of the BigQuery table in which data is ingested and it's value is like this :: - ```'["PROJECT.DATASET.TABLE1", "PROJECT.DATASET.TABLE2", ..., "PROJECT.DATASET.TABLE6"]'```. - > * `REGION_LIST` is list of the GCP_region in which the job of ingestion will run :: - ```'["us-east1", "us-west4",..., "us-west2"]'```. - > * Size of `BQ_TABLES_LIST` and `REGION_LIST` must be **6** as total 6 zarr file processed in the current pipeline and also, data ingestion in Bigquery are corresponding to `ZARR_FILES_LIST` of [raw-to-zarr-to-bq.py](/arco-era5/src/raw-to-zarr-to-bq.py) so add table name in `BQ_TABLES_LIST` accordingly. - > * `WEATHER_TOOLS_SDK_CONTAINER_IMAGE` is made using this [dockerfile](https://github.com/google/weather-tools/blob/setuptools/Dockerfile) and is stored in a docker registry. - > * `ARCO_ERA5_SDK_CONTAINER_IMAGE` is made using this [dockerfile](https://github.com/google-research/arco-era5/blob/main/Dockerfile) and is stored in a registry. + * Is made using this [dockerfile](https://github.com/google-research/arco-era5/blob/main/Dockerfile) and is stored in a registry. 5. Create docker image. @@ -787,37 +778,17 @@ export REPO= eg:arco-era5-raw-to-zarr-to-bq gcloud builds submit . --tag "gcr.io/$PROJECT_ID/$REPO:latest" ``` -7. Create a VM using above created docker-image +7. Run script to create cloud run jobs. 
[create_job](deployment/create_job.py) ``` -export ZONE= eg: us-central1-a -export SERVICE_ACCOUNT= # Let's keep this as Compute Engine Default Service Account -export IMAGE_PATH= # The above created image-path - -gcloud compute instances create-with-container arco-era5-raw-to-zarr-to-bq \ --project=$PROJECT_ID \ ---zone=$ZONE \ ---machine-type=n2-standard-4 \ ---network-interface=network-tier=PREMIUM,subnet=default \ ---maintenance-policy=MIGRATE \ ---provisioning-model=STANDARD \ ---service-account=$SERVICE_ACCOUNT \ ---scopes=https://www.googleapis.com/auth/cloud-platform \ ---image=projects/cos-cloud/global/images/cos-stable-109-17800-0-45 \ ---boot-disk-size=200GB \ ---boot-disk-type=pd-balanced \ ---boot-disk-device-name=arco-era5-raw-to-zarr-to-bq \ ---container-image=$IMAGE_PATH \ ---container-restart-policy=on-failure \ ---container-tty \ ---no-shielded-secure-boot \ ---shielded-vtpm \ ---shielded-integrity-monitoring \ ---labels=goog-ec-src=vm_add-gcloud,container-vm=cos-stable-109-17800-0-45 \ ---metadata-from-file=startup-script=start-up.sh +python deployment/create_job.py ``` -8. Once VM is created, the script will execute on `7th day of every month` as this is default set in the [cron-file](data_automate/cron-file).Also you can see the logs after connecting to VM through SSH. -> Log will be shown at this(`/var/log/cron.log`) file. -> Better if we SSH after 5-10 minutes of VM creation. +8. There will be 5 different cloud run jobs. + - `arco-era5-zarr-ingestion` - For zarr data ingestion. + - `arco-era5t-daily-executor` - Triggers daily to process era5t-daily data. + - `arco-era5t-monthly-executor` - Triggers monthly to process era5t-monthly data. + - `arco-era5-sanity` - Sanity job to validate the data era5 vs era5t and replace in case of difference. + - `arco-era5-executor` - Triggers every month to run a sanity job for every zarr available. ### Making the dataset "High Resolution" & beyond... This phase of the project is under active development! If you would like to lend a hand in any way, please check out our diff --git a/deployment/Dockerfile b/deployment/Dockerfile new file mode 100644 index 0000000..e79cbb9 --- /dev/null +++ b/deployment/Dockerfile @@ -0,0 +1,40 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== + +FROM continuumio/miniconda3:latest + +# Add the mamba solver for faster builds +RUN conda install -n base conda-libmamba-solver +RUN conda config --set solver libmamba + +# Clone the weather-tools and create conda env using environment.yml of weather-tools. 
+ARG weather_tools_git_rev=main +RUN git clone https://github.com/google/weather-tools.git /weather +WORKDIR /weather +RUN git checkout "${weather_tools_git_rev}" +RUN rm -r /weather/weather_*/test_data/ +RUN conda env create -f environment.yml --debug + +# Activate the conda env and update the PATH +ARG CONDA_ENV_NAME=weather-tools +RUN echo "source activate ${CONDA_ENV_NAME}" >> ~/.bashrc +ENV PATH /opt/conda/envs/${CONDA_ENV_NAME}/bin:$PATH +RUN pip install -e . + +ARG arco_era5_git_rev=era5t-support +RUN git clone https://github.com/google-research/arco-era5.git /arco-era5 +WORKDIR /arco-era5 +RUN git checkout "${arco_era5_git_rev}" +RUN pip install -e . diff --git a/deployment/create_job.py b/deployment/create_job.py new file mode 100644 index 0000000..89761ed --- /dev/null +++ b/deployment/create_job.py @@ -0,0 +1,111 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from google.cloud import run_v2 +import constants + + +def create_job(job_id: str, template: run_v2.ExecutionTemplate): + # Create a client + client = run_v2.JobsClient() + + # Initialize request argument(s) + job = run_v2.Job() + job.template = template + + request = run_v2.CreateJobRequest( + parent=f"projects/{constants.PROJECT}/locations/{constants.REGION}", + job=job, + job_id=job_id, + ) + + # Make the request + operation = client.create_job(request=request) + + print("Waiting for operation to complete...") + + response = operation.result() + + # Handle the response + print(response) + + +def graphnn_job_creator( + job_id: str, + timeout_sec: int, + container_args: list = None, + contaner_env: list = None, + container_memory_limit: str = None, + max_retries: int = 0 +): + # Create an ExecutionTemplate + template = run_v2.ExecutionTemplate() + template.task_count = constants.TASK_COUNT + + template.template.timeout = {"seconds": timeout_sec} + template.template.max_retries = max_retries + + # Set container details + container = run_v2.Container() + container.name = constants.CONTAINER_NAME + container.image = constants.CLOUD_RUN_CONTAINER_IMAGE + container.command = constants.CONTAINER_COMMAND + container.args = container_args + + # Set environment variables (example) + container.env = contaner_env + + # Set resource limits (example) + container.resources.limits = { + "cpu": constants.CONTAINER_CPU_LIMIT, + "memory": container_memory_limit, + } + + # Add the container to the template + template.template.containers.append(container) + + create_job(job_id, template) + + +if __name__ == "__main__": + graphnn_job_creator( + job_id=constants.SANITY_JOB_ID, + timeout_sec=constants.TIMEOUT_SECONDS, + container_memory_limit=constants.CONTAINER_MEMORY_LIMIT, + ) + graphnn_job_creator( + job_id=constants.INGESTION_JOB_ID, + timeout_sec=constants.TIMEOUT_SECONDS, + container_memory_limit=constants.CONTAINER_MEMORY_LIMIT, + ) + graphnn_job_creator( + job_id=constants.DAILY_EXECUTOR_JOB_ID, + timeout_sec=constants.TIMEOUT_SECONDS, + container_args=constants.DAILY_EXECUTOR_JOB_CONTAINER_ARGS, + 
contaner_env=constants.JOB_CONTAINER_ENV_VARIABLES + constants.ERA5T_API_KEYS, + container_memory_limit=constants.CONTAINER_MEMORY_LIMIT, + ) + graphnn_job_creator( + job_id=constants.MONTHLY_EXECUTOR_JOB_ID, + timeout_sec=constants.TIMEOUT_SECONDS, + container_args=constants.MONTHLY_EXECUTOR_JOB_CONTAINER_ARGS, + contaner_env=constants.JOB_CONTAINER_ENV_VARIABLES + constants.ERA5T_API_KEYS, + container_memory_limit=constants.CONTAINER_MEMORY_LIMIT, + ) + graphnn_job_creator( + job_id=constants.EXECUTOR_JOB_ID, + timeout_sec=constants.TIMEOUT_SECONDS, + container_args=constants.EXECUTOR_JOB_CONTAINER_ARGS, + contaner_env=constants.JOB_CONTAINER_ENV_VARIABLES + constants.ERA5_API_KEYS, + container_memory_limit=constants.CONTAINER_MEMORY_LIMIT, + ) diff --git a/environment.yml b/environment.yml index e46f5c0..df2ebc0 100644 --- a/environment.yml +++ b/environment.yml @@ -15,8 +15,9 @@ name: era5 channels: - conda-forge dependencies: - - python=3.9.17 + - python=3.8.19 - metview-batch==5.17.0 - pip: - cython==0.29.34 + - setuptools==70.3.0 - metview diff --git a/raw/era5_ml_dve.cfg b/raw/daily/era5_ml_dve.cfg similarity index 100% rename from raw/era5_ml_dve.cfg rename to raw/daily/era5_ml_dve.cfg diff --git a/raw/era5_ml_o3q.cfg b/raw/daily/era5_ml_o3q.cfg similarity index 100% rename from raw/era5_ml_o3q.cfg rename to raw/daily/era5_ml_o3q.cfg diff --git a/raw/era5_ml_qrqs.cfg b/raw/daily/era5_ml_qrqs.cfg similarity index 100% rename from raw/era5_ml_qrqs.cfg rename to raw/daily/era5_ml_qrqs.cfg diff --git a/raw/era5_ml_tw.cfg b/raw/daily/era5_ml_tw.cfg similarity index 100% rename from raw/era5_ml_tw.cfg rename to raw/daily/era5_ml_tw.cfg diff --git a/raw/era5_pl_hourly.cfg b/raw/daily/era5_pl_hourly.cfg similarity index 100% rename from raw/era5_pl_hourly.cfg rename to raw/daily/era5_pl_hourly.cfg diff --git a/raw/era5_sl_hourly.cfg b/raw/daily/era5_sl_hourly.cfg similarity index 100% rename from raw/era5_sl_hourly.cfg rename to raw/daily/era5_sl_hourly.cfg diff --git a/raw/era5_ml_lnsp.cfg b/raw/monthly/era5_ml_lnsp.cfg similarity index 100% rename from raw/era5_ml_lnsp.cfg rename to raw/monthly/era5_ml_lnsp.cfg diff --git a/raw/era5_ml_zs.cfg b/raw/monthly/era5_ml_zs.cfg similarity index 100% rename from raw/era5_ml_zs.cfg rename to raw/monthly/era5_ml_zs.cfg diff --git a/raw/era5_sfc.cfg b/raw/monthly/era5_sfc.cfg similarity index 100% rename from raw/era5_sfc.cfg rename to raw/monthly/era5_sfc.cfg diff --git a/raw/era5_sfc_cape.cfg b/raw/monthly/era5_sfc_cape.cfg similarity index 100% rename from raw/era5_sfc_cape.cfg rename to raw/monthly/era5_sfc_cape.cfg diff --git a/raw/era5_sfc_cisst.cfg b/raw/monthly/era5_sfc_cisst.cfg similarity index 100% rename from raw/era5_sfc_cisst.cfg rename to raw/monthly/era5_sfc_cisst.cfg diff --git a/raw/era5_sfc_pcp.cfg b/raw/monthly/era5_sfc_pcp.cfg similarity index 100% rename from raw/era5_sfc_pcp.cfg rename to raw/monthly/era5_sfc_pcp.cfg diff --git a/raw/era5_sfc_rad.cfg b/raw/monthly/era5_sfc_rad.cfg similarity index 100% rename from raw/era5_sfc_rad.cfg rename to raw/monthly/era5_sfc_rad.cfg diff --git a/raw/era5_sfc_soil.cfg b/raw/monthly/era5_sfc_soil.cfg similarity index 100% rename from raw/era5_sfc_soil.cfg rename to raw/monthly/era5_sfc_soil.cfg diff --git a/raw/era5_sfc_tcol.cfg b/raw/monthly/era5_sfc_tcol.cfg similarity index 100% rename from raw/era5_sfc_tcol.cfg rename to raw/monthly/era5_sfc_tcol.cfg diff --git a/setup.py b/setup.py index c6ca301..4ecb19f 100644 --- a/setup.py +++ b/setup.py @@ -129,7 +129,8 @@ def run(self): 
'immutabledict', 'xarray-beam', 'scipy', - 'fsspec==2023.4.0' + 'fsspec==2023.4.0', + 'google-cloud-run' ], tests_require=['pytest'], cmdclass={ diff --git a/src/arco_era5/__init__.py b/src/arco_era5/__init__.py index 2a2005f..570991d 100644 --- a/src/arco_era5/__init__.py +++ b/src/arco_era5/__init__.py @@ -11,9 +11,10 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from .constant import variables_full_names, zarr_files +from .constant import ARCO_ERA5_ZARR_FILES, variables_full_names, zarr_files from .data_availability import check_data_availability -from .ingest_data_in_zarr import ingest_data_in_zarr_dataflow_job +from .download import raw_data_download_dataflow_job, data_splitting_dataflow_job +from .ingest_data_in_zarr import perform_data_operations from .pangeo import run, parse_args from .resize_zarr import resize_zarr_target, update_zarr_metadata from .source_data import ( @@ -33,6 +34,7 @@ LoadTemporalDataForDateDoFn, _read_nc_dataset ) +from .sanity import generate_raw_paths, OpenLocal, run_sanity_job, update_zarr from .update_ar import UpdateSlice as ARUpdateSlice from .update_co import GenerateOffset, UpdateSlice as COUpdateSlice, generate_input_paths from .update_config_files import ( @@ -49,5 +51,6 @@ replace_non_alphanumeric_with_hyphen, subprocess_run, convert_to_date, - parse_arguments_raw_to_zarr_to_bq + parse_arguments_raw_to_zarr_to_bq, + ExecTypes ) diff --git a/src/arco_era5/constant.py b/src/arco_era5/constant.py index bbb83c9..4228e8f 100644 --- a/src/arco_era5/constant.py +++ b/src/arco_era5/constant.py @@ -30,3 +30,12 @@ zarr_files = {'ml_wind': 'gs://gcp-public-data-arco-era5/co/model-level-wind.zarr-v2/', 'ml_moisture': 'gs://gcp-public-data-arco-era5/co/model-level-moisture.zarr-v2/', 'sl_surface': 'gs://gcp-public-data-arco-era5/co/single-level-surface.zarr-v2/'} + +ARCO_ERA5_ZARR_FILES = { + "ar": "gs://gcp-public-data-arco-era5/ar/full_37-1h-0p25deg-chunk-1.zarr-v3", + "ml_moisture": "gs://gcp-public-data-arco-era5/co/model-level-moisture.zarr-v2", + "ml_wind": "gs://gcp-public-data-arco-era5/co/model-level-wind.zarr-v2", + "sl_forecast": "gs://gcp-public-data-arco-era5/co/single-level-forecast.zarr-v2", + "sl_reanalysis": "gs://gcp-public-data-arco-era5/co/single-level-reanalysis.zarr-v2", + "sl_surface": "gs://gcp-public-data-arco-era5/co/single-level-surface.zarr-v2" +} diff --git a/src/arco_era5/data_availability.py b/src/arco_era5/data_availability.py index 95ec6f8..031037e 100644 --- a/src/arco_era5/data_availability.py +++ b/src/arco_era5/data_availability.py @@ -14,6 +14,7 @@ import datetime import gcsfs import logging +import os import typing as t @@ -22,17 +23,14 @@ SINGLE_LEVEL_VARIABLES, MULTILEVEL_VARIABLES, PRESSURE_LEVELS_GROUPS, + SINGLE_LEVEL_SUBDIR_TEMPLATE, + MULTILEVEL_SUBDIR_TEMPLATE ) from .update_co import generate_input_paths +from .utils import ExecTypes logger = logging.getLogger(__name__) -# File Templates -PRESSURELEVEL_DIR_TEMPLATE = ( - "gs://gcp-public-data-arco-era5/raw/date-variable-pressure_level/{year:04d}/{month:02d}/{day:02d}/{chunk}/{pressure}.nc") -AR_SINGLELEVEL_DIR_TEMPLATE = ( - "gs://gcp-public-data-arco-era5/raw/date-variable-single_level/{year:04d}/{month:02d}/{day:02d}/{chunk}/surface.nc") - # Data Chunks MODEL_LEVEL_CHUNKS = ["dve", "tw", "o3q", "qrqs"] SINGLE_LEVEL_CHUNKS = [ @@ -52,7 +50,22 @@ PRESSURE_LEVEL = PRESSURE_LEVELS_GROUPS["full_37"] -def 
check_data_availability(data_date_range: t.List[datetime.datetime]) -> bool: +def generate_input_paths_ar(data_date_range: t.List[datetime.datetime], root_path: str = GCP_DIRECTORY): + paths = [] + for date in data_date_range: + for chunk in MULTILEVEL_VARIABLES + SINGLE_LEVEL_VARIABLES: + if chunk in MULTILEVEL_VARIABLES: + for pressure in PRESSURE_LEVEL: + relative_path = MULTILEVEL_SUBDIR_TEMPLATE.format(year=date.year, month=date.month, day=date.day, variable=chunk, pressure_level=pressure) + paths.append(os.path.join(root_path, relative_path)) + else: + chunk = 'geopotential' if chunk == 'geopotential_at_surface' else chunk + relative_path = SINGLE_LEVEL_SUBDIR_TEMPLATE.format(year=date.year, month=date.month, day=date.day, variable=chunk) + paths.append(os.path.join(root_path, relative_path)) + return paths + + +def check_data_availability(data_date_range: t.List[datetime.datetime], mode: str, root_path: str) -> bool: """Checks the availability of data for a given date range. Args: @@ -65,24 +78,12 @@ def check_data_availability(data_date_range: t.List[datetime.datetime]) -> bool: fs = gcsfs.GCSFileSystem() start_date = data_date_range[0].strftime("%Y/%m/%d") end_date = data_date_range[-1].strftime("%Y/%m/%d") - all_uri = generate_input_paths(start_date, end_date, GCP_DIRECTORY, MODEL_LEVEL_CHUNKS) - all_uri.extend(generate_input_paths(start_date, end_date, GCP_DIRECTORY, SINGLE_LEVEL_CHUNKS, True)) - - for date in data_date_range: - for chunk in MULTILEVEL_VARIABLES + SINGLE_LEVEL_VARIABLES: - if chunk in MULTILEVEL_VARIABLES: - for pressure in PRESSURE_LEVEL: - all_uri.append( - PRESSURELEVEL_DIR_TEMPLATE.format(year=date.year, - month=date.month, - day=date.day, chunk=chunk, - pressure=pressure)) - else: - if chunk == 'geopotential_at_surface': - chunk = 'geopotential' - all_uri.append( - AR_SINGLELEVEL_DIR_TEMPLATE.format( - year=date.year, month=date.month, day=date.day, chunk=chunk)) + all_uri = [] + if mode == ExecTypes.ERA5T_DAILY.value or mode == ExecTypes.ERA5.value: + all_uri.extend(generate_input_paths(start_date, end_date, root_path, MODEL_LEVEL_CHUNKS)) + all_uri.extend(generate_input_paths_ar(data_date_range, root_path)) + if mode == ExecTypes.ERA5.value or mode == ExecTypes.ERA5T_MONTHLY.value: + all_uri.extend(generate_input_paths(start_date, end_date, root_path, SINGLE_LEVEL_CHUNKS, True)) data_is_missing = False for path in all_uri: diff --git a/src/arco_era5/download.py b/src/arco_era5/download.py new file mode 100644 index 0000000..1311ce2 --- /dev/null +++ b/src/arco_era5/download.py @@ -0,0 +1,74 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
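The availability check refactored above now branches on the run mode: ERA5 and ERA5T-daily runs verify the model-level chunks plus the per-date AR NetCDF files, while ERA5 and ERA5T-monthly runs verify the single-level monthly files. A minimal usage sketch, not part of the patch; the date range and the raw GCS root are assumptions for illustration:

```
# Illustrative sketch of the mode-aware availability check; not part of the patch.
# The dates and root_path below are assumptions for the example.
import datetime

from arco_era5 import ExecTypes, check_data_availability

dates = [datetime.datetime(2025, 1, 1) + datetime.timedelta(days=d) for d in range(3)]

# Daily ERA5T runs check model-level chunks and the per-variable
# (and per-pressure-level) AR files; monthly runs check the single-level GRIBs.
data_is_missing = check_data_availability(
    dates,
    mode=ExecTypes.ERA5T_DAILY.value,
    root_path="gs://gcp-public-data-arco-era5/raw",  # assumed raw root
)
```

This mirrors how `raw-to-zarr-to-bq.py` consumes the result inside its download-retry loop.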
+import datetime +import os + +from concurrent.futures import ThreadPoolExecutor + +from .utils import ExecTypes, subprocess_run + +DIRECTORY = "/arco-era5/raw" +PROJECT = os.environ.get("PROJECT") +REGION = os.environ.get("REGION") +BUCKET = os.environ.get("BUCKET") +MANIFEST_LOCATION = os.environ.get("MANIFEST_LOCATION") +PYTHON_PATH = os.environ.get("PYTHON_PATH") +WEATHER_TOOLS_SDK_CONTAINER_IMAGE = os.environ.get("WEATHER_TOOLS_SDK_CONTAINER_IMAGE") + +SPLITTING_DATASETS = ['soil', 'pcp'] + +def raw_data_download_dataflow_job(mode: str, first_day: datetime.date): + """Launches a Dataflow job to process weather data.""" + job_name = f"raw-data-download-arco-era5-{first_day.strftime('%Y-%m')}" + if mode == ExecTypes.ERA5T_DAILY.value: + job_name = f"raw-data-download-arco-era5-{first_day.strftime('%Y-%m-%d')}" + + if mode == ExecTypes.ERA5.value: + config_path = f"{DIRECTORY}/*/*.cfg" + else: + config_path = f"{DIRECTORY}/{mode}/*.cfg" + + command = ( + f"{PYTHON_PATH} /weather/weather_dl/weather-dl {config_path} " + f"--runner DataflowRunner --project {PROJECT} --region {REGION} --temp_location " + f'"gs://{BUCKET}/tmp/" --disk_size_gb 260 --job_name {job_name} ' + f"--sdk_container_image {WEATHER_TOOLS_SDK_CONTAINER_IMAGE} --experiment use_runner_v2 " + f"--manifest-location {MANIFEST_LOCATION} " + ) + subprocess_run(command) + +def data_splitting_dataflow_job(date: str, root_path: str): + """Launches a Dataflow job to splitting soil & pcp weather data.""" + year = date[:4] + month = year + date[5:7] + typeOfLevel = '{' + 'typeOfLevel' + '}' + shortName = '{' + 'shortName' + '}' + zero = '{' + '0' + '}' + first = '{' + '1' + '}' + commands = [] + for DATASET in SPLITTING_DATASETS: + command = ( + f'{PYTHON_PATH} /weather/weather_sp/weather-sp --input-pattern ' + f' "{root_path}/ERA5GRIB/HRES/Month/{year}/{month}_hres_{DATASET}.grb2" ' + f'--output-template "{root_path}/ERA5GRIB/HRES/Month/{first}/{zero}.grb2_{typeOfLevel}_{shortName}.grib" ' + f'--runner DataflowRunner --project {PROJECT} --region {REGION} ' + f'--temp_location gs://{BUCKET}/tmp --disk_size_gb 3600 ' + f'--job_name split-{DATASET}-data-{month} ' + f'--sdk_container_image {WEATHER_TOOLS_SDK_CONTAINER_IMAGE} ' + ) + commands.append(command) + + with ThreadPoolExecutor(max_workers=4) as tp: + for command in commands: + tp.submit(subprocess_run, command) diff --git a/src/arco_era5/ingest_data_in_zarr.py b/src/arco_era5/ingest_data_in_zarr.py index ffd5051..3bffe51 100644 --- a/src/arco_era5/ingest_data_in_zarr.py +++ b/src/arco_era5/ingest_data_in_zarr.py @@ -14,10 +14,18 @@ import logging import os -from .utils import replace_non_alphanumeric_with_hyphen, subprocess_run +from .resize_zarr import update_zarr_metadata +from .utils import ExecTypes, replace_non_alphanumeric_with_hyphen, run_cloud_job logger = logging.getLogger(__name__) +PROJECT = os.environ.get("PROJECT") +REGION = os.environ.get("REGION") +BUCKET = os.environ.get("BUCKET") +INGESTION_JOB_ID = os.environ.get("INGESTION_JOB_ID") +ROOT_PATH = os.environ.get("ROOT_PATH") +ARCO_ERA5_SDK_CONTAINER_IMAGE = os.environ.get("ARCO_ERA5_SDK_CONTAINER_IMAGE") + AR_FILE_PATH = '/arco-era5/src/update-ar-data.py' CO_FILE_PATH = '/arco-era5/src/update-co-data.py' CO_FILES_MAPPING = { @@ -46,10 +54,40 @@ 'single-level-surface': ['lnsp', 'zs'] } +def generate_override_args( + file_path: str, + target_path: str, + start_date: str, + end_date: str, + root_path: str, + init_date: str, + bucket: str, + project: str, + region: str, + job_name: str +) -> list: + args = [ + 
file_path, + "--output_path", target_path, + "-s", start_date, + "-e", end_date, + "--root_path", root_path, + "--init_date", init_date, + "--temp_location", f"gs://{bucket}/temp", + "--runner", "DataflowRunner", + "--project", project, + "--region", region, + "--experiments", "use_runner_v2", + "--disk_size_gb", "250", + "--setup_file", "/arco-era5/setup.py", + "--job_name", job_name, + "--number_of_worker_harness_threads", "1" + ] + return args def ingest_data_in_zarr_dataflow_job(target_path: str, region: str, start_date: str, - end_date: str, init_date: str, project: str, - bucket: str, sdk_container_image: str, python_path: str) -> None: + end_date: str, root_path: str, init_date: str, project: str, + bucket: str, ingestion_job_id: str, mode: str) -> None: """Ingests data into a Zarr store and runs a Dataflow job. Args: @@ -64,35 +102,47 @@ def ingest_data_in_zarr_dataflow_job(target_path: str, region: str, start_date: """ job_name = target_path.split('/')[-1] job_name = os.path.splitext(job_name)[0] + process_date = start_date[:7] + if mode == ExecTypes.ERA5T_DAILY.value: + process_date = start_date job_name = ( - f"zarr-data-ingestion-{replace_non_alphanumeric_with_hyphen(job_name)}-{start_date}-to-{end_date}" + f"zarr-data-ingestion-{replace_non_alphanumeric_with_hyphen(job_name)}-{process_date}" ) + override_args = generate_override_args(CO_FILE_PATH, target_path, start_date, end_date, root_path, init_date, bucket, project, region, job_name) if '/ar/' in target_path: logger.info(f"Data ingestion for {target_path} of AR data.") - command = ( - f"{python_path} {AR_FILE_PATH} --output_path {target_path} " - f"-s {start_date} -e {end_date} --pressure_levels_group full_37 " - f"--temp_location gs://{bucket}/temp --runner DataflowRunner " - f"--project {project} --region {region} --experiments use_runner_v2 " - f"--worker_machine_type n2-highmem-32 --disk_size_gb 250 " - f"--setup_file /arco-era5/setup.py " - f"--job_name {job_name} --number_of_worker_harness_threads 1 " - f"--init_date {init_date}") + override_args[0] = AR_FILE_PATH + override_args.extend([ + "--pressure_levels_group", "full_37", + "--machine_type", "n2-highmem-32" + ]) else: chunks = CO_FILES_MAPPING[target_path.split('/')[-1].split('.')[0]] - chunks = " ".join(chunks) time_per_day = 2 if 'single-level-forecast' in target_path else 24 + override_args.extend([ + "--time_per_day", str(time_per_day), + "--machine_type", "n2-highmem-8", + "--sdk_container_image", ARCO_ERA5_SDK_CONTAINER_IMAGE, + "--c" + ]) + override_args.extend(chunks) logger.info(f"Data ingestion for {target_path} of CO data.") - command = ( - f"{python_path} {CO_FILE_PATH} --output_path {target_path} " - f"-s {start_date} -e {end_date} -c {chunks} " - f"--time_per_day {time_per_day} " - f"--temp_location gs://{bucket}/temp --runner DataflowRunner " - f"--project {project} --region {region} --experiments use_runner_v2 " - f"--worker_machine_type n2-highmem-8 --disk_size_gb 250 " - f"--setup_file /arco-era5/setup.py " - f"--job_name {job_name} --number_of_worker_harness_threads 1 " - f"--sdk_container_image {sdk_container_image} " - f"--init_date {init_date}") + + run_cloud_job(project, region, ingestion_job_id, override_args) + + +def perform_data_operations(z_file: str, start_date: str, + end_date: str, init_date: str, mode: str): + # Function to process a single pair of z_file and table + try: + logger.info(f"Data ingesting for {z_file} is started.") + ingest_data_in_zarr_dataflow_job(z_file, REGION, start_date, end_date, ROOT_PATH, init_date, + 
PROJECT, BUCKET, INGESTION_JOB_ID, mode) + logger.info(f"Data ingesting for {z_file} is completed.") + logger.info(f"update metadata for zarr file: {z_file} started.") - subprocess_run(command) + update_zarr_metadata(z_file, end_date, mode) + logger.info(f"update metadata for zarr file: {z_file} completed.") + except Exception as e: + logger.error( + f"An error occurred in process_zarr for {z_file}: {str(e)}") diff --git a/src/arco_era5/resize_zarr.py b/src/arco_era5/resize_zarr.py index b534dcc..fb300a4 100644 --- a/src/arco_era5/resize_zarr.py +++ b/src/arco_era5/resize_zarr.py @@ -21,7 +21,7 @@ import typing as t from gcsfs import GCSFileSystem -from .utils import convert_to_date +from .utils import convert_to_date, ExecTypes logger = logging.getLogger(__name__) @@ -149,12 +149,18 @@ def resize_zarr_target(target_store: str, end_date: datetime, init_date: str, logger.info(f"Data is already resized for {target_store}.") -def update_zarr_metadata(url: str, time_end: datetime.date, metadata_key: str = '.zmetadata') -> None: +def update_zarr_metadata(url: str, time_end: datetime.date, mode: str, metadata_key: str = '.zmetadata') -> None: try: - attrs = {"valid_time_start": "1940-01-01", - "valid_time_stop": str(time_end), - "last_updated": str(datetime.datetime.utcnow()) - } + attrs = { + "valid_time_start": "1940-01-01", + "last_updated": str(datetime.datetime.now(datetime.timezone.utc)) + } + + if mode == ExecTypes.ERA5.value: + attrs['valid_time_stop'] = str(time_end) + else: + attrs['valid_time_stop_era5t'] = str(time_end) + root_group = zarr.open(url) # update zarr_store/.zattrs file. diff --git a/src/arco_era5/sanity.py b/src/arco_era5/sanity.py new file mode 100644 index 0000000..476c502 --- /dev/null +++ b/src/arco_era5/sanity.py @@ -0,0 +1,155 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
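The metadata update above keeps the stable and preliminary time ranges separate: an ERA5 run advances `valid_time_stop`, while ERA5T daily/monthly runs only write `valid_time_stop_era5t`. A minimal usage sketch, not part of the patch; the store path comes from `ARCO_ERA5_ZARR_FILES` in this change and the end date is an assumption:

```
# Illustrative sketch; not part of the patch. The end date is an assumption.
import datetime

from arco_era5 import ExecTypes, update_zarr_metadata

store = "gs://gcp-public-data-arco-era5/co/model-level-wind.zarr-v2"
end = datetime.date(2025, 1, 31)

# Stable ERA5 backfill: writes `valid_time_stop` into the store metadata.
update_zarr_metadata(store, end, mode=ExecTypes.ERA5.value)

# ERA5T run: writes `valid_time_stop_era5t` instead, leaving the advertised
# stable range (`valid_time_stop`) untouched.
update_zarr_metadata(store, end, mode=ExecTypes.ERA5T_DAILY.value)
```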
+import logging +import os +import zarr + +import apache_beam as beam +import numpy as np +import typing as t +import xarray as xr + +from dataclasses import dataclass + +from .data_availability import generate_input_paths_ar +from .ingest_data_in_zarr import CO_FILES_MAPPING, replace_non_alphanumeric_with_hyphen +from .update_co import generate_offsets_from_url, generate_input_paths +from .source_data import HOURS_PER_DAY, offset_along_time_axis, GCP_DIRECTORY +from .utils import copy, date_range, opener, run_cloud_job + +logger = logging.getLogger() +logger.setLevel(logging.INFO) + +PROJECT = os.environ.get("PROJECT") +REGION = os.environ.get("REGION") +BUCKET = os.environ.get("BUCKET") +SANITY_JOB_ID = os.environ.get("SANITY_JOB_ID") + +SANITY_JOB_FILE = "/arco-era5/src/era5-sanity.py" + +HARNESS_THREADS = { + 'model-level-moisture': 1, + 'model-level-wind': 4 +} + + +def generate_raw_paths(start_date: str, end_date: str, target_path: str, is_single_level: bool, is_analysis_ready: bool, root_path: str = GCP_DIRECTORY): + if is_analysis_ready: + data_date_range = date_range(start_date, end_date) + paths = generate_input_paths_ar(data_date_range, root_path) + else: + chunks = CO_FILES_MAPPING[target_path.split('/')[-1].split('.')[0]] + paths = generate_input_paths(start_date, end_date, root_path, chunks, is_single_level) + return paths + +def parse_ar_url(url: str, init_date: str, da: np.ndarray): + year, month, day, variable, file_name = url.rsplit("/", 5)[1:] + time_offset_start = offset_along_time_axis(init_date, int(year), int(month), int(day)) + time_offset_end = time_offset_start + HOURS_PER_DAY + if file_name == "surface.nc": + return (slice(time_offset_start, time_offset_end)), variable, da + else: + level = int(file_name.split(".")[0]) + da = np.expand_dims(da, axis=1) + return (slice(time_offset_start, time_offset_end), slice(level, level + 1)), variable, da + +def open_dataset(path: str): + ds = xr.open_dataset(path, engine="scipy" if ".nc" in path else "cfgrib").load() + return ds + +@dataclass +class OpenLocal(beam.DoFn): + + target_path: str + init_date: str + timestamps_per_file: int + is_single_level: bool + is_analysis_ready: bool + + def process(self, paths: t.Tuple[str, str]): + + path1, path2 = paths + + with opener(path1) as file1: + ds1 = open_dataset(file1) + + with opener(path2) as file2: + ds2 = open_dataset(file2) + + for vname in ds1.data_vars: + if ds1[vname].equals(ds2[vname]): + beam.metrics.Metrics.counter('Success', 'Equal').inc() + logger.info(f"For {path1} variable {vname} is equal.") + else: + beam.metrics.Metrics.counter('Success', 'Different').inc() + logger.info(f"For {path1} variable {vname} is not equal.") + if self.is_analysis_ready: + region, variable, da = parse_ar_url(file1, self.init_date, ds2[vname].values) + yield self.target_path, variable, region, da, path1, path2 + else: + start, end, _ = generate_offsets_from_url(file1, self.init_date, self.timestamps_per_file, self.is_single_level) + region = (slice(start, end)) + yield self.target_path, vname, region, ds2[vname].values, path1, path2 + +def update_zarr(target_path: str, vname: str, region: t.Union[t.Tuple[slice], t.Tuple[slice, slice]], da: np.ndarray, path1: str, path2: str): + zf = zarr.open(target_path) + zv = zf[vname] + logger.info(f"Zarr Data Check for {vname} is {np.array_equal(zv[:], da, equal_nan=True)}") + zv[region] = da + copy(path2, path1) + +def generate_override_args( + file_path: str, + target_path: str, + temp_path: str, + init_date: str, + bucket: str, + project: str, + 
region: str, + job_name: str +) -> list: + args = [ + file_path, + "--target_path", target_path, + "--temp_path", temp_path, + "--init_date", init_date, + "--temp_location", f"gs://{bucket}/temp", + "--runner", "DataflowRunner", + "--project", project, + "--region", region, + "--experiments", "use_runner_v2", + "--machine_type", "n2-highmem-16", + "--disk_size_gb", "250", + "--setup_file", "/arco-era5/setup.py", + "--job_name", job_name, + ] + return args + +def run_sanity_job(target_path: str, temp_path: str, init_date: str): + """Trigger job for era5 data sanity.""" + + target_name = target_path.split('/')[-1].split('.')[0] + job_name = f"arco-era5-3m-sanity-{replace_non_alphanumeric_with_hyphen(target_name)}" + + override_args = generate_override_args( + SANITY_JOB_FILE, target_path, temp_path, init_date, BUCKET, PROJECT, REGION, job_name + ) + + if target_name in HARNESS_THREADS: + override_args.extend(['--number_of_worker_harness_threads', str(HARNESS_THREADS[target_name])]) + + if "single-level-forecast" in target_path: + override_args.extend(['--timestamps_per_day', "2"]) + + run_cloud_job(PROJECT, REGION, SANITY_JOB_ID, override_args) diff --git a/src/arco_era5/update_co.py b/src/arco_era5/update_co.py index 871ec2f..6d1d183 100644 --- a/src/arco_era5/update_co.py +++ b/src/arco_era5/update_co.py @@ -28,7 +28,7 @@ from dataclasses import dataclass from itertools import product -from .utils import date_range +from .utils import date_range, opener logger = logging.getLogger(__name__) @@ -97,38 +97,6 @@ def convert_to_date(date_str: str, format: str = '%Y-%m-%d') -> datetime.datetim return datetime.datetime.strptime(date_str, format) -def copy(src: str, dst: str) -> None: - """A method for generating the offset along with time dimension. - - Args: - src (str): The cloud storage path to the grib file. - dst (str): A temp location to copy the file. - """ - cmd = 'gcloud alpha storage cp' - try: - subprocess.run(cmd.split() + [src, dst], check=True, capture_output=True, - text=True, input="n/n") - return - except subprocess.CalledProcessError as e: - msg = f"Failed to copy file {src!r} to {dst!r} Error {e}" - logger.error(msg) - - -@contextmanager -def opener(fname: str) -> t.Any: - """A method to copy remote file into temp. - - Args: - url (str): The cloud storage path to the grib file. - """ - _, suffix = os.path.splitext(fname) - with tempfile.NamedTemporaryFile(suffix=suffix) as ntf: - tmp_name = ntf.name - logger.info(f"Copying '{fname}' to local file '{tmp_name}'") - copy(fname, tmp_name) - yield tmp_name - - def generate_input_paths(start: str, end: str, root_path: str, chunks: t.List[str], is_single_level: bool = False) -> t.List[str]: """A method for generating the url using the combination of chunks and time range. 
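`run_sanity_job` above assembles the Dataflow override arguments and hands them to the sanity Cloud Run job (`arco-era5-sanity` in the README's job list) via `run_cloud_job`. A minimal invocation sketch, assuming `PROJECT`, `REGION`, `BUCKET`, and `SANITY_JOB_ID` are already set in the environment; the temp path is hypothetical:

```
# Illustrative sketch; not part of the patch. The temp_path is hypothetical;
# PROJECT, REGION, BUCKET, and SANITY_JOB_ID must be set in the environment
# before arco_era5.sanity is imported.
from arco_era5 import run_sanity_job

run_sanity_job(
    target_path="gs://gcp-public-data-arco-era5/co/single-level-forecast.zarr-v2",
    temp_path="gs://<BUCKET>/era5t-sanity/raw",  # hypothetical re-downloaded raw tree
    init_date="1900-01-01",
)
# Per the code above, single-level-forecast also gets --timestamps_per_day 2,
# and the model-level stores get a reduced worker harness thread count.
```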
@@ -159,6 +127,22 @@ def generate_input_paths(start: str, end: str, root_path: str, chunks: t.List[st return input_paths +def generate_offsets_from_url(url: str, init_date: str, timestamps_per_file: int, is_single_level: bool): + file_name = url.rsplit('/', 1)[1].rsplit('.', 1)[0] + int_date, chunk = file_name.split('_hres_') + if "_" in chunk: + chunk = chunk.replace(".grb2_", "_") + if is_single_level: + int_date += "01" + start_date = convert_to_date(int_date, '%Y%m%d') + days_diff = start_date - convert_to_date(init_date) + start = days_diff.days * timestamps_per_file + end = start + timestamps_per_file * ( + calendar.monthrange(start_date.year, + start_date.month)[1] if is_single_level else 1) + return start, end, chunk + + @dataclass class GenerateOffset(beam.PTransform): """A Beam PTransform for generating the offset along with time dimension.""" @@ -176,18 +160,9 @@ def apply(self, url: str) -> t.Tuple[str, slice, t.List[str]]: Returns: t.Tuple: url with included variables and time offset. """ - file_name = url.rsplit('/', 1)[1].rsplit('.', 1)[0] - int_date, chunk = file_name.split('_hres_') - if "_" in chunk: - chunk = chunk.replace(".grb2_", "_") - if self.is_single_level: - int_date += "01" - start_date = convert_to_date(int_date, '%Y%m%d') - days_diff = start_date - convert_to_date(self.init_date) - start = days_diff.days * self.timestamps_per_file - end = start + self.timestamps_per_file * ( - calendar.monthrange(start_date.year, - start_date.month)[1] if self.is_single_level else 1) + start, end, chunk = generate_offsets_from_url( + url, self.init_date, self.timestamps_per_file, self.is_single_level + ) return url, slice(start, end), VARIABLE_DICT[chunk] def expand(self, pcoll: beam.PCollection) -> beam.PCollection: diff --git a/src/arco_era5/update_config_files.py b/src/arco_era5/update_config_files.py index 9cb2e41..a895bde 100644 --- a/src/arco_era5/update_config_files.py +++ b/src/arco_era5/update_config_files.py @@ -14,12 +14,28 @@ # ============================================================================== import configparser import datetime +import glob import json +import logging import os +import re import typing as t from google.cloud import secretmanager +from .utils import ExecTypes + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +FILES_TO_UPDATE = { + ExecTypes.ERA5.value: ["dve", "o3q", "qrqs", "tw", "pl", "sl", "lnsp", "zs", "sfc"], + ExecTypes.ERA5T_DAILY.value: ["dve", "o3q", "qrqs", "tw", "pl", "sl"], + ExecTypes.ERA5T_MONTHLY.value: ["lnsp", "zs", "sfc"] +} + +API_KEY_PATTERN = re.compile(r"^API_KEY_\d+$") + class ConfigArgs(t.TypedDict): """A class representing the configuration arguments for the new_config_file @@ -28,14 +44,14 @@ class ConfigArgs(t.TypedDict): Attributes: year_wise_date (bool): True if the configuration file contains 'year', 'month' and 'day', False otherwise. - first_day_third_prev (datetime.date): The first day of the third previous month. - last_day_third_prev (datetime.date): The last day of the third previous month. + first_day (datetime.date): The first day of the third previous month. + last_day (datetime.date): The last day of the third previous month. sl_year (str): The year of the third previous month in 'YYYY' format. sl_month (str): The month of the third previous month in 'MM' format. 
""" year_wise_date: bool - first_day_third_prev: datetime.date - last_day_third_prev: datetime.date + first_day: datetime.date + last_day: datetime.date sl_year: str sl_month: str @@ -44,19 +60,19 @@ class MonthDates(t.TypedDict): """A class representing the first and third previous month's dates. Attributes: - first_day_third_prev (datetime.date): The first day of the third previous month. - last_day_third_prev (datetime.date): The last day of the third previous month. + first_day (datetime.date): The first day of the third previous month. + last_day (datetime.date): The last day of the third previous month. sl_year (str): The year of the third previous month in 'YYYY' format. sl_month (str): The month of the third previous month in 'MM' format. """ - first_day_third_prev: datetime.date - last_day_third_prev: datetime.date + first_day: datetime.date + last_day: datetime.date sl_year: str sl_month: str def new_config_file(config_file: str, field_name: str, additional_content: str, - config_args: ConfigArgs) -> None: + config_args: ConfigArgs, temp_path: str = None) -> None: """Modify the specified configuration file with new values. Parameters: @@ -70,21 +86,25 @@ def new_config_file(config_file: str, field_name: str, additional_content: str, # Unpack the values from config_args dictionary year_wise_date = config_args["year_wise_date"] - first_day_third_prev = config_args["first_day_third_prev"] - last_day_third_prev = config_args["last_day_third_prev"] + first_day = config_args["first_day"] + last_day = config_args["last_day"] sl_year = config_args["sl_year"] sl_month = config_args["sl_month"] - config = configparser.ConfigParser() + config = configparser.ConfigParser(interpolation=None) config.read(config_file) + if temp_path: + target_path = config.get("parameters", "target_path") + config.set("parameters", "target_path", target_path.replace("gs://gcp-public-data-arco-era5/raw", temp_path)) + if year_wise_date: config.set("selection", "year", sl_year) config.set("selection", "month", sl_month) config.set("selection", "day", "all") else: config.set("selection", field_name, - f"{first_day_third_prev}/to/{last_day_third_prev}") + f"{first_day}/to/{last_day}") sections_list = additional_content.split("\n\n") for section in sections_list[:-1]: @@ -115,37 +135,63 @@ def get_month_range(date: datetime.date) -> t.Tuple[datetime.date, datetime.date return first_day, last_day -def get_previous_month_dates() -> MonthDates: - """Return a dictionary containing the first and third previous month's dates from - the current date. +def get_previous_month_dates(mode: str) -> MonthDates: + """Return a dictionary containing the first and last date to process. Returns: dict: A dictionary containing the following key-value pairs: - - 'first_day_third_prev': The first day of the third previous month - (datetime.date). - - 'last_day_third_prev': The last day of the third previous month - (datetime.date). - - 'sl_year': The year of the third previous month in 'YYYY' format (str). - - 'sl_month': The month of the third previous month in 'MM' format (str). + - 'first_day': The first day (datetime.date). + - 'last_day': The last day (datetime.date). + - 'sl_year': The year of the date in 'YYYY' format (str). + - 'sl_month': The month of the date in 'MM' format (str). 
""" today = datetime.date.today() - # Calculate the correct previous third month considering months from 1 to 12 - third_prev_month = today - datetime.timedelta(days=2*366/12) - first_day_third_prev, last_day_third_prev = get_month_range(third_prev_month) - first_date_third_prev = first_day_third_prev - sl_year, sl_month = str(first_date_third_prev)[:4], str(first_date_third_prev)[5:7] + if mode == ExecTypes.ERA5T_DAILY.value: + # Get date before 6 days + third_prev_month = today - datetime.timedelta(days=6) + first_day = last_day = third_prev_month + elif mode == ExecTypes.ERA5T_MONTHLY.value: + # Get date range for previous month + third_prev_month = today + first_day, last_day = get_month_range(third_prev_month) + else: + # Calculate the correct previous third month considering months from 1 to 12 + third_prev_month = today - datetime.timedelta(days=2*366/12) + first_day, last_day = get_month_range(third_prev_month) + sl_year, sl_month = str(first_day)[:4], str(first_day)[5:7] return { - 'first_day_third_prev': first_day_third_prev, - 'last_day_third_prev': last_day_third_prev, + 'first_day': first_day, + 'last_day': last_day, 'sl_year': sl_year, 'sl_month': sl_month, } +def get_api_keys() -> t.List[str]: + api_key_list = [] + for env_var in os.environ: + if API_KEY_PATTERN.match(env_var): + api_key_value = os.environ.get(env_var) + api_key_list.append(api_key_value) + return api_key_list + + +def generate_additional_content() -> str: + """Generate additional_content including API KEYS.""" + api_key_list = get_api_keys() + + additional_content = "" + for count, secret_key in enumerate(api_key_list): + secret_key_value = get_secret(secret_key) + additional_content += f'parameters.api{count}\n\ + api_url={secret_key_value["api_url"]}\napi_key={secret_key_value["api_key"]}\n\n' + return additional_content + + def update_config_file(directory: str, field_name: str, - additional_content: str) -> None: + mode: str, temp_path: str = None) -> None: """Update the configuration files in the specified directory. Parameters: @@ -154,22 +200,30 @@ def update_config_file(directory: str, field_name: str, additional_content (str): The additional content to be added under the '[selection]' section. 
""" - dates_data = get_previous_month_dates() + dates_data = get_previous_month_dates(mode) config_args = { - "first_day_third_prev": dates_data['first_day_third_prev'], - "last_day_third_prev": dates_data['last_day_third_prev'], + "first_day": dates_data['first_day'], + "last_day": dates_data['last_day'], "sl_year": dates_data['sl_year'], "sl_month": dates_data['sl_month'], } - for filename in os.listdir(directory): + files_to_update = FILES_TO_UPDATE[mode] + if mode == ExecTypes.ERA5.value: + all_files = glob.glob(f"{directory}/*/*.cfg") + else: + all_files = glob.glob(f"{directory}/{mode}/*.cfg") + directory = f"{directory}/{mode}" + + additional_content = generate_additional_content() + + for filename in all_files: config_args["year_wise_date"] = False - if filename.endswith(".cfg"): + if any(chunk in filename for chunk in files_to_update): if "lnsp" in filename or "zs" in filename or "sfc" in filename: config_args["year_wise_date"] = True - config_file = os.path.join(directory, filename) # Pass the data as keyword arguments to the new_config_file function - new_config_file(config_file, field_name, additional_content, - config_args=config_args) + new_config_file(filename, field_name, additional_content, + config_args=config_args, temp_path=temp_path) def get_secret(secret_key: str) -> dict: @@ -207,7 +261,7 @@ def remove_license_from_config_file(config_file_path: str, num_licenses: int) -> config.write(file, space_around_delimiters=False) -def remove_licenses_from_directory(directory_path: str, num_licenses: int) -> None: +def remove_licenses_from_directory(directory_path: str) -> None: """Remove licenses from all configuration files in a directory. Args: @@ -216,6 +270,7 @@ def remove_licenses_from_directory(directory_path: str, num_licenses: int) -> No configuration file. 
""" + num_licenses = len(get_api_keys()) for filename in os.listdir(directory_path): if filename.endswith(".cfg"): config_file_path = os.path.join(directory_path, filename) diff --git a/src/arco_era5/update_config_files_test.py b/src/arco_era5/update_config_files_test.py index 2d52efe..04a3def 100644 --- a/src/arco_era5/update_config_files_test.py +++ b/src/arco_era5/update_config_files_test.py @@ -31,8 +31,8 @@ def setUp(self): time=00/to/23\nparam=138/155\n" ) self.config_args = { - "first_day_third_prev": datetime.date(2023, 5, 1), - "last_day_third_prev": datetime.date(2023, 5, 31), + "first_day": datetime.date(2023, 5, 1), + "last_day": datetime.date(2023, 5, 31), "sl_year": "2023", "sl_month": "05", "year_wise_date": False @@ -59,7 +59,7 @@ def test_new_config_file(self): self.assertIn(section_name, config.sections()) self.assertEqual( config.get("selection", "date"), - f'{self.config_args["first_day_third_prev"]}/to/{self.config_args["last_day_third_prev"]}', + f'{self.config_args["first_day"]}/to/{self.config_args["last_day"]}', ) self.assertEqual(config.get(section_name, 'api_url'), section_api_url) @@ -88,8 +88,8 @@ def test_get_month_range(self): def test_get_previous_month_dates(self): # Test get_previous_month_dates function prev_month_data = get_previous_month_dates() - self.assertIn("first_day_third_prev", prev_month_data) - self.assertIn("last_day_third_prev", prev_month_data) + self.assertIn("first_day", prev_month_data) + self.assertIn("last_day", prev_month_data) self.assertIn("sl_year", prev_month_data) self.assertIn("sl_month", prev_month_data) diff --git a/src/arco_era5/utils.py b/src/arco_era5/utils.py index e9270d8..161e3ee 100644 --- a/src/arco_era5/utils.py +++ b/src/arco_era5/utils.py @@ -14,16 +14,29 @@ import argparse import datetime import logging +import os import re import subprocess import sys +import tempfile import pandas as pd import typing as t +from enum import Enum +from contextlib import contextmanager +from google.cloud import run_v2 + logger = logging.getLogger(__name__) +class ExecTypes(Enum): + + ERA5 = "era5" + ERA5T_DAILY = "daily" + ERA5T_MONTHLY = "monthly" + + def date_range(start_date: str, end_date: str, freq: str = "D") -> t.List[datetime.datetime]: """Generates a list of datetime objects within a given date range. @@ -108,5 +121,59 @@ def parse_arguments_raw_to_zarr_to_bq(desc: str) -> t.Tuple[argparse.Namespace, parser.add_argument("--init_date", type=str, default='1900-01-01', help="Date to initialize the zarr store.") + parser.add_argument("--mode", type=str, default="era5", + help="Mode to execute the flow. Supported values era5, daily, monthly") return parser.parse_known_args() + + +def copy(src: str, dst: str) -> None: + """A method for generating the offset along with time dimension. + + Args: + src (str): The cloud storage path to the grib file. + dst (str): A temp location to copy the file. + """ + cmd = 'gsutil -m cp' + try: + subprocess.run(cmd.split() + [src, dst], check=True, capture_output=True, + text=True, input="n/n") + return + except subprocess.CalledProcessError as e: + msg = f"Failed to copy file {src!r} to {dst!r} Error {e}" + logger.error(msg) + + +@contextmanager +def opener(fname: str) -> t.Any: + """A method to copy remote file into temp. + + Args: + url (str): The cloud storage path to the grib file. 
+ """ + _, suffix = os.path.splitext(fname) + with tempfile.NamedTemporaryFile(suffix=suffix) as ntf: + tmp_name = ntf.name + logger.info(f"Copying '{fname}' to local file '{tmp_name}'") + copy(fname, tmp_name) + yield tmp_name + + +def run_cloud_job(project: str, region: str, job: str, override_args: t.List[str]): + client = run_v2.JobsClient() + job_executor_path = client.job_path( + project, region, job + ) + override_spec = { + "container_overrides": [{ "args": override_args }] + } + job_executor_request = run_v2.RunJobRequest( + name=job_executor_path, + overrides=override_spec, + ) + try: + response = client.run_job(request=job_executor_request) + if response.running(): + print("job_executor triggered.") + except Exception as e: + print(f"Error starting ee_job_executor: {e}") diff --git a/src/era5-sanity.py b/src/era5-sanity.py new file mode 100644 index 0000000..1b25384 --- /dev/null +++ b/src/era5-sanity.py @@ -0,0 +1,70 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import argparse + +import apache_beam as beam +import typing as t + +from arco_era5 import ExecTypes, generate_raw_paths, get_previous_month_dates, OpenLocal, update_zarr, update_zarr_metadata + +def parse_arguments(desc: str) -> t.Tuple[argparse.Namespace, + t.List[str]]: + """Parse command-line arguments for the data processing pipeline.""" + parser = argparse.ArgumentParser(description=desc) + + parser.add_argument("--temp_path", required=True, type=str, + help="Temporary path for era5 raw.") + parser.add_argument("--target_path", required=True, type=str, + help="Path to zarr store.") + parser.add_argument('-i', '--init_date', default='1900-01-01', + help='Start date, iso format string.') + parser.add_argument('--timestamps_per_day', type=int, default=24, + help='Timestamps Per Day.') + + return parser.parse_known_args() + + +if __name__ == "__main__": + + parsed_args, pipeline_args = parse_arguments("Parse arguments.") + + is_single_level = "single-level" in parsed_args.target_path + is_analysis_ready = "/ar/" in parsed_args.target_path + + dates = get_previous_month_dates("era5") + + original_paths = generate_raw_paths( + dates['first_day'], dates['last_day'], parsed_args.target_path, is_single_level, is_analysis_ready + ) + temp_paths = generate_raw_paths( + dates['first_day'], dates['last_day'], parsed_args.target_path, is_single_level, is_analysis_ready, parsed_args.temp_path + ) + + input_paths = [(original_path, temp_path) for original_path, temp_path in zip(original_paths, temp_paths)] + + with beam.Pipeline(argv=pipeline_args) as p: + _ = ( + p + | "Create" >> beam.Create(input_paths) + | "OpenLocal" >> beam.ParDo(OpenLocal( + target_path=parsed_args.target_path, + init_date=parsed_args.init_date, + timestamps_per_file=parsed_args.timestamps_per_day, + is_analysis_ready=is_analysis_ready, + is_single_level=is_single_level + )) + | "WriteToZarr" >> beam.MapTuple(update_zarr) + ) + + update_zarr_metadata(parsed_args.target_path, dates['last_day'], mode=ExecTypes.ERA5.value) diff 
--git a/src/raw-to-zarr-to-bq.py b/src/raw-to-zarr-to-bq.py index b560c15..665dbb4 100644 --- a/src/raw-to-zarr-to-bq.py +++ b/src/raw-to-zarr-to-bq.py @@ -12,24 +12,25 @@ # See the License for the specific language governing permissions and # limitations under the License. import datetime -import json import logging import os -import re + +import typing as t from concurrent.futures import ThreadPoolExecutor from arco_era5 import ( check_data_availability, date_range, - ingest_data_in_zarr_dataflow_job, get_previous_month_dates, - get_secret, parse_arguments_raw_to_zarr_to_bq, remove_licenses_from_directory, - replace_non_alphanumeric_with_hyphen, - update_zarr_metadata, - subprocess_run, + run_sanity_job, update_config_file, + ExecTypes, + raw_data_download_dataflow_job, + data_splitting_dataflow_job, + perform_data_operations, + ARCO_ERA5_ZARR_FILES as ZARR_FILES ) # Logger Configuration @@ -38,175 +39,81 @@ DIRECTORY = "/arco-era5/raw" FIELD_NAME = "date" -PROJECT = os.environ.get("PROJECT") -REGION = os.environ.get("REGION") -BUCKET = os.environ.get("BUCKET") -MANIFEST_LOCATION = os.environ.get("MANIFEST_LOCATION") -PYTHON_PATH = os.environ.get("PYTHON_PATH") -WEATHER_TOOLS_SDK_CONTAINER_IMAGE = os.environ.get("WEATHER_TOOLS_SDK_CONTAINER_IMAGE") -ARCO_ERA5_SDK_CONTAINER_IMAGE = os.environ.get("ARCO_ERA5_SDK_CONTAINER_IMAGE") -API_KEY_PATTERN = re.compile(r"^API_KEY_\d+$") -API_KEY_LIST = [] - -SPLITTING_DATASETS = ['soil', 'pcp'] -ZARR_FILES_LIST = [ - 'gs://gcp-public-data-arco-era5/ar/full_37-1h-0p25deg-chunk-1.zarr-v3', - 'gs://gcp-public-data-arco-era5/co/model-level-moisture.zarr-v2', - 'gs://gcp-public-data-arco-era5/co/model-level-wind.zarr-v2', - 'gs://gcp-public-data-arco-era5/co/single-level-forecast.zarr-v2', - 'gs://gcp-public-data-arco-era5/co/single-level-reanalysis.zarr-v2', - 'gs://gcp-public-data-arco-era5/co/single-level-surface.zarr-v2' -] -BQ_TABLES_LIST = json.loads(os.environ.get("BQ_TABLES_LIST")) -REGION_LIST = json.loads(os.environ.get("REGION_LIST")) - -dates_data = get_previous_month_dates() - - -def raw_data_download_dataflow_job(): - """Launches a Dataflow job to process weather data.""" - current_day = datetime.date.today() - job_name = f"raw-data-download-arco-era5-{current_day.month}-{current_day.year}" - - command = ( - f"{PYTHON_PATH} /weather/weather_dl/weather-dl /arco-era5/raw/*.cfg " - f"--runner DataflowRunner --project {PROJECT} --region {REGION} --temp_location " - f'"gs://{BUCKET}/tmp/" --disk_size_gb 260 --job_name {job_name} ' - f"--sdk_container_image {WEATHER_TOOLS_SDK_CONTAINER_IMAGE} --experiment use_runner_v2 " - f"--manifest-location {MANIFEST_LOCATION} " - ) - subprocess_run(command) - - -def data_splitting_dataflow_job(date: str): - """Launches a Dataflow job to splitting soil & pcp weather data.""" - year = date[:4] - month = year + date[5:7] - typeOfLevel = '{' + 'typeOfLevel' + '}' - shortName = '{' + 'shortName' + '}' - zero = '{' + '0' + '}' - first = '{' + '1' + '}' - commands = [] - for DATASET in SPLITTING_DATASETS: - command = ( - f'{PYTHON_PATH} /weather/weather_sp/weather-sp --input-pattern ' - f' "gs://gcp-public-data-arco-era5/raw/ERA5GRIB/HRES/Month/{year}/{month}_hres_{DATASET}.grb2" ' - f'--output-template "gs://gcp-public-data-arco-era5/raw/ERA5GRIB/HRES/Month/{first}/{zero}.grb2_{typeOfLevel}_{shortName}.grib" ' - f'--runner DataflowRunner --project {PROJECT} --region {REGION} ' - f'--temp_location gs://{BUCKET}/tmp --disk_size_gb 3600 ' - f'--job_name split-{DATASET}-data-{month} ' - f'--sdk_container_image 
{WEATHER_TOOLS_SDK_CONTAINER_IMAGE} ' - ) - commands.append(command) - - with ThreadPoolExecutor(max_workers=4) as tp: - for command in commands: - tp.submit(subprocess_run, command) - - -def ingest_data_in_bigquery_dataflow_job(zarr_file: str, table_name: str, region: str, - zarr_kwargs: str) -> None: - """Ingests data from a Zarr file into BigQuery and runs a Dataflow job. - - Args: - zarr_file (str): The Zarr file path. - table_name (str): The name of the BigQuery table. - zarr_kwargs (Any): Additional arguments for the Zarr ingestion. - - Returns: - None - """ - if '/ar/' in zarr_file: - job_name = zarr_file.split('/')[-1] - job_name = os.path.splitext(job_name)[0] - job_name = ( - f"data-ingestion-into-bq-{replace_non_alphanumeric_with_hyphen(job_name)}") - - command = ( - f"{PYTHON_PATH} /weather/weather_mv/weather-mv bq --uris {zarr_file} " - f"--output_table {table_name} --runner DataflowRunner --project {PROJECT} " - f"--region {region} --temp_location gs://{BUCKET}/tmp --job_name {job_name} " - f"--use-local-code --zarr --disk_size_gb 500 --machine_type n2-highmem-4 " - f"--number_of_worker_harness_threads 1 --zarr_kwargs {zarr_kwargs} " - ) +TEMP_PATH_FOR_RAW_DATA = os.environ.get("TEMP_PATH_FOR_RAW_DATA") +ROOT_PATH = os.environ.get("ROOT_PATH") - subprocess_run(command) +def get_zarr_files_to_process(mode: str) -> t.List[str]: + """Get list of zarr files to process.""" + if mode == ExecTypes.ERA5T_DAILY.value: + return [ZARR_FILES["ml_wind"], ZARR_FILES['ml_moisture'], ZARR_FILES['ar']] + elif mode == ExecTypes.ERA5T_MONTHLY.value: + return [ZARR_FILES["sl_surface"], ZARR_FILES['sl_forecast'], ZARR_FILES['sl_reanalysis']] + else: + return ZARR_FILES.values() -def perform_data_operations(z_file: str, table: str, region: str, start_date: str, - end_date: str, init_date: str): - # Function to process a single pair of z_file and table - try: - logger.info(f"Data ingesting for {z_file} is started.") - ingest_data_in_zarr_dataflow_job(z_file, region, start_date, end_date, init_date, - PROJECT, BUCKET, ARCO_ERA5_SDK_CONTAINER_IMAGE, PYTHON_PATH) - logger.info(f"Data ingesting for {z_file} is completed.") - logger.info(f"update metadata for zarr file: {z_file} started.") - update_zarr_metadata(z_file, end_date) - logger.info(f"update metadata for zarr file: {z_file} completed.") - start = f' "start_date": "{start_date}" ' - end = f'"end_date": "{end_date}" ' - zarr_kwargs = "'{" + f'{start},{end}' + "}'" - # TODO([#414](https://github.com/google/weather-tools/issues/414)): Faster ingestion into BQ by converting - # the chunk into pd.Dataframe - # logger.info(f"Data ingesting into BQ table: {table} started.") - # ingest_data_in_bigquery_dataflow_job(z_file, table, region, zarr_kwargs) - # logger.info(f"Data ingesting into BQ table: {table} completed.") - except Exception as e: - logger.error( - f"An error occurred in process_zarr_and_table for {z_file}: {str(e)}") + +def retry_downloads(root_path: str, mode: str): + """Start download and splitting jobs.""" + data_is_missing = True + while data_is_missing: + logger.info("Raw data downloading started.") + raw_data_download_dataflow_job(mode, dates_data['first_day']) + logger.info("Raw data downloaded successfully.") + if mode != ExecTypes.ERA5T_DAILY.value: + logger.info("Raw data Splitting started.") + data_splitting_dataflow_job( + dates_data['first_day'].strftime("%Y/%m"), root_path) + logger.info("Raw data Splitting successfully.") + logger.info("Data availability check started.") + data_is_missing = 
check_data_availability(data_date_range, mode, root_path) + logger.info("Data availability check completed successfully.") + + +def start_data_ingestion(zarr_files_to_process: t.List[str], first_day: datetime.date, last_day: datetime.date, init_date: str, mode: str): + """Start data ingestion for zarr datasets.""" + first_day = first_day.strftime("%Y-%m-%d") + last_day = last_day.strftime("%Y-%m-%d") + with ThreadPoolExecutor(max_workers=8) as tp: + for z_file in zarr_files_to_process: + if parsed_args.mode == ExecTypes.ERA5.value: + tp.submit(run_sanity_job, z_file, TEMP_PATH_FOR_RAW_DATA, parsed_args.init_date) + else: + tp.submit(perform_data_operations, z_file, first_day, last_day, init_date, mode) if __name__ == "__main__": try: parsed_args, unknown_args = parse_arguments_raw_to_zarr_to_bq("Parse arguments.") + dates_data = get_previous_month_dates(parsed_args.mode) + logger.info(f"Automatic update for ARCO-ERA5 started for {dates_data['sl_month']}.") data_date_range = date_range( - dates_data["first_day_third_prev"], dates_data["last_day_third_prev"] + dates_data["first_day"], dates_data["last_day"] ) - for env_var in os.environ: - if API_KEY_PATTERN.match(env_var): - api_key_value = os.environ.get(env_var) - API_KEY_LIST.append(api_key_value) - - additional_content = "" - for count, secret_key in enumerate(API_KEY_LIST): - secret_key_value = get_secret(secret_key) - additional_content += f'parameters.api{count}\n\ - api_url={secret_key_value["api_url"]}\napi_key={secret_key_value["api_key"]}\n\n' logger.info("Config file updation started.") - update_config_file(DIRECTORY, FIELD_NAME, additional_content) + temp_path = TEMP_PATH_FOR_RAW_DATA if parsed_args.mode == ExecTypes.ERA5.value else None + update_config_file(DIRECTORY, FIELD_NAME, parsed_args.mode, temp_path) logger.info("Config file updation completed.") - logger.info("Raw data downloading started.") - raw_data_download_dataflow_job() - logger.info("Raw data downloaded successfully.") - logger.info("Raw data Splitting started.") - data_splitting_dataflow_job( - dates_data['first_day_third_prev'].strftime("%Y/%m")) - logger.info("Raw data Splitting successfully.") + root_path = TEMP_PATH_FOR_RAW_DATA if parsed_args.mode == ExecTypes.ERA5.value else ROOT_PATH + + retry_downloads(root_path, parsed_args.mode) - logger.info("Data availability check started.") - data_is_missing = True - while data_is_missing: - data_is_missing = check_data_availability(data_date_range) - if data_is_missing: - logger.warning("Data is missing.") - raw_data_download_dataflow_job() - data_splitting_dataflow_job( - dates_data['first_day_third_prev'].strftime("%Y/%m")) - logger.info("Data availability check completed successfully.") - - remove_licenses_from_directory(DIRECTORY, len(API_KEY_LIST)) + remove_licenses_from_directory(DIRECTORY) logger.info("All licenses removed from the config file.") - with ThreadPoolExecutor(max_workers=8) as tp: - for z_file, table, region in zip(ZARR_FILES_LIST, BQ_TABLES_LIST, - REGION_LIST): - tp.submit(perform_data_operations, z_file, table, region, - dates_data["first_day_third_prev"], - dates_data["last_day_third_prev"], parsed_args.init_date) + zarr_files_to_process = get_zarr_files_to_process(parsed_args.mode) + + start_data_ingestion( + zarr_files_to_process, + dates_data["first_day"], + dates_data['last_day'], + parsed_args.init_date, + parsed_args.mode + ) + logger.info(f"Automatic update for ARCO-ERA5 completed for {dates_data['sl_month']}.") except Exception as e: diff --git a/src/update-ar-data.py 
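The refactored orchestration above branches on `ExecTypes` values instead of the old hard-coded zarr/BigQuery/region lists. The enum itself lives elsewhere in this patch (`src/arco_era5/constant.py`); as a rough reconstruction based only on how it is used here and on the `--mode daily` / `--mode monthly` container args in `deployment/constants.py`, it presumably looks something like:

```python
# Hedged sketch; the real definition in src/arco_era5/constant.py may differ.
from enum import Enum

class ExecTypes(Enum):
    ERA5 = "era5"              # monthly ERA5 sanity run (executor job, no --mode flag)
    ERA5T_DAILY = "daily"      # daily ERA5T ingestion (--mode daily)
    ERA5T_MONTHLY = "monthly"  # monthly ERA5T ingestion (--mode monthly)
```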
b/src/update-ar-data.py index de6b370..735c326 100644 --- a/src/update-ar-data.py +++ b/src/update-ar-data.py @@ -32,6 +32,8 @@ def parse_arguments(desc: str) -> t.Tuple[argparse.Namespace, t.List[str]]: parser.add_argument("--output_path", type=str, required=True, help="Path to the destination Zarr archive.") + parser.add_argument('--root_path', type=str, default=GCP_DIRECTORY, + help='Root path to raw files.') parser.add_argument('-s', "--start_date", required=True, help='Start date, iso format string.') parser.add_argument('-e', "--end_date", required=True, @@ -51,7 +53,7 @@ def parse_arguments(desc: str) -> t.Tuple[argparse.Namespace, t.List[str]]: p | "CreateDayIterator" >> beam.Create(daily_date_iterator(known_args.start_date, known_args.end_date)) | "LoadDataForDay" >> beam.ParDo(LoadTemporalDataForDateDoFn( - data_path=GCP_DIRECTORY, start_date=known_args.init_date, + data_path=known_args.root_path, start_date=known_args.init_date, pressure_levels_group=known_args.pressure_levels_group)) | "UpdateSlice" >> ARUpdateSlice(target=known_args.output_path, init_date=known_args.init_date) ) diff --git a/src/update-co-data.py b/src/update-co-data.py index a1b5e27..e30b015 100644 --- a/src/update-co-data.py +++ b/src/update-co-data.py @@ -45,6 +45,8 @@ def parse_args(desc: str) -> Tuple[argparse.Namespace, List[str]]: parser.add_argument('--output_path', type=str, required=True, help='Path to output Zarr in Cloud bucket.') + parser.add_argument('--root_path', type=str, default=GCP_DIRECTORY, + help='Root path to raw files.') parser.add_argument('-s', '--start_date', required=True, help='Start date, iso format string.') parser.add_argument('-e', '--end_date', required=True, @@ -69,7 +71,7 @@ def parse_args(desc: str) -> Tuple[argparse.Namespace, List[str]]: if known_args.chunks == model_level_default_chunks: known_args.chunks = single_level_default_chunks - files = generate_input_paths(known_args.start_date, known_args.end_date, GCP_DIRECTORY, + files = generate_input_paths(known_args.start_date, known_args.end_date, known_args.root_path, known_args.chunks, is_single_level=is_single_level) with beam.Pipeline(argv=unknown_args) as p: From bc88a1a33adf83e9d9da6fcc6c93e78b9247c769 Mon Sep 17 00:00:00 2001 From: Darshan Date: Wed, 19 Feb 2025 12:05:02 +0000 Subject: [PATCH 2/5] Added constant file --- README.md | 10 ++++-- deployment/Dockerfile | 1 + deployment/constants.py | 67 ++++++++++++++++++++++++++++++++++++++++ deployment/create_job.py | 12 +++---- 4 files changed, 81 insertions(+), 9 deletions(-) create mode 100644 deployment/constants.py diff --git a/README.md b/README.md index 5d3d0d8..0d8e2b6 100644 --- a/README.md +++ b/README.md @@ -755,7 +755,7 @@ This feature is works in 4 parts. 3. Add the all `Copernicus` licenses into the [secret-manager](https://cloud.google.com/secret-manager) with value likes this: {"api_url": "URL", "api_key": "KEY"} > NOTE: for every API_KEY there must be unique secret-key. -4. Update all of these variable in [docker-file](deployment/Dockerfile). +4. Update all of these variable in [docker-file](deployment/constants.py). * `PROJECT` * `REGION` * `BUCKET` @@ -778,17 +778,21 @@ export REPO= eg:arco-era5-raw-to-zarr-to-bq gcloud builds submit . --tag "gcr.io/$PROJECT_ID/$REPO:latest" ``` -7. Run script to create cloud run jobs. [create_job](deployment/create_job.py) +6. Run script to create cloud run jobs. [create_job](deployment/create_job.py) ``` python deployment/create_job.py ``` -8. There will be 5 different cloud run jobs. +7. 
There will be 5 different cloud run jobs. - `arco-era5-zarr-ingestion` - For zarr data ingestion. - `arco-era5t-daily-executor` - Triggers daily to process era5t-daily data. - `arco-era5t-monthly-executor` - Triggers monthly to process era5t-monthly data. - `arco-era5-sanity` - Sanity job to validate the data era5 vs era5t and replace in case of difference. - `arco-era5-executor` - Triggers every month to run a sanity job for every zarr available. +8. Set up cloud schedulers to trigger above jobs on specified frequencies. + - `arco-era5t-daily-executor` - Schedule a daily trigger for `era5t-daily` data. + - `arco-era5t-monthly-executor` - Schedule a monthly trigger for `era5t-monthly` data. + - `arco-era5-executor` - Schedule a monthly trigger to do era5 vs era5t sanity for `current_month - 3` month. ### Making the dataset "High Resolution" & beyond... This phase of the project is under active development! If you would like to lend a hand in any way, please check out our diff --git a/deployment/Dockerfile b/deployment/Dockerfile index e79cbb9..2f15ddf 100644 --- a/deployment/Dockerfile +++ b/deployment/Dockerfile @@ -33,6 +33,7 @@ RUN echo "source activate ${CONDA_ENV_NAME}" >> ~/.bashrc ENV PATH /opt/conda/envs/${CONDA_ENV_NAME}/bin:$PATH RUN pip install -e . +# (TODO): Replace branch with main before merge. ARG arco_era5_git_rev=era5t-support RUN git clone https://github.com/google-research/arco-era5.git /arco-era5 WORKDIR /arco-era5 diff --git a/deployment/constants.py b/deployment/constants.py new file mode 100644 index 0000000..20fdf3a --- /dev/null +++ b/deployment/constants.py @@ -0,0 +1,67 @@ +PROJECT = "" +REGION = "" +BUCKET = "" +API_KEY_1 = "projects/PROJECT/secrets/SECRET_NAME/versions/1" +API_KEY_2 = "projects/PROJECT/secrets/SECRET_NAME/versions/1" +API_KEY_3 = "projects/PROJECT/secrets/SECRET_NAME/versions/1" +API_KEY_4 = "projects/PROJECT/secrets/SECRET_NAME/versions/1" +WEATHER_TOOLS_SDK_CONTAINER_IMAGE = "WEATHER_TOOLS_SDK_CONTAINER_IMAGE" +ARCO_ERA5_SDK_CONTAINER_IMAGE = "ARCO_ERA5_SDK_CONTAINER_IMAGE" +MANIFEST_LOCATION = "fs://manifest?projectId=PROJECT_ID" +TEMP_PATH_FOR_RAW_DATA="" +ROOT_PATH = "" + +TASK_COUNT = 1 +MAX_RETRIES = 1 + +CONTAINER_NAME = "arco-era5-container" +CLOUD_RUN_CONTAINER_IMAGE = "CLOUD_RUN_CONTAINER_IMAGE" +CONTAINER_COMMAND = ["python"] +CONTAINER_CPU_LIMIT = "8000m" + +INGESTION_JOB_ID = "arco-era5-zarr-ingestion" +SANITY_JOB_ID = "arco-era5-sanity" + +EXECUTOR_JOB_ID = "arco-era5-executor" +DAILY_EXECUTOR_JOB_ID = "arco-era5t-daily-executor" +MONTHLY_EXECUTOR_JOB_ID = "arco-era5t-monthly-executor" + +EXECUTOR_JOB_CONTAINER_ARGS = [ + "src/raw-to-zarr-to-bq.py" +] +DAILY_EXECUTOR_JOB_CONTAINER_ARGS = [ + "src/raw-to-zarr-to-bq.py", "--mode", "daily" +] +MONTHLY_EXECUTOR_JOB_CONTAINER_ARGS = [ + "src/raw-to-zarr-to-bq.py", "--mode", "monthly" +] + +JOB_CONTAINER_ENV_VARIABLES = [ + {"name": "PROJECT", "value": PROJECT}, + {"name": "REGION", "value": REGION}, + {"name": "BUCKET", "value": BUCKET}, + {"name": "WEATHER_TOOLS_SDK_CONTAINER_IMAGE", "value": WEATHER_TOOLS_SDK_CONTAINER_IMAGE}, + {"name": "MANIFEST_LOCATION", "value": MANIFEST_LOCATION}, + {"name": "PYTHON_PATH", "value": "python"}, + {"name": "TEMP_PATH_FOR_RAW_DATA", "value": TEMP_PATH_FOR_RAW_DATA}, + {"name": "INGESTION_JOB_ID", "value": INGESTION_JOB_ID}, + {"name": "SANITY_JOB_ID", "value": SANITY_JOB_ID}, + {"name": "ROOT_PATH", "value": ROOT_PATH}, + {"name": "ARCO_ERA5_SDK_CONTAINER_IMAGE", "value": ARCO_ERA5_SDK_CONTAINER_IMAGE} +] + +ERA5_API_KEYS = [ + {"name": 
"API_KEY_1", "value": API_KEY_3}, + {"name": "API_KEY_2", "value": API_KEY_4} +] + +ERA5T_API_KEYS = [ + {"name": "API_KEY_1", "value": API_KEY_1}, + {"name": "API_KEY_2", "value": API_KEY_2} +] + +CONTAINER_MEMORY_LIMIT = "34G" + +TIMEOUT_SECONDS = 604800 + +CONTAINER_MEMORY_LIMIT = "34G" diff --git a/deployment/create_job.py b/deployment/create_job.py index 89761ed..1fd100f 100644 --- a/deployment/create_job.py +++ b/deployment/create_job.py @@ -40,7 +40,7 @@ def create_job(job_id: str, template: run_v2.ExecutionTemplate): print(response) -def graphnn_job_creator( +def era5_job_creator( job_id: str, timeout_sec: int, container_args: list = None, @@ -78,31 +78,31 @@ def graphnn_job_creator( if __name__ == "__main__": - graphnn_job_creator( + era5_job_creator( job_id=constants.SANITY_JOB_ID, timeout_sec=constants.TIMEOUT_SECONDS, container_memory_limit=constants.CONTAINER_MEMORY_LIMIT, ) - graphnn_job_creator( + era5_job_creator( job_id=constants.INGESTION_JOB_ID, timeout_sec=constants.TIMEOUT_SECONDS, container_memory_limit=constants.CONTAINER_MEMORY_LIMIT, ) - graphnn_job_creator( + era5_job_creator( job_id=constants.DAILY_EXECUTOR_JOB_ID, timeout_sec=constants.TIMEOUT_SECONDS, container_args=constants.DAILY_EXECUTOR_JOB_CONTAINER_ARGS, contaner_env=constants.JOB_CONTAINER_ENV_VARIABLES + constants.ERA5T_API_KEYS, container_memory_limit=constants.CONTAINER_MEMORY_LIMIT, ) - graphnn_job_creator( + era5_job_creator( job_id=constants.MONTHLY_EXECUTOR_JOB_ID, timeout_sec=constants.TIMEOUT_SECONDS, container_args=constants.MONTHLY_EXECUTOR_JOB_CONTAINER_ARGS, contaner_env=constants.JOB_CONTAINER_ENV_VARIABLES + constants.ERA5T_API_KEYS, container_memory_limit=constants.CONTAINER_MEMORY_LIMIT, ) - graphnn_job_creator( + era5_job_creator( job_id=constants.EXECUTOR_JOB_ID, timeout_sec=constants.TIMEOUT_SECONDS, container_args=constants.EXECUTOR_JOB_CONTAINER_ARGS, From 1e4fe62a0af1eaca2d924a48e91903b03cfc0f79 Mon Sep 17 00:00:00 2001 From: Darshan Date: Thu, 20 Feb 2025 11:06:03 +0000 Subject: [PATCH 3/5] Added timeout comment --- deployment/constants.py | 1 + 1 file changed, 1 insertion(+) diff --git a/deployment/constants.py b/deployment/constants.py index 20fdf3a..6cd2467 100644 --- a/deployment/constants.py +++ b/deployment/constants.py @@ -62,6 +62,7 @@ CONTAINER_MEMORY_LIMIT = "34G" +# Timeout for a week TIMEOUT_SECONDS = 604800 CONTAINER_MEMORY_LIMIT = "34G" From 03de5a1df149a1ef6484efedd127ead4946be10c Mon Sep 17 00:00:00 2001 From: Darshan Date: Mon, 24 Feb 2025 06:37:31 +0000 Subject: [PATCH 4/5] Update job timeout to 24 hours --- deployment/constants.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/deployment/constants.py b/deployment/constants.py index 6cd2467..8cfd55b 100644 --- a/deployment/constants.py +++ b/deployment/constants.py @@ -62,7 +62,7 @@ CONTAINER_MEMORY_LIMIT = "34G" -# Timeout for a week -TIMEOUT_SECONDS = 604800 +# Timeout for a day +TIMEOUT_SECONDS = 86400 CONTAINER_MEMORY_LIMIT = "34G" From 142029516f810ac52795caf80ce55a2ae7a0339b Mon Sep 17 00:00:00 2001 From: Darshan Date: Fri, 28 Feb 2025 06:45:17 +0000 Subject: [PATCH 5/5] Added function to remove temp file --- src/arco_era5/sanity.py | 13 +++++++++++-- src/arco_era5/source_data.py | 3 ++- src/arco_era5/utils.py | 14 +++++++++++++- 3 files changed, 26 insertions(+), 4 deletions(-) diff --git a/src/arco_era5/sanity.py b/src/arco_era5/sanity.py index 476c502..d06943d 100644 --- a/src/arco_era5/sanity.py +++ b/src/arco_era5/sanity.py @@ -26,7 +26,7 @@ from .ingest_data_in_zarr import 
CO_FILES_MAPPING, replace_non_alphanumeric_with_hyphen from .update_co import generate_offsets_from_url, generate_input_paths from .source_data import HOURS_PER_DAY, offset_along_time_axis, GCP_DIRECTORY -from .utils import copy, date_range, opener, run_cloud_job +from .utils import copy, date_range, opener, remove_file, run_cloud_job logger = logging.getLogger() logger.setLevel(logging.INFO) @@ -45,6 +45,7 @@ def generate_raw_paths(start_date: str, end_date: str, target_path: str, is_single_level: bool, is_analysis_ready: bool, root_path: str = GCP_DIRECTORY): + """Generate raw input paths.""" if is_analysis_ready: data_date_range = date_range(start_date, end_date) paths = generate_input_paths_ar(data_date_range, root_path) @@ -54,6 +55,7 @@ def generate_raw_paths(start_date: str, end_date: str, target_path: str, is_sing return paths def parse_ar_url(url: str, init_date: str, da: np.ndarray): + """Parse raw file url for analysis ready data.""" year, month, day, variable, file_name = url.rsplit("/", 5)[1:] time_offset_start = offset_along_time_axis(init_date, int(year), int(month), int(day)) time_offset_end = time_offset_start + HOURS_PER_DAY @@ -65,11 +67,13 @@ def parse_ar_url(url: str, init_date: str, da: np.ndarray): return (slice(time_offset_start, time_offset_end), slice(level, level + 1)), variable, da def open_dataset(path: str): + """Open xarray dataset.""" ds = xr.open_dataset(path, engine="scipy" if ".nc" in path else "cfgrib").load() return ds @dataclass class OpenLocal(beam.DoFn): + """class to open raw files and compare the data.""" target_path: str init_date: str @@ -91,6 +95,7 @@ def process(self, paths: t.Tuple[str, str]): if ds1[vname].equals(ds2[vname]): beam.metrics.Metrics.counter('Success', 'Equal').inc() logger.info(f"For {path1} variable {vname} is equal.") + remove_file(path2) else: beam.metrics.Metrics.counter('Success', 'Different').inc() logger.info(f"For {path1} variable {vname} is not equal.") @@ -103,11 +108,14 @@ def process(self, paths: t.Tuple[str, str]): yield self.target_path, vname, region, ds2[vname].values, path1, path2 def update_zarr(target_path: str, vname: str, region: t.Union[t.Tuple[slice], t.Tuple[slice, slice]], da: np.ndarray, path1: str, path2: str): + """Function to update zarr data if difference found.""" zf = zarr.open(target_path) zv = zf[vname] - logger.info(f"Zarr Data Check for {vname} is {np.array_equal(zv[:], da, equal_nan=True)}") zv[region] = da + logger.info(f"Replacing {path1} with {path2}") copy(path2, path1) + logger.info(f"Removing temporary file {path2}.") + remove_file(path2) def generate_override_args( file_path: str, @@ -119,6 +127,7 @@ def generate_override_args( region: str, job_name: str ) -> list: + """Generate override args for cloud run job.""" args = [ file_path, "--target_path", target_path, diff --git a/src/arco_era5/source_data.py b/src/arco_era5/source_data.py index 15780c5..2171463 100644 --- a/src/arco_era5/source_data.py +++ b/src/arco_era5/source_data.py @@ -395,7 +395,8 @@ def _read_nc_dataset(gpath_file): # Should having only leading nans. 
b = dataarray.sel(expver=5).isnull().any(dim=all_dims_except_time_and_expver) disjoint_nans = bool((a ^ b).all().variable.values) - assert disjoint_nans, "The nans are not disjoint in expver=1 vs 5" + if not disjoint_nans: + logging.warning("The nans are not disjoint in expver=1 vs 5") dataarray = dataarray.sel(expver=1).combine_first(dataarray.sel(expver=5)) return dataarray diff --git a/src/arco_era5/utils.py b/src/arco_era5/utils.py index 161e3ee..b073192 100644 --- a/src/arco_era5/utils.py +++ b/src/arco_era5/utils.py @@ -128,7 +128,7 @@ def parse_arguments_raw_to_zarr_to_bq(desc: str) -> t.Tuple[argparse.Namespace, def copy(src: str, dst: str) -> None: - """A method for generating the offset along with time dimension. + """A method to copy remote file to local path. Args: src (str): The cloud storage path to the grib file. @@ -159,6 +159,18 @@ def opener(fname: str) -> t.Any: yield tmp_name +def remove_file(url: str): + """Remove file from remote location.""" + cmd = 'gsutil rm -rf' + try: + subprocess.run(cmd.split() + [url], check=True, capture_output=True, + text=True, input="n/n") + return + except subprocess.CalledProcessError as e: + msg = f"Failed to remove file {url!r} Error {e}" + logger.error(msg) + + def run_cloud_job(project: str, region: str, job: str, override_args: t.List[str]): client = run_v2.JobsClient() job_executor_path = client.job_path(
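The change to `_read_nc_dataset` above downgrades the disjoint-NaN assertion to a warning; the merge of the two experiment versions then relies on xarray's `combine_first`, which keeps final ERA5 (expver=1) values wherever they exist and fills the remaining, more recent timestamps from preliminary ERA5T (expver=5). A toy illustration with invented values:

```python
import numpy as np
import xarray as xr

time = np.arange(6)
# expver=1 (final ERA5) is only available for the older timestamps...
era5 = xr.DataArray([1., 2., 3., np.nan, np.nan, np.nan],
                    dims="time", coords={"time": time})
# ...while expver=5 (preliminary ERA5T) covers the recent ones.
era5t = xr.DataArray([np.nan, np.nan, np.nan, 4., 5., 6.],
                     dims="time", coords={"time": time})

merged = era5.combine_first(era5t)  # prefer ERA5, fall back to ERA5T
print(merged.values)                # [1. 2. 3. 4. 5. 6.]
```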
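In the same spirit, the `update_zarr` function added in `src/arco_era5/sanity.py` repairs only the slices where ERA5 and ERA5T disagree instead of rewriting the store. A minimal, self-contained sketch of that region write (zarr-python v2 style, with a made-up store name, shape, and values):

```python
import numpy as np
import zarr

# Demo store and variable; the real pipeline opens the target_path it is given.
store = zarr.open_group("demo.zarr", mode="a")
temperature = store.require_dataset("temperature", shape=(48, 37, 10, 10),
                                    chunks=(24, 37, 10, 10), dtype="f4")

# One day of hourly data (24 timestamps) on a single level, mirroring parse_ar_url;
# the trailing lat/lon dimensions are taken in full.
region = (slice(0, 24), slice(5, 6))
corrected = np.ones((24, 1, 10, 10), dtype="f4")

temperature[region] = corrected  # in-place write; untouched chunks are left alone
```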