[KFP] Obtain the Ray cluster run ID from the user for KFP v2. #956
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -57,11 +57,16 @@ Ray cluster. For each step we have to define a component that will execute them: | |
```python | ||
# components | ||
base_kfp_image = "quay.io/dataprep1/data-prep-kit/kfp-data-processing:0.0.2" | ||
# compute execution parameters. Here different transforms might need different implementations. As | ||
# a result, instead of creating a component we are creating it in place here. | ||
compute_exec_params_op = comp.func_to_container_op( | ||
func=ComponentUtils.default_compute_execution_params, base_image=base_kfp_image | ||
) | ||
# KFPv1 and KFPv2 use different methods to create a component from a function. KFPv1 uses the | ||
# `create_component_from_func` function, which is deprecated in KFPv2 and has a different import path there. | ||
# KFPv2 recommends using the `@dsl.component` decorator, which doesn't exist in KFPv1. Therefore, here we use | ||
# this if/else statement and explicitly call the decorator. | ||
if os.getenv("KFPv2", "0") == "1": | ||
compute_exec_params_op = dsl.component_decorator.component( | ||
func=compute_exec_params_func, base_image=base_kfp_image | ||
) | ||
else: | ||
compute_exec_params_op = comp.create_component_from_func(func=compute_exec_params_func, base_image=base_kfp_image) | ||
# create Ray cluster | ||
create_ray_op = comp.load_component_from_file("../../../kfp_ray_components/createRayComponent.yaml") | ||
Review comment: should it be
||
# execute job | ||
|
@@ -107,6 +112,7 @@ The input parameters section defines all the parameters required for the pipelin | |
The parameters used here are as follows: | ||
|
||
* ray_name: name of the Ray cluster | ||
* ray_run_id_KFPv2: Ray cluster unique ID used only in KFP v2 | ||
Review comment: it is not in the pipeline parameters; it should be around line 91
||
* ray_head_options: head node options, containing the following: | ||
* cpu - number of cpus | ||
* memory - memory | ||
|
@@ -148,8 +154,18 @@ Now, when all components and input parameters are defined, we can implement pipe | |
component execution and parameters submitted to every component. | ||
|
||
```python | ||
# In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On the other hand, we cannot create | ||
# a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to | ||
# https://github.com/kubeflow/pipelines/issues/10187. Therefore, for the time being, the user is asked to supply | ||
# a unique string created at run creation time. | ||
if os.getenv("KFPv2", "0") == "1": | ||
print("WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the " | ||
"same version of the same pipeline !!!") | ||
run_id = ray_run_id_KFPv2 | ||
else: | ||
run_id = dsl.RUN_ID_PLACEHOLDER | ||
roytman marked this conversation as resolved.
|
||
# create clean_up task | ||
clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=dsl.RUN_ID_PLACEHOLDER, server_url=server_url, additional_params=additional_params) | ||
clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params) | ||
ComponentUtils.add_settings_to_component(clean_up_task, ONE_HOUR_SEC * 2) | ||
# pipeline definition | ||
with dsl.ExitHandler(clean_up_task): | ||
|
@@ -162,7 +178,7 @@ component execution and parameters submitted to every component. | |
# start Ray cluster | ||
ray_cluster = create_ray_op( | ||
ray_name=ray_name, | ||
run_id=dsl.RUN_ID_PLACEHOLDER, | ||
run_id=run_id, | ||
ray_head_options=ray_head_options, | ||
ray_worker_options=ray_worker_options, | ||
server_url=server_url, | ||
|
@@ -173,7 +189,7 @@ component execution and parameters submitted to every component. | |
# Execute job | ||
execute_job = execute_ray_jobs_op( | ||
ray_name=ray_name, | ||
run_id=dsl.RUN_ID_PLACEHOLDER, | ||
run_id=run_id, | ||
additional_params=additional_params, | ||
# note that the parameters below are specific for NOOP transform | ||
exec_params={ | ||
|
@@ -183,7 +199,7 @@ component execution and parameters submitted to every component. | |
"num_workers": compute_exec_params.output, | ||
"worker_options": actor_options, | ||
"pipeline_id": pipeline_id, | ||
"job_id": dsl.RUN_ID_PLACEHOLDER, | ||
"job_id": run_id, | ||
"code_location": code_location, | ||
"noop_sleep_sec": noop_sleep_sec, | ||
}, | ||
|
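The run ID selection repeated in the hunks above can be read as a small pure function. The following is only a sketch for discussion, not code from this PR: `select_run_id` and `V1_RUN_ID_STAND_IN` are hypothetical names, and the stand-in string replaces `dsl.RUN_ID_PLACEHOLDER` so that the snippet runs without `kfp` installed.

```python
import os
from typing import Mapping, Optional

# Hypothetical stand-in for dsl.RUN_ID_PLACEHOLDER, used so the sketch has no kfp dependency.
V1_RUN_ID_STAND_IN = "<run-id-placeholder>"


def select_run_id(ray_run_id_kfp_v2: str, env: Optional[Mapping[str, str]] = None) -> str:
    """Return the run ID to pass to the Ray components.

    When the KFPv2 environment flag is "1", the user-supplied ID is used;
    otherwise the KFP v1 placeholder (here a stand-in) is used.
    """
    env = os.environ if env is None else env
    if env.get("KFPv2", "0") == "1":
        return ray_run_id_kfp_v2
    return V1_RUN_ID_STAND_IN
```

Passing the environment as an argument is an assumption made here to keep the branch easy to exercise; the PR itself reads `os.getenv` directly inside the pipeline function.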
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -73,23 +73,11 @@ def compute_exec_params_func( | |
# KFPv2 recommends using the `@dsl.component` decorator, which doesn't exist in KFPv1. Therefore, here we use | ||
# this if/else statement and explicitly call the decorator. | ||
if os.getenv("KFPv2", "0") == "1": | ||
# In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On the other hand, we cannot create | ||
# a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to | ||
# https://github.com/kubeflow/pipelines/issues/10187. Therefore, for the time being, we use a unique string created at | ||
# compilation time. | ||
import uuid | ||
|
||
compute_exec_params_op = dsl.component_decorator.component( | ||
func=compute_exec_params_func, base_image=base_kfp_image | ||
) | ||
print( | ||
"WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the " | ||
+ "same version of the same pipeline !!!" | ||
) | ||
run_id = uuid.uuid4().hex | ||
else: | ||
compute_exec_params_op = comp.create_component_from_func(func=compute_exec_params_func, base_image=base_kfp_image) | ||
run_id = dsl.RUN_ID_PLACEHOLDER | ||
|
||
# create Ray cluster | ||
create_ray_op = comp.load_component_from_file(component_spec_path + "createRayClusterComponent.yaml") | ||
|
@@ -111,9 +99,11 @@ def {{ pipeline_name }}( | |
ray_name: str = "{{ pipeline_name }}-kfp-ray", # name of Ray cluster | ||
# Add image_pull_secret and image_pull_policy to ray workers if needed | ||
{%- if image_pull_secret != "" %} | ||
ray_run_id_KFPv2: str = "", | ||
Review comment: why do we need to define it twice? Here and at L 106?
||
ray_head_options: dict = {"cpu": 1, "memory": 4, "image_pull_secret": "{{ image_pull_secret }}", "image": task_image}, | ||
ray_worker_options: dict = {"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image_pull_secret": "{{ image_pull_secret }}", "image": task_image}, | ||
{%- else %} | ||
ray_run_id_KFPv2: str = "", | ||
ray_head_options: dict = {"cpu": 1, "memory": 4, "image": task_image}, | ||
ray_worker_options: dict = {"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image": task_image}, | ||
{%- endif %} | ||
|
@@ -142,6 +132,7 @@ def {{ pipeline_name }}( | |
""" | ||
Pipeline to execute {{ pipeline_name }} transform | ||
:param ray_name: name of the Ray cluster | ||
:param ray_run_id_KFPv2: string holding the id used for the Ray cluster used only in KFP v2 | ||
:param ray_head_options: head node options, containing the following: | ||
cpu - number of cpus | ||
memory - memory | ||
|
@@ -177,6 +168,16 @@ def {{ pipeline_name }}( | |
{%- endfor %} | ||
:return: None | ||
""" | ||
# In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On the other hand, we cannot create | ||
# a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to | ||
# https://github.com/kubeflow/pipelines/issues/10187. Therefore, for the time being, the user is asked to supply | ||
# a unique string created at run creation time. | ||
if os.getenv("KFPv2", "0") == "1": | ||
print("WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the " | ||
"same version of the same pipeline !!!") | ||
run_id = ray_run_id_KFPv2 | ||
else: | ||
run_id = dsl.RUN_ID_PLACEHOLDER | ||
# create clean_up task | ||
clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params) | ||
ComponentUtils.add_settings_to_component(clean_up_task, ONE_HOUR_SEC * 2) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -77,23 +77,11 @@ def compute_exec_params_func( | |
# KFPv2 recommends using the `@dsl.component` decorator, which doesn't exist in KFPv1. Therefore, here we use | ||
# this if/else statement and explicitly call the decorator. | ||
if os.getenv("KFPv2", "0") == "1": | ||
# In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On the other hand, we cannot create | ||
# a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to | ||
# https://github.com/kubeflow/pipelines/issues/10187. Therefore, for the time being, we use a unique string created at | ||
# compilation time. | ||
import uuid | ||
|
||
compute_exec_params_op = dsl.component_decorator.component( | ||
func=compute_exec_params_func, base_image=base_kfp_image | ||
) | ||
print( | ||
"WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the " | ||
+ "same version of the same pipeline !!!" | ||
) | ||
run_id = uuid.uuid4().hex | ||
else: | ||
compute_exec_params_op = comp.create_component_from_func(func=compute_exec_params_func, base_image=base_kfp_image) | ||
run_id = dsl.RUN_ID_PLACEHOLDER | ||
|
||
|
||
# create Ray cluster | ||
|
@@ -113,6 +101,7 @@ def compute_exec_params_func( | |
) | ||
def code2parquet( | ||
ray_name: str = "code2parquet-kfp-ray", # name of Ray cluster | ||
ray_run_id_KFPv2: str = "", # Ray cluster unique ID used only in KFP v2 | ||
Review comment: Why do we share it with both versions if we don't need it for KFPv1?
||
# Add image_pull_secret and image_pull_policy to ray workers if needed | ||
ray_head_options: dict = {"cpu": 1, "memory": 4, "image": task_image}, | ||
ray_worker_options: dict = {"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image": task_image}, | ||
|
@@ -139,6 +128,7 @@ def code2parquet( | |
""" | ||
Pipeline to execute NOOP transform | ||
:param ray_name: name of the Ray cluster | ||
:param ray_run_id_KFPv2: string holding the id used for the Ray cluster used only in KFP v2 | ||
Review comment: maybe
||
:param ray_head_options: head node options, containing the following: | ||
cpu - number of cpus | ||
memory - memory | ||
|
@@ -178,6 +168,16 @@ def code2parquet( | |
(here we are assuming that select language info is in S3, but potentially in the different bucket) | ||
:return: None | ||
""" | ||
# In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On the other hand, we cannot create | ||
# a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to | ||
# https://github.com/kubeflow/pipelines/issues/10187. Therefore, for the time being, the user is asked to supply | ||
# a unique string created at run creation time. | ||
if os.getenv("KFPv2", "0") == "1": | ||
print("WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the " | ||
"same version of the same pipeline !!!") | ||
run_id = ray_run_id_KFPv2 | ||
else: | ||
run_id = dsl.RUN_ID_PLACEHOLDER | ||
# create clean_up task | ||
clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params) | ||
ComponentUtils.add_settings_to_component(clean_up_task, ONE_HOUR_SEC * 2) | ||
|
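Since the new comments ask the user to supply a unique string at run creation time, one possible way to produce it on the client side is sketched below. This is an illustration under assumptions, not part of the PR: `build_run_arguments` is a hypothetical helper, and only the parameter names `ray_name` and `ray_run_id_KFPv2` come from the diff. The previous revision used `uuid.uuid4().hex` at compilation time; the sketch applies the same call per run instead.

```python
import uuid


def build_run_arguments(ray_name: str, kfp_v2: bool) -> dict:
    """Build the pipeline arguments for one run (hypothetical helper).

    For KFP v2 a fresh hex UUID is generated for every run, so that two runs
    of the same pipeline version do not share a Ray cluster ID. For KFP v1
    the parameter is left out and keeps its default value.
    """
    arguments = {"ray_name": ray_name}
    if kfp_v2:
        arguments["ray_run_id_KFPv2"] = uuid.uuid4().hex
    return arguments
```

The resulting dictionary would be passed as the run arguments of whichever submission call is in use. Generating the value per run rather than per compilation is what would make simultaneous runs of the same compiled pipeline distinguishable.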
Review comment: is it the correct image name?