Migrate to pipeline v2
diegolovison committed Mar 25, 2024
1 parent 439e2b3 commit fa2be33
Showing 25 changed files with 411 additions and 599 deletions.
164 changes: 12 additions & 152 deletions ods_ci/libs/DataSciencePipelinesAPI.py
@@ -14,53 +14,15 @@ class DataSciencePipelinesAPI:
def __init__(self):
self.route = ""
self.sa_token = None

@keyword
def wait_until_openshift_pipelines_operator_is_deployed(self):
"""
when creating for the first time, it can take about 1 minute for the pods to be ready
"""
deployment_count = 0
count = 0
while deployment_count != 1 and count < 30:
deployments = []
response, _ = self.run_oc("oc get deployment -n openshift-operators openshift-pipelines-operator -o json")
try:
response = json.loads(response)
if (
response["metadata"]["name"] == "openshift-pipelines-operator"
and "readyReplicas" in response["status"]
and response["status"]["readyReplicas"] == 1
):
deployments.append(response)
except JSONDecodeError:
pass
deployment_count = len(deployments)
time.sleep(1)
count += 1
pipeline_run_crd_count = 0
count = 0
while pipeline_run_crd_count < 1 and count < 60:
# https://github.com/opendatahub-io/odh-dashboard/issues/1673
# It is possible to start the Pipeline Server without pipelineruns.tekton.dev CRD
pipeline_run_crd_count = self.count_pods("oc get crd pipelineruns.tekton.dev", 1)
time.sleep(1)
count += 1
assert pipeline_run_crd_count == 1
return self.count_running_pods(
"oc get pods -n openshift-operators -l name=openshift-pipelines-operator -o json",
"openshift-pipelines-operator",
"Running",
1,
)
self.sleep_time = 45

@keyword
def login_and_wait_dsp_route(
self,
user,
pwd,
project,
route_name="ds-pipeline-pipelines-definition",
route_name="ds-pipeline-dspa",
timeout=120,
):
print("Fetch token")
@@ -89,7 +51,7 @@ def login_and_wait_dsp_route(

assert self.route != "", "Route must not be empty"
print(f"Waiting for Data Science Pipeline route to be ready to avoid firing false alerts: {self.route}")
time.sleep(45)
time.sleep(self.sleep_time)
status = -1
count = 0
while status != 200 and count < timeout:
@@ -102,8 +64,8 @@ def login_and_wait_dsp_route(
# if you need to debug, try to print also the response
print(f"({count}): Data Science Pipeline HTTP Status: {status}")
if status != 200:
time.sleep(30)
count += 30
time.sleep(self.sleep_time)
count += self.sleep_time
return status

@keyword
@@ -121,112 +83,6 @@ def remove_pipeline_project(self, project):
time.sleep(1)
count += 1

@keyword
def create_pipeline(self, url_test_pipeline_run_yaml):
print("Creating a pipeline from data science pipelines stack")
test_pipeline_run_yaml, _ = self.do_get(url_test_pipeline_run_yaml)
filename = "test_pipeline_run_yaml.yaml"
with open(filename, "w", encoding="utf-8") as f:
f.write(test_pipeline_run_yaml)
with open(filename, "rb") as f:
response, _ = self.do_upload(
f"https://{self.route}/apis/v1beta1/pipelines/upload",
files={"uploadfile": f},
headers={"Authorization": f"Bearer {self.sa_token}"},
)
os.remove(filename)
pipeline_json = json.loads(response)
pipeline_id = pipeline_json["id"]
response, status = self.do_get(
f"https://{self.route}/apis/v1beta1/pipelines/{pipeline_id}",
headers={"Authorization": f"Bearer {self.sa_token}"},
)
assert status == 200
assert json.loads(response)["name"] == filename
return pipeline_id

@keyword
def create_run(self, pipeline_id):
print("Creating the run from uploaded pipeline")
response, status = self.do_post(
f"https://{self.route}/apis/v1beta1/runs",
headers={
"Authorization": f"Bearer {self.sa_token}",
"Content-Type": "application/json",
},
json={
"name": "test-pipeline-run",
"pipeline_spec": {"pipeline_id": f"{pipeline_id}"},
},
)
assert status == 200
run_json = json.loads(response)
run_id = run_json["run"]["id"]

response, status = self.do_get(
f"https://{self.route}/apis/v1beta1/runs/{run_id}",
headers={"Authorization": f"Bearer {self.sa_token}"},
)
assert status == 200

return run_id

@keyword
def check_run_status(self, run_id, timeout=160):
run_status = None
count = 0
run_finished_ok = False
while not run_finished_ok and count < timeout:
response, status = self.do_get(
f"https://{self.route}/apis/v1beta1/runs/{run_id}",
headers={"Authorization": f"Bearer {self.sa_token}"},
)
try:
run_json = json.loads(response)
if "run" in run_json and "status" in run_json["run"]:
run_status = run_json["run"]["status"]
except JSONDecodeError:
print(response, status)
print(f"Checking run status: {run_status}")
if run_status == "Failed":
break
# https://github.com/tektoncd/pipeline/blob/main/docs/pipelineruns.md#monitoring-execution-status
if run_status in ("Completed", "Succeeded"):
run_finished_ok = True
break
time.sleep(1)
count += 1
return run_finished_ok

@keyword
def delete_runs(self, run_id):
print("Deleting the runs")

response, status = self.do_delete(
f"https://{self.route}/apis/v1beta1/runs/{run_id}",
headers={"Authorization": f"Bearer {self.sa_token}"},
)
assert status == 200
response, status = self.do_get(
f"https://{self.route}/apis/v1beta1/runs/{run_id}",
headers={"Authorization": f"Bearer {self.sa_token}"},
)
assert status == 404

@keyword
def delete_pipeline(self, pipeline_id):
print("Deleting the pipeline")
response, status = self.do_delete(
f"https://{self.route}/apis/v1beta1/pipelines/{pipeline_id}",
headers={"Authorization": f"Bearer {self.sa_token}"},
)
assert status == 200
response, status = self.do_get(
f"https://{self.route}/apis/v1beta1/pipelines/{pipeline_id}",
headers={"Authorization": f"Bearer {self.sa_token}"},
)
assert status == 404

@keyword
def add_role_to_user(self, name, user, project):
output, error = self.run_oc(f"oc policy add-role-to-user {name} {user} -n {project} --role-namespace={project}")
@@ -309,8 +165,11 @@ def run_oc(self, command):
output, error = process.communicate()
return self.byte_to_str(output), error

def do_get(self, url, headers=None):
response = requests.get(url, headers=headers, verify=self.get_cert())
def do_get(self, url, headers=None, skip_ssl=False):
if skip_ssl:
response = requests.get(url, headers=headers, verify=False)

Check failure — Code scanning / SonarCloud: "Server certificates should be verified during SSL/TLS connections" (High). Enable server certificate validation on this SSL/TLS connection.
else:
response = requests.get(url, headers=headers, verify=self.get_cert())
return self.byte_to_str(response.content), response.status_code

def do_post(self, url, headers, json):
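A minimal usage sketch (not part of this commit) of the new skip_ssl flag; the api instance, credentials, and pipeline_url names are assumed for illustration. Requests to the pipeline route itself keep the default branch, which verifies against the router CA returned by get_cert(), the behaviour the SonarCloud finding asks for.

# sketch only: user, pwd, project and pipeline_url are assumed, not taken from the diff
api = DataSciencePipelinesAPI()
api.login_and_wait_dsp_route(user, pwd, project)  # fills in api.route and api.sa_token

# external test asset (for example a raw GitHub pipeline YAML): the cluster router CA
# does not cover that host, so verification is skipped deliberately for this call only
pipeline_yaml, _ = api.do_get(pipeline_url, skip_ssl=True)

# request against the pipeline route: default path, verified with the router CA
# (the v2beta1 list-pipelines endpoint is used here purely as an illustration)
body, status = api.do_get(
    f"https://{api.route}/apis/v2beta1/pipelines",
    headers={"Authorization": f"Bearer {api.sa_token}"},
)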
@@ -330,14 +189,15 @@ def byte_to_str(self, content):

def get_secret(self, project, name):
secret_json, _ = self.run_oc(f"oc get secret -n {project} {name} -o json")
assert len(secret_json) > 0
return json.loads(secret_json)

def get_cert(self):
cert_json = self.get_secret("openshift-ingress-operator", "router-ca")
cert = cert_json["data"]["tls.crt"]
decoded_cert = base64.b64decode(cert).decode("utf-8")

file_name = "/tmp/kft-cert"
file_name = "/tmp/kfp-cert"
cert_file = open(file_name, "w")
cert_file.write(decoded_cert)
cert_file.close()
@@ -1,15 +1,14 @@
import base64
import importlib
import json
import os
import sys

import time
from DataSciencePipelinesAPI import DataSciencePipelinesAPI
from robotlibcore import keyword
from urllib3.exceptions import MaxRetryError, SSLError


class DataSciencePipelinesKfpTekton:
class DataSciencePipelinesKfp:
base_image = (
"registry.redhat.io/ubi8/python-39@sha256:3523b184212e1f2243e76d8094ab52b01ea3015471471290d011625e1763af61"
)
@@ -19,43 +18,37 @@ def __init__(self):
self.client = None
self.api = None

def get_client(self, user, pwd, project, route_name):
def get_client(self, user, pwd, project, route_name='ds-pipeline-dspa'):
if self.client is None:
self.api = DataSciencePipelinesAPI()
self.api.login_and_wait_dsp_route(user, pwd, project, route_name)

# initialize global environment variables
# https://github.com/kubeflow/kfp-tekton/issues/1345
default_image = DataSciencePipelinesKfpTekton.base_image
default_image = DataSciencePipelinesKfp.base_image
os.environ["DEFAULT_STORAGE_CLASS"] = self.api.get_default_storage()
os.environ["TEKTON_BASH_STEP_IMAGE"] = default_image
os.environ["TEKTON_COPY_RESULTS_STEP_IMAGE"] = default_image
os.environ["CONDITION_IMAGE_NAME"] = default_image
# https://kubernetes.io/docs/concepts/storage/persistent-volumes/#access-modes
os.environ["DEFAULT_ACCESSMODES"] = "ReadWriteOnce"
import kfp_tekton
from kfp.client import Client

# the following fallback is to simplify test development
try:
# we assume it is a secured cluster
# ssl_ca_cert came from /path/to/python/lib/python3.x/site-packages/certifi/cacert.pem
# that certificate is "Mozilla's carefully curated collection of root certificates"
self.client = kfp_tekton.TektonClient(
host=f"https://{self.api.route}/", existing_token=self.api.sa_token
)
self.client = Client(host=f"https://{self.api.route}/", existing_token=self.api.sa_token)
except MaxRetryError as e:
# we assume it is a cluster with self-signed certs
if type(e.reason) == SSLError:
# try to retrieve the certificate
self.client = kfp_tekton.TektonClient(
host=f"https://{self.api.route}/",
existing_token=self.api.sa_token,
ssl_ca_cert=self.api.get_cert(),
)
self.client = Client(host=f"https://{self.api.route}/", existing_token=self.api.sa_token, ssl_ca_cert=self.api.get_cert())
return self.client, self.api

def get_bucket_name(self, api, project):
bucket_name, _ = api.run_oc(f"oc get dspa -n {project} pipelines-definition -o json")
bucket_name, _ = api.run_oc(f"oc get dspa -n {project} dspa -o json")
objectStorage = json.loads(bucket_name)["spec"]["objectStorage"]
if "minio" in objectStorage:
return objectStorage["minio"]["bucket"]
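For orientation, a sketch of the spec.objectStorage shape that get_bucket_name reads from the dspa resource, inferred only from the lookups above; the bucket value is a placeholder, and the non-minio branch is not shown in this hunk, so it is omitted here.

# inferred from json.loads(...)["spec"]["objectStorage"]["minio"]["bucket"] above;
# placeholder values, not taken from a real cluster
dspa = {
    "spec": {
        "objectStorage": {
            "minio": {
                "bucket": "mlpipeline",  # value returned by get_bucket_name
            }
        }
    }
}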
@@ -71,40 +64,87 @@ def import_souce_code(self, path):
return module

@keyword
def kfp_tekton_create_run_from_pipeline_func(
self, user, pwd, project, route_name, source_code, fn, current_path=None
def setup_client(self, user, pwd, project):
# force a new client
self.client = None
self.get_client(user, pwd, project)

@keyword
def import_run_pipeline(self, pipeline_url, pipeline_params):
print(f'pipeline_params({type(pipeline_params)}): {pipeline_params}')
print(f'downloading: {pipeline_url}')
test_pipeline_run_yaml, _ = self.api.do_get(pipeline_url, skip_ssl=True)
pipeline_file = "/tmp/test_pipeline_run_yaml.yaml"
with open(pipeline_file, "w", encoding="utf-8") as f:
f.write(test_pipeline_run_yaml)
print(f'{pipeline_url} content stored at {pipeline_file}')
print('create a run from pipeline')
response = self.client.create_run_from_pipeline_package(
pipeline_file=pipeline_file,
arguments=pipeline_params
)
print(response)
return response.run_id

@keyword
def check_run_status(self, run_id, timeout=160):
count = 0
while count < timeout:
response = self.client.get_run(run_id)
run_status = response.state
print(f"Checking run status: {run_status}")
if run_status == "FAILED":
break
if run_status == "SUCCEEDED":
break
time.sleep(1)
count += 1
return run_status

@keyword
def delete_run(self, run_id):
response = self.client.delete_run(run_id)
# means success
assert len(response) == 0

@keyword
def create_run_from_pipeline_func(
self, user, pwd, project, source_code, fn, pipeline_params={}, current_path=None, route_name='ds-pipeline-dspa'
):
print(f'pipeline_params: {pipeline_params}')
client, api = self.get_client(user, pwd, project, route_name)
mlpipeline_minio_artifact_secret = api.get_secret(project, "mlpipeline-minio-artifact")
mlpipeline_minio_artifact_secret = api.get_secret(project, "ds-pipeline-s3-dspa")
bucket_name = self.get_bucket_name(api, project)
# the current path is from where you are running the script
# sh ods_ci/run_robot_test.sh
# the current_path will be ods-ci
if current_path is None:
current_path = os.getcwd()
my_source = self.import_souce_code(
f"{current_path}/ods_ci/tests/Resources/Files/pipeline-samples/{source_code}"
f"{current_path}/ods_ci/tests/Resources/Files/pipeline-samples/v2/{source_code}"
)
pipeline = getattr(my_source, fn)

# pipeline_params
# there are some special keys to retrieve argument values dynamically
# in pipeline v2, we must match the parameter names
if 'mlpipeline_minio_artifact_secret' in pipeline_params:
pipeline_params['mlpipeline_minio_artifact_secret'] = str(mlpipeline_minio_artifact_secret["data"])
if 'bucket_name' in pipeline_params:
pipeline_params['bucket_name'] = bucket_name
if 'openshift_server' in pipeline_params:
pipeline_params['openshift_server'] = self.api.get_openshift_server()
if 'openshift_token' in pipeline_params:
pipeline_params['openshift_token'] = self.api.get_openshift_token()
print(f'pipeline_params modified with dynamic values: {pipeline_params}')

# create_run_from_pipeline_func will compile the code
# if you need to see the yaml for debugging purposes, call: Compiler().compile(pipeline, f'{fn}.yaml')
result = client.create_run_from_pipeline_func(
pipeline_func=pipeline,
arguments={
"mlpipeline_minio_artifact_secret": mlpipeline_minio_artifact_secret["data"],
"bucket_name": bucket_name,
"openshift_server": self.api.get_openshift_server(),
"openshift_token": self.api.get_openshift_token(),
},
arguments=pipeline_params
)
# easy to debug and double check failures
print(result)
return result
return result.run_id

# we are calling DataSciencePipelinesAPI because of https://github.com/kubeflow/kfp-tekton/issues/1223
# Waiting for a backport https://github.com/kubeflow/kfp-tekton/pull/1234
@keyword
def kfp_tekton_wait_for_run_completion(self, user, pwd, project, route_name, run_result, timeout=160):
_, api = self.get_client(user, pwd, project, route_name)
return api.check_run_status(run_result.run_id, timeout=timeout)
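As a sketch (not part of this commit), a v2 pipeline sample consumed by create_run_from_pipeline_func would need its pipeline parameters named exactly like the keys substituted above; the component and pipeline below are illustrative only, assuming the kfp v2 SDK.

from kfp import dsl

@dsl.component
def consume(bucket_name: str, openshift_server: str):
    # placeholder component body
    print(bucket_name, openshift_server)

@dsl.pipeline(name="params-sample")
def params_pipeline(bucket_name: str = "", openshift_server: str = ""):
    # parameter names must match the pipeline_params keys filled in dynamically
    # ("bucket_name", "openshift_server", ...) or the arguments cannot be bound
    consume(bucket_name=bucket_name, openshift_server=openshift_server)

And a hypothetical driver chaining the new keywords end to end, with user, pwd, project and pipeline_url assumed; the downloaded YAML must declare a matching bucket_name parameter.

kfp_lib = DataSciencePipelinesKfp()
kfp_lib.setup_client(user, pwd, project)
run_id = kfp_lib.import_run_pipeline(pipeline_url, {"bucket_name": "mlpipeline"})
assert kfp_lib.check_run_status(run_id) == "SUCCEEDED"
kfp_lib.delete_run(run_id)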
