-
Notifications
You must be signed in to change notification settings - Fork 174
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[KFP] Obtain the Ray cluster run ID from the user for KFP v2. #956
Changes from 16 commits
79e7e08
070a9d8
31db4cc
5a8c36f
76620d4
84d48a0
3cec0ab
b420d1d
dc89ce6
c6af288
d7fa55d
1fdb7fa
5350481
6253bac
40c2889
c5117e5
5bba22c
a306cbf
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -62,6 +62,7 @@ def super_pipeline( | |
p2_skip: bool = False, | ||
p2_noop_sleep_sec: int = 10, | ||
p2_ray_name: str = "noop-kfp-ray", | ||
p2_ray_run_id_KFPv2: str = "", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can it be p1_ parameter, so users don't have to add it for each step? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed. Thanks |
||
p2_ray_head_options: dict = {"cpu": 1, "memory": 4, "image_pull_secret": "", "image": noop_image}, | ||
p2_ray_worker_options: dict = { | ||
"replicas": 2, | ||
|
@@ -75,6 +76,7 @@ def super_pipeline( | |
# Document ID step parameters | ||
p3_name: str = "doc_id", | ||
p3_ray_name: str = "docid-kfp-ray", | ||
p3_ray_run_id_KFPv2: str = "", | ||
p3_ray_head_options: dict = {"cpu": 1, "memory": 4, "image_pull_secret": "", "image": doc_id_image}, | ||
p3_ray_worker_options: dict = { | ||
"replicas": 2, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -41,7 +41,9 @@ Note: the project and the explanation below are based on [KFPv1](https://www.kub | |
import kfp.compiler as compiler | ||
import kfp.components as comp | ||
import kfp.dsl as dsl | ||
import os | ||
from kfp_support.workflow_support.runtime_utils import ( | ||
DEFAULT_KFP_COMPONENT_SPEC_PATH, | ||
ONE_HOUR_SEC, | ||
ONE_WEEK_SEC, | ||
ComponentUtils, | ||
|
@@ -56,18 +58,24 @@ Ray cluster. For each step we have to define a component that will execute them: | |
|
||
```python | ||
# components | ||
base_kfp_image = "quay.io/dataprep1/data-prep-kit/kfp-data-processing:0.0.2" | ||
# compute execution parameters. Here different transforms might need different implementations. As | ||
# a result, instead of creating a component we are creating it in place here. | ||
compute_exec_params_op = comp.func_to_container_op( | ||
func=ComponentUtils.default_compute_execution_params, base_image=base_kfp_image | ||
) | ||
base_kfp_image = "quay.io/dataprep1/data-prep-kit/kfp-data-processing:latest" | ||
component_spec_path = os.getenv("KFP_COMPONENT_SPEC_PATH", DEFAULT_KFP_COMPONENT_SPEC_PATH) | ||
# KFPv1 and KFP2 uses different methods to create a component from a function. KFPv1 uses the | ||
# `create_component_from_func` function, but it is deprecated by KFPv2 and so has a different import path. | ||
# KFPv2 recommends using the `@dsl.component` decorator, which doesn't exist in KFPv1. Therefore, here we use | ||
# this if/else statement and explicitly call the decorator. | ||
if os.getenv("KFPv2", "0") == "1": | ||
compute_exec_params_op = dsl.component_decorator.component( | ||
func=compute_exec_params_func, base_image=base_kfp_image | ||
) | ||
else: | ||
compute_exec_params_op = comp.create_component_from_func(func=compute_exec_params_func, base_image=base_kfp_image) | ||
# create Ray cluster | ||
create_ray_op = comp.load_component_from_file("../../../kfp_ray_components/createRayComponent.yaml") | ||
create_ray_op = comp.load_component_from_file(component_spec_path + "createRayClusterComponent.yaml") | ||
# execute job | ||
execute_ray_jobs_op = comp.load_component_from_file("../../../kfp_ray_components/executeRayJobComponent.yaml") | ||
execute_ray_jobs_op = comp.load_component_from_file(component_spec_path + "executeRayJobComponent.yaml") | ||
# clean up Ray | ||
cleanup_ray_op = comp.load_component_from_file("../../../kfp_ray_components/cleanupRayComponent.yaml") | ||
cleanup_ray_op = comp.load_component_from_file(component_spec_path + "deleteRayClusterComponent.yaml") | ||
# Task name is part of the pipeline name, the ray cluster name and the job name in DMF. | ||
TASK_NAME: str = "noop" | ||
``` | ||
|
@@ -84,6 +92,7 @@ The input parameters section defines all the parameters required for the pipelin | |
```python | ||
# Ray cluster | ||
ray_name: str = "noop-kfp-ray", # name of Ray cluster | ||
ray_run_id_KFPv2: str = "", | ||
ray_head_options: str = '{"cpu": 1, "memory": 4, \ | ||
"image": "' + task_image + '" }', | ||
ray_worker_options: str = '{"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, \ | ||
|
@@ -94,6 +103,7 @@ The input parameters section defines all the parameters required for the pipelin | |
data_s3_access_secret: str = "s3-secret", | ||
data_max_files: int = -1, | ||
data_num_samples: int = -1, | ||
data_checkpointing: bool = False, | ||
# orchestrator | ||
actor_options: str = "{'num_cpus': 0.8}", | ||
pipeline_id: str = "pipeline_id", | ||
|
@@ -107,6 +117,7 @@ The input parameters section defines all the parameters required for the pipelin | |
The parameters used here are as follows: | ||
|
||
* ray_name: name of the Ray cluster | ||
* ray_run_id_KFPv2: Ray cluster unique ID used only in KFP v2 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it is not in the pipeline parameters; it should be around line 91 |
||
* ray_head_options: head node options, containing the following: | ||
* cpu - number of cpus | ||
* memory - memory | ||
|
@@ -148,21 +159,39 @@ Now, when all components and input parameters are defined, we can implement pipe | |
component execution and parameters submitted to every component. | ||
|
||
```python | ||
# In KFPv2, dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On the other hand, we cannot create | ||
# a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to | ||
# https://github.com/kubeflow/pipelines/issues/10187. Therefore, in the meantime, the user is requested to insert | ||
# a unique string created at run creation time. | ||
if os.getenv("KFPv2", "0") == "1": | ||
print("WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the " | ||
"same version of the same pipeline !!!") | ||
run_id = ray_run_id_KFPv2 | ||
else: | ||
run_id = dsl.RUN_ID_PLACEHOLDER | ||
roytman marked this conversation as resolved.
Show resolved
Hide resolved
|
||
# create clean_up task | ||
clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=dsl.RUN_ID_PLACEHOLDER, server_url=server_url, additional_params=additional_params) | ||
clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params) | ||
ComponentUtils.add_settings_to_component(clean_up_task, ONE_HOUR_SEC * 2) | ||
# pipeline definition | ||
with dsl.ExitHandler(clean_up_task): | ||
# compute execution params | ||
compute_exec_params = compute_exec_params_op( | ||
worker_options=ray_worker_options, | ||
actor_options=actor_options, | ||
worker_options=ray_worker_options, | ||
actor_options=runtime_actor_options, | ||
data_s3_config=data_s3_config, | ||
data_max_files=data_max_files, | ||
data_num_samples=data_num_samples, | ||
data_checkpointing=data_checkpointing, | ||
runtime_pipeline_id=runtime_pipeline_id, | ||
runtime_job_id=run_id, | ||
runtime_code_location=runtime_code_location, | ||
noop_sleep_sec=noop_sleep_sec, | ||
) | ||
ComponentUtils.add_settings_to_component(compute_exec_params, ONE_HOUR_SEC * 2) | ||
# start Ray cluster | ||
ray_cluster = create_ray_op( | ||
ray_name=ray_name, | ||
run_id=dsl.RUN_ID_PLACEHOLDER, | ||
run_id=run_id, | ||
ray_head_options=ray_head_options, | ||
ray_worker_options=ray_worker_options, | ||
server_url=server_url, | ||
|
@@ -173,7 +202,7 @@ component execution and parameters submitted to every component. | |
# Execute job | ||
execute_job = execute_ray_jobs_op( | ||
ray_name=ray_name, | ||
run_id=dsl.RUN_ID_PLACEHOLDER, | ||
run_id=run_id, | ||
additional_params=additional_params, | ||
# note that the parameters below are specific for NOOP transform | ||
exec_params={ | ||
|
@@ -183,7 +212,7 @@ component execution and parameters submitted to every component. | |
"num_workers": compute_exec_params.output, | ||
"worker_options": actor_options, | ||
"pipeline_id": pipeline_id, | ||
"job_id": dsl.RUN_ID_PLACEHOLDER, | ||
"job_id": run_id, | ||
"code_location": code_location, | ||
"noop_sleep_sec": noop_sleep_sec, | ||
}, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -77,23 +77,11 @@ def compute_exec_params_func( | |
# KFPv2 recommends using the `@dsl.component` decorator, which doesn't exist in KFPv1. Therefore, here we use | ||
# this if/else statement and explicitly call the decorator. | ||
if os.getenv("KFPv2", "0") == "1": | ||
# In KFPv2, dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On the other hand, we cannot create | ||
# a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to | ||
# https://github.com/kubeflow/pipelines/issues/10187. Therefore, in the meantime, we use a unique string created at | ||
# compilation time. | ||
import uuid | ||
|
||
compute_exec_params_op = dsl.component_decorator.component( | ||
func=compute_exec_params_func, base_image=base_kfp_image | ||
) | ||
print( | ||
"WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the " | ||
+ "same version of the same pipeline !!!" | ||
) | ||
run_id = uuid.uuid4().hex | ||
else: | ||
compute_exec_params_op = comp.create_component_from_func(func=compute_exec_params_func, base_image=base_kfp_image) | ||
run_id = dsl.RUN_ID_PLACEHOLDER | ||
|
||
|
||
# create Ray cluster | ||
|
@@ -113,6 +101,7 @@ def compute_exec_params_func( | |
) | ||
def code2parquet( | ||
ray_name: str = "code2parquet-kfp-ray", # name of Ray cluster | ||
ray_run_id_KFPv2: str = "", # Ray cluster unique ID used only in KFP v2 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do we share it with both versions if we don't need it for KFPv1? |
||
# Add image_pull_secret and image_pull_policy to ray workers if needed | ||
ray_head_options: dict = {"cpu": 1, "memory": 4, "image": task_image}, | ||
ray_worker_options: dict = {"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image": task_image}, | ||
|
@@ -139,6 +128,7 @@ def code2parquet( | |
""" | ||
Pipeline to execute NOOP transform | ||
:param ray_name: name of the Ray cluster | ||
:param ray_run_id_KFPv2: a unique string id used for the Ray cluster, applicable only in KFP v2. | ||
:param ray_head_options: head node options, containing the following: | ||
cpu - number of cpus | ||
memory - memory | ||
|
@@ -178,6 +168,16 @@ def code2parquet( | |
(here we are assuming that select language info is in S3, but potentially in the different bucket) | ||
:return: None | ||
""" | ||
# In KFPv2, dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On the other hand, we cannot create | ||
# a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to | ||
# https://github.com/kubeflow/pipelines/issues/10187. Therefore, in the meantime, the user is requested to insert | ||
# a unique string created at run creation time. | ||
if os.getenv("KFPv2", "0") == "1": | ||
print("WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the " | ||
"same version of the same pipeline !!!") | ||
run_id = ray_run_id_KFPv2 | ||
else: | ||
run_id = dsl.RUN_ID_PLACEHOLDER | ||
# create clean_up task | ||
clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params) | ||
ComponentUtils.add_settings_to_component(clean_up_task, ONE_HOUR_SEC * 2) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"and we cannot generate a unique string at run-time, see kubeflow/pipelines#10187"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed. Thanks