Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[KFP] Obtain the Ray cluster run ID from the user for KFP v2. #956

Merged
merged 18 commits into from
Jan 27, 2025
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 20 additions & 5 deletions kfp/doc/simple_transform_pipeline.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,16 @@ Ray cluster. For each step we have to define a component that will execute them:
```python
# components
base_kfp_image = "quay.io/dataprep1/data-prep-kit/kfp-data-processing:0.0.2"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it the correct image name?

# compute execution parameters. Here different transforms might need different implementations. As
# a result, instead of creating a component we are creating it in place here.
compute_exec_params_op = comp.func_to_container_op(
func=ComponentUtils.default_compute_execution_params, base_image=base_kfp_image
)
# KFPv1 and KFP2 uses different methods to create a component from a function. KFPv1 uses the
# `create_component_from_func` function, but it is deprecated by KFPv2 and so has a different import path.
# KFPv2 recommends using the `@dsl.component` decorator, which doesn't exist in KFPv1. Therefore, here we use
# this if/else statement and explicitly call the decorator.
if os.getenv("KFPv2", "0") == "1":
compute_exec_params_op = dsl.component_decorator.component(
func=compute_exec_params_func, base_image=base_kfp_image
)
else:
compute_exec_params_op = comp.create_component_from_func(func=compute_exec_params_func, base_image=base_kfp_image)
# create Ray cluster
create_ray_op = comp.load_component_from_file("../../../kfp_ray_components/createRayComponent.yaml")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should it be component_spec_path + "createRayClusterComponent.yaml"
and below

# execute job
Expand Down Expand Up @@ -148,6 +153,16 @@ Now, when all components and input parameters are defined, we can implement pipe
component execution and parameters submitted to every component.

```python
# In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On another hand we cannot create
# a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to
# https://github.com/kubeflow/pipelines/issues/10187. Therefore, meantime the user is requested to insert
# a unique string created at compilation time.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

at run creation time

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed. Thanks

if os.getenv("KFPv2", "0") == "1":
print("WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the "
"same version of the same pipeline !!!")
run_id = ray_id_KFPv2
else:
run_id = dsl.RUN_ID_PLACEHOLDER
# create clean_up task
clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=dsl.RUN_ID_PLACEHOLDER, server_url=server_url, additional_params=additional_params)
ComponentUtils.add_settings_to_component(clean_up_task, ONE_HOUR_SEC * 2)
Expand Down
2 changes: 1 addition & 1 deletion kfp/kfp_ray_components/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ RUN pip install --no-cache-dir pydantic==2.6.3
# remove credentials-containing file
RUN rm requirements.txt
# components
COPY ./src /pipelines/component/src
COPY --chmod=775 --chown=ray:root ./src /pipelines/component/src

# Set environment
ENV KFP_v2=$KFP_v2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,26 @@ def run_test(pipeline_package_path: str, endpoint: str = "http://localhost:8080/
logger.info(f"Pipeline {pipeline_name} successfully completed")
return pipeline_name

def _set_run_id(pipeline_package_path: str):
"""
Assign a dummy run ID value for testing purposes. By default, this value
is empty and is set by the user during runtime.

:param pipeline_package_path: Local path to the pipeline package.
"""
import yaml

try:
stream = open(pipeline_package_path, "r")
docs = list(yaml.load_all(stream, yaml.FullLoader))
for doc in docs:
if "root" in doc:
doc["root"]["inputDefinitions"]["parameters"]["ray_id_KFPv2"]["defaultValue"] = "123"
with open(pipeline_package_path, "w") as outfile:
yaml.dump_all(docs, outfile)
except Exception as e:
logger.error(f"Failed to update run id value, exception {e}")
sys.exit(1)

if __name__ == "__main__":
import argparse
Expand All @@ -74,6 +94,7 @@ def run_test(pipeline_package_path: str, endpoint: str = "http://localhost:8080/
if pipeline is None:
sys.exit(1)
case "sanity-test":
_set_run_id(args.pipeline_package_path)
run = run_test(
endpoint=args.endpoint,
pipeline_package_path=args.pipeline_package_path,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,10 @@ def runtime_name(ray_name: str = "", run_id: str = "") -> str:
# the return value plus namespace name will be the name of the Ray Route,
# which length is restricted to 64 characters,
# therefore we restrict the return name by 15 character.
if run_id != "":
return f"{ray_name[:9]}-{run_id[:5]}"
return ray_name[:15]
if run_id == "":
logger.error("Run ID must not be provided")
sys.exit(1)
return f"{ray_name[:9]}-{run_id[:5]}"

@staticmethod
def dict_to_req(d: dict[str, Any], executor: str = "transformer_launcher.py") -> str:
Expand Down
25 changes: 13 additions & 12 deletions kfp/pipeline_generator/single-pipeline/templates/simple_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,23 +73,11 @@ def compute_exec_params_func(
# KFPv2 recommends using the `@dsl.component` decorator, which doesn't exist in KFPv1. Therefore, here we use
# this if/else statement and explicitly call the decorator.
if os.getenv("KFPv2", "0") == "1":
# In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On another hand we cannot create
# a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to
# https://github.com/kubeflow/pipelines/issues/10187. Therefore, meantime we use a unique string created at
# compilation time.
import uuid

compute_exec_params_op = dsl.component_decorator.component(
func=compute_exec_params_func, base_image=base_kfp_image
)
print(
"WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the "
+ "same version of the same pipeline !!!"
)
run_id = uuid.uuid4().hex
else:
compute_exec_params_op = comp.create_component_from_func(func=compute_exec_params_func, base_image=base_kfp_image)
run_id = dsl.RUN_ID_PLACEHOLDER

# create Ray cluster
create_ray_op = comp.load_component_from_file(component_spec_path + "createRayClusterComponent.yaml")
Expand All @@ -111,9 +99,11 @@ def {{ pipeline_name }}(
ray_name: str = "{{ pipeline_name }}-kfp-ray", # name of Ray cluster
# Add image_pull_secret and image_pull_policy to ray workers if needed
{%- if image_pull_secret != "" %}
ray_id_KFPv2: str = "",
ray_head_options: dict = {"cpu": 1, "memory": 4, "image_pull_secret": "{{ image_pull_secret }}", "image": task_image},
ray_worker_options: dict = {"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image_pull_secret": "{{ image_pull_secret }}", "image": task_image},
{%- else %}
ray_id_KFPv2: str = "",
ray_head_options: dict = {"cpu": 1, "memory": 4, "image": task_image},
ray_worker_options: dict = {"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image": task_image},
{%- endif %}
Expand Down Expand Up @@ -142,6 +132,7 @@ def {{ pipeline_name }}(
"""
Pipeline to execute {{ pipeline_name }} transform
:param ray_name: name of the Ray cluster
:param ray_id_KFPv2: string holding the id used for the Ray cluster used only in KFP v2
:param ray_head_options: head node options, containing the following:
cpu - number of cpus
memory - memory
Expand Down Expand Up @@ -177,6 +168,16 @@ def {{ pipeline_name }}(
{%- endfor %}
:return: None
"""
# In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On another hand we cannot create
# a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to
# https://github.com/kubeflow/pipelines/issues/10187. Therefore, meantime the user is requested to insert
# a unique string created at compilation time.
if os.getenv("KFPv2", "0") == "1":
print("WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the "
"same version of the same pipeline !!!")
run_id = ray_id_KFPv2
else:
run_id = dsl.RUN_ID_PLACEHOLDER
# create clean_up task
clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params)
ComponentUtils.add_settings_to_component(clean_up_task, ONE_HOUR_SEC * 2)
Expand Down
24 changes: 12 additions & 12 deletions transforms/code/code2parquet/kfp_ray/code2parquet_wf.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,23 +77,11 @@ def compute_exec_params_func(
# KFPv2 recommends using the `@dsl.component` decorator, which doesn't exist in KFPv1. Therefore, here we use
# this if/else statement and explicitly call the decorator.
if os.getenv("KFPv2", "0") == "1":
# In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On another hand we cannot create
# a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to
# https://github.com/kubeflow/pipelines/issues/10187. Therefore, meantime we use a unique string created at
# compilation time.
import uuid

compute_exec_params_op = dsl.component_decorator.component(
func=compute_exec_params_func, base_image=base_kfp_image
)
print(
"WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the "
+ "same version of the same pipeline !!!"
)
run_id = uuid.uuid4().hex
else:
compute_exec_params_op = comp.create_component_from_func(func=compute_exec_params_func, base_image=base_kfp_image)
run_id = dsl.RUN_ID_PLACEHOLDER


# create Ray cluster
Expand All @@ -113,6 +101,7 @@ def compute_exec_params_func(
)
def code2parquet(
ray_name: str = "code2parquet-kfp-ray", # name of Ray cluster
ray_id_KFPv2: str = "", # Ray cluster unique ID used only in KFP v2
# Add image_pull_secret and image_pull_policy to ray workers if needed
ray_head_options: dict = {"cpu": 1, "memory": 4, "image": task_image},
ray_worker_options: dict = {"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image": task_image},
Expand All @@ -139,6 +128,7 @@ def code2parquet(
"""
Pipeline to execute NOOP transform
:param ray_name: name of the Ray cluster
:param ray_id_KFPv2: string holding the id used for the Ray cluster used only in KFP v2
:param ray_head_options: head node options, containing the following:
cpu - number of cpus
memory - memory
Expand Down Expand Up @@ -178,6 +168,16 @@ def code2parquet(
(here we are assuming that select language info is in S3, but potentially in the different bucket)
:return: None
"""
# In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On another hand we cannot create
# a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to
# https://github.com/kubeflow/pipelines/issues/10187. Therefore, meantime the user is requested to insert
# a unique string created at compilation time.
if os.getenv("KFPv2", "0") == "1":
print("WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the "
"same version of the same pipeline !!!")
run_id = ray_id_KFPv2
else:
run_id = dsl.RUN_ID_PLACEHOLDER
# create clean_up task
clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params)
ComponentUtils.add_settings_to_component(clean_up_task, ONE_HOUR_SEC * 2)
Expand Down
4 changes: 2 additions & 2 deletions transforms/code/code2parquet/ray/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ COPY --chmod=775 --chown=ray:root pyproject.toml pyproject.toml
RUN pip install --no-cache-dir -e .

# copy the main() entry point to the image
COPY src/code2parquet_transform_ray.py .
COPY --chmod=775 --chown=ray:root src/code2parquet_transform_ray.py .

# copy some of the samples in
COPY src/code2parquet_local_ray.py local/
COPY --chmod=775 --chown=ray:root src/code2parquet_local_ray.py local/

# copy test
COPY test/ test/
Expand Down
24 changes: 12 additions & 12 deletions transforms/code/code_quality/kfp_ray/code_quality_wf.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,23 +74,11 @@ def compute_exec_params_func(
# KFPv2 recommends using the `@dsl.component` decorator, which doesn't exist in KFPv1. Therefore, here we use
# this if/else statement and explicitly call the decorator.
if os.getenv("KFPv2", "0") == "1":
# In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On another hand we cannot create
# a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to
# https://github.com/kubeflow/pipelines/issues/10187. Therefore, meantime we use a unique string created at
# compilation time.
import uuid

compute_exec_params_op = dsl.component_decorator.component(
func=compute_exec_params_func, base_image=base_kfp_image
)
print(
"WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the "
+ "same version of the same pipeline !!!"
)
run_id = uuid.uuid4().hex
else:
compute_exec_params_op = comp.create_component_from_func(func=compute_exec_params_func, base_image=base_kfp_image)
run_id = dsl.RUN_ID_PLACEHOLDER


# create Ray cluster
Expand All @@ -112,6 +100,7 @@ def compute_exec_params_func(
def code_quality(
# Ray cluster
ray_name: str = "code_quality-kfp-ray", # name of Ray cluster
ray_id_KFPv2: str = "", # Ray cluster unique ID used only in KFP v2
# Add image_pull_secret and image_pull_policy to ray workers if needed
ray_head_options: dict = {"cpu": 1, "memory": 4, "image": task_image},
ray_worker_options: dict = {"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image": task_image},
Expand All @@ -136,6 +125,7 @@ def code_quality(
"""
Pipeline to execute Code Quality transform
:param ray_name: name of the Ray cluster
:param ray_id_KFPv2: string holding the id used for the Ray cluster used only in KFP v2
:param ray_head_options: head node options, containing the following:
cpu - number of cpus
memory - memory
Expand Down Expand Up @@ -171,6 +161,16 @@ def code_quality(
:param cq_hf_token - Huggingface auth token to download and use the tokenizer
:return: None
"""
# In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On another hand we cannot create
# a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to
# https://github.com/kubeflow/pipelines/issues/10187. Therefore, meantime the user is requested to insert
# a unique string created at compilation time.
if os.getenv("KFPv2", "0") == "1":
print("WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the "
"same version of the same pipeline !!!")
run_id = ray_id_KFPv2
else:
run_id = dsl.RUN_ID_PLACEHOLDER
# create clean_up task
clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params)
ComponentUtils.add_settings_to_component(clean_up_task, ONE_HOUR_SEC * 2)
Expand Down
8 changes: 4 additions & 4 deletions transforms/code/code_quality/ray/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,14 @@ COPY --chmod=775 --chown=ray:root pyproject.toml pyproject.toml
RUN pip install --no-cache-dir -e .

# copy the main() entry point to the image
COPY ./src/code_quality_transform_ray.py .
COPY --chmod=775 --chown=ray:root ./src/code_quality_transform_ray.py .

# copy some of the samples in
COPY ./src/code_quality_local_ray.py local/
COPY --chmod=775 --chown=ray:root ./src/code_quality_local_ray.py local/

# copy test
COPY test/ test/
COPY test-data/ test-data/
COPY --chmod=775 --chown=ray:root test/ test/
COPY --chmod=775 --chown=ray:root test-data/ test-data/

# Set environment
ENV PYTHONPATH /home/ray
Expand Down
24 changes: 12 additions & 12 deletions transforms/code/header_cleanser/kfp_ray/header_cleanser_wf.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,23 +82,11 @@ def compute_exec_params_func(
# KFPv2 recommends using the `@dsl.component` decorator, which doesn't exist in KFPv1. Therefore, here we use
# this if/else statement and explicitly call the decorator.
if os.getenv("KFPv2", "0") == "1":
# In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On another hand we cannot create
# a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to
# https://github.com/kubeflow/pipelines/issues/10187. Therefore, meantime we use a unique string created at
# compilation time.
import uuid

compute_exec_params_op = dsl.component_decorator.component(
func=compute_exec_params_func, base_image=base_kfp_image
)
print(
"WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the "
+ "same version of the same pipeline !!!"
)
run_id = uuid.uuid4().hex
else:
compute_exec_params_op = comp.create_component_from_func(func=compute_exec_params_func, base_image=base_kfp_image)
run_id = dsl.RUN_ID_PLACEHOLDER


# create Ray cluster
Expand All @@ -120,6 +108,7 @@ def compute_exec_params_func(
def header_cleanser(
# Ray cluster
ray_name: str = "header_cleanser-kfp-ray", # name of Ray cluster
ray_id_KFPv2: str = "", # Ray cluster unique ID used only in KFP v2
# Add image_pull_secret and image_pull_policy to ray workers if needed
ray_head_options: dict = {"cpu": 1, "memory": 4, "image": task_image},
ray_worker_options: dict = {"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image": task_image},
Expand Down Expand Up @@ -148,6 +137,7 @@ def header_cleanser(
"""
Pipeline to execute Header Cleanser transform
:param ray_name: name of the Ray cluster
:param ray_id_KFPv2: string holding the id used for the Ray cluster used only in KFP v2
:param ray_head_options: head node options, containing the following:
cpu - number of cpus
memory - memory
Expand Down Expand Up @@ -187,6 +177,16 @@ def header_cleanser(
:param skip_timeout - Hold value true or false to skip removing copyright/header or not when scaning timeout.
:return: None
"""
# In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On another hand we cannot create
# a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to
# https://github.com/kubeflow/pipelines/issues/10187. Therefore, meantime the user is requested to insert
# a unique string created at compilation time.
if os.getenv("KFPv2", "0") == "1":
print("WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the "
"same version of the same pipeline !!!")
run_id = ray_id_KFPv2
else:
run_id = dsl.RUN_ID_PLACEHOLDER
# create clean_up task
clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params)
ComponentUtils.add_settings_to_component(clean_up_task, ONE_HOUR_SEC * 2)
Expand Down
2 changes: 1 addition & 1 deletion transforms/code/header_cleanser/ray/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ User ray

# copy source data
COPY ./src/header_cleanser_transform_ray.py .
COPY src/header_cleanser_local_ray.py local/
COPY --chmod=775 --chown=ray:root src/header_cleanser_local_ray.py local/

# copy test
COPY test/ test/
Expand Down
Loading