diff --git a/kfp/pipeline_generator/README.md b/kfp/pipeline_generator/README.md index 6408fd5e3..71c28d7d9 100644 --- a/kfp/pipeline_generator/README.md +++ b/kfp/pipeline_generator/README.md @@ -1,4 +1,4 @@ ## Steps to generate a new pipeline - create a `pipeline_definitions.yaml` file for the required task (similar to the example [pipeline_definitions.yaml for the noop task](./example/pipeline_definitions.yaml). -- execute `./run.sh `. When `pipeline_definitions_file_path` is the path of the `pipeline_definitions.yaml` file that defines the pipeline and `destination directory` is a directory where new pipeline file +- execute `./run.sh --config_file --output_dir_file `. When `pipeline_definitions_file_path` is the path of the `pipeline_definitions.yaml` file that defines the pipeline and `destination directory` is a directory where new pipeline file will be generated. diff --git a/kfp/pipeline_generator/example/pipeline_definitions.yaml b/kfp/pipeline_generator/example/pipeline_definitions.yaml index 1b9ab2929..c0e3dc51a 100644 --- a/kfp/pipeline_generator/example/pipeline_definitions.yaml +++ b/kfp/pipeline_generator/example/pipeline_definitions.yaml @@ -3,14 +3,14 @@ pipeline_parameters: description: "Pipeline for noop task" script_name: "noop_transform.py" prefix: "" - multi_s3: False + multi_s3: True compute_func_name: "" compute_func_import: "" component_spec_path: "" pipeline_common_input_parameters_values: - kfp_base_image: "quay.io/dataprep1/data-prep-kit/kfp-data-processing:0.1.1" - transform_image: "quay.io/dataprep1/data-prep-kit/noop:0.8.0" + kfp_base_image: "quay.io/dataprep1/data-prep-kit/kfp-data-processing:0.2.0.dev6" + transform_image: "quay.io/dataprep1/data-prep-kit/noop-ray:0.9.0.dev6" s3_access_secret: "s3-secret" image_pull_secret: "prod-all-icr-io" input_folder: "test/noop/input/" diff --git a/kfp/pipeline_generator/pipeline.ptmpl b/kfp/pipeline_generator/pipeline.ptmpl deleted file mode 100644 index 3b1ecaaac..000000000 --- a/kfp/pipeline_generator/pipeline.ptmpl +++ /dev/null @@ -1,125 +0,0 @@ -# NOTE: This file is auto generated by Pipeline Generator. - -import kfp.compiler as compiler -import kfp.components as comp -import kfp.dsl as dsl -from kfp_support.workflow_support.utils import ( - ONE_HOUR_SEC, - ONE_WEEK_SEC, - ComponentUtils, -) -__compute_import__ - -task_image = "__transform_image__" - -# the name of the job script -EXEC_SCRIPT_NAME: str = "__script_name__" -PREFIX: str = "__prefix_name__" - -# components -base_kfp_image = "__kfp_base_image__" - -# path to kfp component specifications files -component_spec_path = "__component_spec_path__" - -# compute execution parameters. Here different transforms might need different implementations. As -# a result, instead of creating a component we are creating it in place here. -compute_exec_params_op = comp.func_to_container_op( - func=__compute_func_name__, base_image=base_kfp_image -) -# create Ray cluster -create_ray_op = comp.load_component_from_file(component_spec_path + "createRayClusterComponent.yaml") -# execute job -execute_ray_jobs_op = comp.load_component_from_file(component_spec_path + "__execute_comp__") -# clean up Ray -cleanup_ray_op = comp.load_component_from_file(component_spec_path + "deleteRayClusterComponent.yaml") - -# Task name is part of the pipeline name, the ray cluster name and the job name in DMF. 
-TASK_NAME: str = "__pipeline_name__" - - -# Pipeline to invoke execution on remote resource -@dsl.pipeline( - name=TASK_NAME + "-ray-pipeline", - description="__pipeline_description__", -) -def __pipeline_name__( - ray_name: str = "__pipeline_name__-kfp-ray", # name of Ray cluster - ray_head_options: str = '{"cpu": 1, "memory": 4, "image_pull_secret": "__image_pull_secret__", ' - '"image": "' + task_image + '", "image_pull_policy": "Always" }', - ray_worker_options: str = '{"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, ' - '"image_pull_secret": "__image_pull_secret__", "image": "' + task_image + '", "image_pull_policy": "Always" }', - server_url: str = "http://kuberay-apiserver-service.kuberay.svc.cluster.local:8888", - # data access - data_s3_config: str = "{'input_folder': '__input_folder__', 'output_folder': '__output_folder__'}", - data_s3_access_secret: str = "__s3_access_secret__", - data_max_files: int = -1, - data_num_samples: int = -1, - data_checkpointing: bool = False, - data_data_sets: str = "", - data_files_to_use: str = "['.parquet']", - # orchestrator - runtime_actor_options: str = "{'num_cpus': 0.8}", - runtime_pipeline_id: str = "pipeline_id", - runtime_code_location: str = "{'github': 'github', 'commit_hash': '12345', 'path': 'path'}", - - additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}', - __pipeline_arguments__ -): - # create clean_up task - clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=dsl.RUN_ID_PLACEHOLDER, server_url=server_url) - ComponentUtils.add_settings_to_component(clean_up_task, 60) - # pipeline definition - with dsl.ExitHandler(clean_up_task): - # compute execution params - compute_exec_params = compute_exec_params_op( - worker_options=ray_worker_options, - actor_options=runtime_actor_options, - ) - ComponentUtils.add_settings_to_component(compute_exec_params, ONE_HOUR_SEC * 2) - # start Ray cluster - ray_cluster = create_ray_op( - ray_name=ray_name, - run_id=dsl.RUN_ID_PLACEHOLDER, - ray_head_options=ray_head_options, - ray_worker_options=ray_worker_options, - server_url=server_url, - additional_params=additional_params, - ) - ComponentUtils.add_settings_to_component(ray_cluster, ONE_HOUR_SEC * 2) - ray_cluster.after(compute_exec_params) - - # Execute job - execute_job = execute_ray_jobs_op( - ray_name=ray_name, - run_id=dsl.RUN_ID_PLACEHOLDER, - additional_params=additional_params, - exec_params={ - "data_s3_config": data_s3_config, - "data_max_files": data_max_files, - "data_num_samples": data_num_samples, - "data_checkpointing": data_checkpointing, - "data_data_sets": data_data_sets, - "data_files_to_use": data_files_to_use, - "runtime_num_workers": compute_exec_params.output, - "runtime_worker_options": runtime_actor_options, - "runtime_pipeline_id": runtime_pipeline_id, - "runtime_job_id": dsl.RUN_ID_PLACEHOLDER, - "runtime_code_location": runtime_code_location, - __execute_job_params__ - }, - exec_script_name=EXEC_SCRIPT_NAME, - server_url=server_url, - __prefix_execute__ - ) - ComponentUtils.add_settings_to_component(execute_job, ONE_WEEK_SEC) - ComponentUtils.set_s3_env_vars_to_component(execute_job, data_s3_access_secret) - __prefix_set_secret__ - execute_job.after(ray_cluster) - - # Configure the pipeline level to one week (in seconds) - dsl.get_pipeline_conf().set_timeout(ONE_WEEK_SEC) - -if __name__ == "__main__": - # Compiling the pipeline - 
compiler.Compiler().compile(__pipeline_name__, __file__.replace(".py", ".yaml")) diff --git a/kfp/pipeline_generator/pipeline_generator.py b/kfp/pipeline_generator/pipeline_generator.py index 80aad9199..c5fcf080b 100644 --- a/kfp/pipeline_generator/pipeline_generator.py +++ b/kfp/pipeline_generator/pipeline_generator.py @@ -1,10 +1,5 @@ -import json - -import yaml - - PRE_COMMIT = "./pre-commit-config.yaml" -PIPELINE_TEMPLATE_FILE = "pipeline.ptmpl" +PIPELINE_TEMPLATE_FILE = "simple_pipeline.py" INPUT_PARAMETERS = "input_parameters" PIPELINE_PARAMETERS = "pipeline_parameters" @@ -16,128 +11,53 @@ VALUE = "value" DESCRIPTION = "description" - -def get_pipeline_input_parameters(arguments) -> str: - ret_str = "" - ret_str += get_generic_params(arguments.get("pipeline_arguments", None)) - return ret_str - - -def get_generic_params(params) -> str: - ret_str = "" - if params is None: - return ret_str - for param in params: - ret_str += f"\n {param[NAME]}: {param[TYPE]} = " - if param[TYPE] == "str": - ret_str += f'"{param[VALUE]}"' - else: - ret_str += f"{param[VALUE]}" - ret_str += f", {param.get(DESCRIPTION, '')}" - return ret_str - - -def get_execute_job_params_guf(args) -> (str): - ret_execute_job_params = "" - if args is not None: - pargs = args.get("pipeline_arguments", None) - if pargs is not None: - for a in pargs: - ret_execute_job_params += f'"{a[NAME]}": {a[NAME]},\n' - return ret_execute_job_params - - if __name__ == "__main__": + import yaml import argparse - import os - from pathlib import Path + from jinja2 import Environment, FileSystemLoader - from pre_commit.main import main + environment = Environment(loader=FileSystemLoader("templates/")) + template = environment.get_template(PIPELINE_TEMPLATE_FILE) parser = argparse.ArgumentParser(description="Kubeflow pipeline generator for Foundation Models") parser.add_argument("-c", "--config_file", type=str, default="") parser.add_argument("-od", "--output_dir_file", type=str, default="") - args = parser.parse_args() - # open configuration file + with open(args.config_file, "r") as file: pipeline_definitions = yaml.safe_load(file) pipeline_parameters = pipeline_definitions[PIPELINE_PARAMETERS] common_input_params_values = pipeline_definitions[PIPELINE_COMMON_INPUT_PARAMETERS_VALUES] - - # Pipeline template file - fin = open(PIPELINE_TEMPLATE_FILE, "rt") - - # Output file to write the pipeline - fout = open(f"{pipeline_parameters[NAME]}_wf.py", "wt") - - # define the generated pipeline input parameters - transform_input_parameters = get_pipeline_input_parameters( - pipeline_definitions[PIPELINE_TRANSFORM_INPUT_PARAMETERS] - ) - - # define arguments to the Ray execution job - execute_job_params = get_execute_job_params_guf(pipeline_definitions[PIPELINE_TRANSFORM_INPUT_PARAMETERS]) + pipeline_transform_input_parameters = pipeline_definitions[PIPELINE_TRANSFORM_INPUT_PARAMETERS] component_spec_path = pipeline_parameters.get("component_spec_path", "") if component_spec_path == "": - component_spec_path = "../../../../../kfp/kfp_ray_components/" - - compute_func_name = pipeline_parameters.get("compute_func_name", "") - if compute_func_name == "": - compute_func_name = "ComponentUtils.default_compute_execution_params" - - compute_func_import = pipeline_parameters.get("compute_func_import", "") - - execute_comp_file = "executeRayJobComponent.yaml" - prefix_name = "" - prefix_execute = "" - prefix_set_secret = "" - if pipeline_parameters.get("multi_s3", False) == True: - execute_comp_file = "executeRayJobComponent_multi_s3.yaml" - prefix_name = 
pipeline_parameters.get("prefix", "") - prefix_execute = "prefix=PREFIX" - prefix_set_secret = ( - f"ComponentUtils.set_s3_env_vars_to_component(execute_job, {prefix_name}_s3_access_secret, prefix=PREFIX)" - ) - - # For each line in the template file - for line in fin: - # read replace the string and write to output pipeline file - fout.write( - line.replace("__pipeline_name__", pipeline_parameters[NAME]) - .replace("__pipeline_description__", pipeline_parameters["description"]) - .replace("__pipeline_arguments__", transform_input_parameters) - .replace("__execute_job_params__", execute_job_params) - .replace("__compute_func_name__", compute_func_name) - .replace("__component_spec_path__", component_spec_path) - .replace("__compute_import__", compute_func_import) - .replace("__script_name__", pipeline_parameters["script_name"]) - .replace("__image_pull_secret__", common_input_params_values["image_pull_secret"]) - .replace("__s3_access_secret__", common_input_params_values["s3_access_secret"]) - .replace("__input_folder__", common_input_params_values.get("input_folder", "")) - .replace("__output_folder__", common_input_params_values.get("output_folder", "")) - .replace("__transform_image__", common_input_params_values["transform_image"]) - .replace("__kfp_base_image__", common_input_params_values["kfp_base_image"]) - .replace("__execute_comp__", execute_comp_file) - .replace("__prefix_name__", prefix_name) - .replace("__prefix_execute__", prefix_execute) - .replace("__prefix_set_secret__", prefix_set_secret) - ) - # Move the generated file to the output directory - curr_dir = os.getcwd() - src_file = Path(f"{curr_dir}/{pipeline_parameters[NAME]}_wf.py") - dst_file = Path(f"{args.output_dir_file}/{pipeline_parameters[NAME]}_wf.py") - src_file.rename(dst_file) + component_spec_path = "../../../../kfp/kfp_ray_components/" + + content = template.render( + transform_image=common_input_params_values["transform_image"], + script_name=pipeline_parameters["script_name"], + kfp_base_image=common_input_params_values["kfp_base_image"], + component_spec_path=component_spec_path, + pipeline_arguments=pipeline_transform_input_parameters["pipeline_arguments"], + pipeline_name=pipeline_parameters[NAME], + pipeline_description=pipeline_parameters["description"], + input_folder=common_input_params_values.get("input_folder", ""), + output_folder=common_input_params_values.get("output_folder", ""), + s3_access_secret=common_input_params_values["s3_access_secret"], + image_pull_secret=common_input_params_values["image_pull_secret"], + multi_s3=pipeline_parameters["multi_s3"] + ) - fout.close() + output_file = f"{args.output_dir_file}/{pipeline_parameters[NAME]}_wf.py" + with open(output_file, mode="w", encoding="utf-8") as message: + message.write(content) + print(f"... 
wrote {output_file}") import sys - from pre_commit.main import main - print(f"Pipeline ${dst_file} auto generation completed") - # format the pipeline python file - args = ["run", "--file", f"{dst_file}", "-c", PRE_COMMIT] + print(f"Pipeline ${output_file} auto generation completed") + args = ["run", "--file", f"{output_file}", "-c", PRE_COMMIT] sys.exit(main(args)) diff --git a/kfp/pipeline_generator/run.sh b/kfp/pipeline_generator/run.sh index cba8ab41c..7364bb441 100755 --- a/kfp/pipeline_generator/run.sh +++ b/kfp/pipeline_generator/run.sh @@ -1,11 +1,64 @@ #!/bin/bash -DEF_FILE=$1 -DIST_DIR=$2 +POSITIONAL_ARGS=() + +while [[ $# -gt 0 ]]; do + case $1 in + -c|--config_file) + DEF_FILE="$2" + if [[ "$2" = -* ]] + then + echo "ERROR: config_file value not provided." + exit 1 + fi + shift # past argument + shift # past value + ;; + -od|--output_dir_file) + DIST_DIR="$2" + if [[ "$2" = -* ]] + then + echo "ERROR: output_dir_file value not provided." + exit 1 + fi + shift # past argument + shift # past value + ;; + -h|--help) + echo "-c/--config_file(required): file path to config_file(pipeline_definition.yaml)." + echo "-od/--output_dir_file(required): output folder path to store generated pipeline." + exit 1 + ;; + -*|--*) + echo "Unknown option $1" + exit 1 + ;; + *) + POSITIONAL_ARGS+=("$1") # save positional arg + shift # past argument + ;; + esac +done + + +if [ -z ${DEF_FILE+x} ] +then +echo "ERROR: config_file is not defined." +exit 1 +fi + +if [ -z ${DIST_DIR+x} ] +then +echo "ERROR: output_dir_file is not defined." +exit 1 +fi + + ROOT_DIR=${PWD} mkdir -p ${ROOT_DIR}/${DIST_DIR}/ python3 -m venv venv source venv/bin/activate pip install pre-commit -python3 pipeline_generator.py -c ${DEF_FILE} -od ${ROOT_DIR}/${DIST_DIR}/ +pip install jinja2 +python3 pipeline_generator.py -c ${DEF_FILE} -od ${ROOT_DIR}/${DIST_DIR}/ \ No newline at end of file diff --git a/kfp/pipeline_generator/templates/simple_pipeline.py b/kfp/pipeline_generator/templates/simple_pipeline.py new file mode 100644 index 000000000..1b53e6115 --- /dev/null +++ b/kfp/pipeline_generator/templates/simple_pipeline.py @@ -0,0 +1,213 @@ +# (C) Copyright IBM Corp. 2024. +# Licensed under the Apache License, Version 2.0 (the “License”); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an “AS IS” BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +import os + +import kfp.compiler as compiler +import kfp.components as comp +import kfp.dsl as dsl +from workflow_support.compile_utils import ONE_HOUR_SEC, ONE_WEEK_SEC, ComponentUtils + + +task_image = "{{ transform_image }}" + +# the name of the job script +EXEC_SCRIPT_NAME: str = "{{ script_name }}" + +# components +base_kfp_image = "{{ kfp_base_image }}" + +# path to kfp component specifications files +component_spec_path = "{{ component_spec_path }}" + +# compute execution parameters. Here different transforms might need different implementations. As +# a result, instead of creating a component we are creating it in place here. 
+def compute_exec_params_func( + worker_options: str, + actor_options: str, + data_s3_config: str, + data_max_files: int, + data_num_samples: int, + runtime_pipeline_id: str, + runtime_job_id: str, + runtime_code_location: str, + {%- for pipeline_argument in pipeline_arguments %} + {{ pipeline_argument.name }}: {{ pipeline_argument.type }}, + {%- endfor %} +) -> dict: + from runtime_utils import KFPUtils + + return { + "data_s3_config": data_s3_config, + "data_max_files": data_max_files, + "data_num_samples": data_num_samples, + "runtime_num_workers": KFPUtils.default_compute_execution_params(worker_options, actor_options), + "runtime_worker_options": actor_options, + "runtime_pipeline_id": runtime_pipeline_id, + "runtime_job_id": runtime_job_id, + "runtime_code_location": runtime_code_location, + {%- for pipeline_argument in pipeline_arguments %} + "{{ pipeline_argument.name }}": {{ pipeline_argument.name }}, + {%- endfor %} + } + + +# KFPv1 and KFP2 uses different methods to create a component from a function. KFPv1 uses the +# `create_component_from_func` function, but it is deprecated by KFPv2 and so has a different import path. +# KFPv2 recommends using the `@dsl.component` decorator, which doesn't exist in KFPv1. Therefore, here we use +# this if/else statement and explicitly call the decorator. +if os.getenv("KFPv2", "0") == "1": + # In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On another hand we cannot create + # a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to + # https://github.com/kubeflow/pipelines/issues/10187. Therefore, meantime we use a unique string created at + # compilation time. + import uuid + + compute_exec_params_op = dsl.component_decorator.component( + func=compute_exec_params_func, base_image=base_kfp_image + ) + print( + "WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the " + + "same version of the same pipeline !!!" + ) + run_id = uuid.uuid4().hex +else: + compute_exec_params_op = comp.create_component_from_func(func=compute_exec_params_func, base_image=base_kfp_image) + run_id = dsl.RUN_ID_PLACEHOLDER + +# create Ray cluster +create_ray_op = comp.load_component_from_file(component_spec_path + "createRayClusterComponent.yaml") +# execute job +execute_ray_jobs_op = comp.load_component_from_file(component_spec_path + "executeRayJobComponent.yaml") +# clean up Ray +cleanup_ray_op = comp.load_component_from_file(component_spec_path + "deleteRayClusterComponent.yaml") + +# Task name is part of the pipeline name, the ray cluster name and the job name in DMF. 
+TASK_NAME: str = "{{ pipeline_name }}" + + +@dsl.pipeline( + name=TASK_NAME + "-ray-pipeline", + description="{{ pipeline_description }}", +) +def {{ pipeline_name }}( + # Ray cluster + ray_name: str = "{{ pipeline_name }}-kfp-ray", + ray_head_options: str = '{"cpu": 1, "memory": 4, "image_pull_secret": "", "image": "' + task_image + '" }', + ray_worker_options: str = '{"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, ' + '"image_pull_secret": "{{ image_pull_secret }}", "image": "' + task_image + '"}', + server_url: str = "http://kuberay-apiserver-service.kuberay.svc.cluster.local:8888", + # data access + {% if multi_s3 == False %} + data_s3_config: str = "{'input_folder': '{{ input_folder }}', 'output_folder': '{{ output_folder }}'}", + {% else %} + data_s3_config: str = ["{'input_folder': '{{ input_folder }}', 'output_folder': '{{ output_folder }}'}"], + {% endif %} + data_s3_access_secret: str = "{{ s3_access_secret }}", + data_max_files: int = -1, + data_num_samples: int = -1, + # orchestrator + runtime_actor_options: str = "{'num_cpus': 0.8}", + runtime_pipeline_id: str = "pipeline_id", + runtime_code_location: str = "{'github': 'github', 'commit_hash': '12345', 'path': 'path'}", + # {{ pipeline_name }} parameters + {%- for pipeline_argument in pipeline_arguments %} + {{ pipeline_argument.name }}: {{ pipeline_argument.type }}{% if pipeline_argument.value is defined %}{% if pipeline_argument.type == "int" %} = {{ pipeline_argument.value }}{% else %} = "{{ pipeline_argument.value }}"{% endif %}{% endif %}, + {%- endfor %} + # additional parameters + additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}', +): + """ + Pipeline to execute {{ pipeline_name }} transform + :param ray_name: name of the Ray cluster + :param ray_head_options: head node options, containing the following: + cpu - number of cpus + memory - memory + image - image to use + image_pull_secret - image pull secret + :param ray_worker_options: worker node options (we here are using only 1 worker pool), containing the following: + replicas - number of replicas to create + max_replicas - max number of replicas + min_replicas - min number of replicas + cpu - number of cpus + memory - memory + image - image to use + image_pull_secret - image pull secret + :param server_url - server url + :param additional_params: additional (support) parameters, containing the following: + wait_interval - wait interval for API server, sec + wait_cluster_ready_tmout - time to wait for cluster ready, sec + wait_cluster_up_tmout - time to wait for cluster up, sec + wait_job_ready_tmout - time to wait for job ready, sec + wait_print_tmout - time between prints, sec + http_retries - http retries for API server calls + :param data_s3_access_secret - s3 access secret + :param data_s3_config - s3 configuration + :param data_max_files - max files to process + :param data_num_samples - num samples to process + :param runtime_actor_options - actor options + :param runtime_pipeline_id - pipeline id + :param runtime_code_location - code location + {%- for pipeline_argument in pipeline_arguments %} + :param {{ pipeline_argument.name }} - {{ pipeline_argument.description }} + {%- endfor %} + :return: None + """ + # create clean_up task + clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url) + ComponentUtils.add_settings_to_component(clean_up_task, 60) + # pipeline 
definition + with dsl.ExitHandler(clean_up_task): + # compute execution params + compute_exec_params = compute_exec_params_op( + worker_options=ray_worker_options, + actor_options=runtime_actor_options, + data_s3_config=data_s3_config, + data_max_files=data_max_files, + data_num_samples=data_num_samples, + runtime_pipeline_id=runtime_pipeline_id, + runtime_job_id=run_id, + runtime_code_location=runtime_code_location, + {%- for pipeline_argument in pipeline_arguments %} + {{ pipeline_argument.name }}={{ pipeline_argument.name }}, + {%- endfor %} + ) + + ComponentUtils.add_settings_to_component(compute_exec_params, ONE_HOUR_SEC * 2) + # start Ray cluster + ray_cluster = create_ray_op( + ray_name=ray_name, + run_id=run_id, + ray_head_options=ray_head_options, + ray_worker_options=ray_worker_options, + server_url=server_url, + additional_params=additional_params, + ) + ComponentUtils.add_settings_to_component(ray_cluster, ONE_HOUR_SEC * 2) + ray_cluster.after(compute_exec_params) + + # Execute job + execute_job = execute_ray_jobs_op( + ray_name=ray_name, + run_id=run_id, + additional_params=additional_params, + exec_params=compute_exec_params.output, + exec_script_name=EXEC_SCRIPT_NAME, + server_url=server_url, + ) + ComponentUtils.add_settings_to_component(execute_job, ONE_WEEK_SEC) + ComponentUtils.set_s3_env_vars_to_component(execute_job, data_s3_access_secret) + execute_job.after(ray_cluster) + +if __name__ == "__main__": + # Compiling the pipeline + compiler.Compiler().compile({{ pipeline_name }}, __file__.replace(".py", ".yaml")) \ No newline at end of file
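
For reference, the core of the reworked generator is a single Jinja2 render of `templates/simple_pipeline.py` driven by the values in `pipeline_definitions.yaml`. The sketch below is illustrative only, not the full `pipeline_generator.py`: it assumes it is run from `kfp/pipeline_generator/`, that `jinja2` and `pyyaml` are installed, that the top-level YAML keys (`pipeline_parameters`, `pipeline_common_input_parameters_values`, `pipeline_transform_input_parameters`, and the `name` field) match the constants used in `pipeline_generator.py`, and it skips the `pre-commit` formatting pass that `run.sh` performs.

```python
# Minimal standalone sketch of the render step (illustrative; key names assumed as noted above).
import yaml
from jinja2 import Environment, FileSystemLoader

# Load the pipeline definitions (here: the noop example shipped with the generator).
with open("example/pipeline_definitions.yaml", "r") as f:
    definitions = yaml.safe_load(f)

params = definitions["pipeline_parameters"]
common = definitions["pipeline_common_input_parameters_values"]
transform_params = definitions["pipeline_transform_input_parameters"]

# Load the Jinja2 template shipped under templates/.
template = Environment(loader=FileSystemLoader("templates/")).get_template("simple_pipeline.py")

# Fall back to the in-repo component specs when no path is configured.
component_spec_path = params.get("component_spec_path") or "../../../../kfp/kfp_ray_components/"

content = template.render(
    transform_image=common["transform_image"],
    script_name=params["script_name"],
    kfp_base_image=common["kfp_base_image"],
    component_spec_path=component_spec_path,
    pipeline_arguments=transform_params["pipeline_arguments"],
    pipeline_name=params["name"],
    pipeline_description=params["description"],
    input_folder=common.get("input_folder", ""),
    output_folder=common.get("output_folder", ""),
    s3_access_secret=common["s3_access_secret"],
    image_pull_secret=common["image_pull_secret"],
    multi_s3=params["multi_s3"],
)

# Write the generated workflow file; run.sh additionally formats it with pre-commit.
with open(f"{params['name']}_wf.py", "w", encoding="utf-8") as out:
    out.write(content)
```

Under those assumptions, running the sketch against the noop example produces the same `*_wf.py` that `./run.sh -c example/pipeline_definitions.yaml -od <output_dir>` generates, minus the pre-commit formatting step.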