Migrate pipelines tests to DSPv2 (pipelines-api, pipelines-kfp) #1312

Merged: 1 commit, Mar 26, 2024
164 changes: 12 additions & 152 deletions ods_ci/libs/DataSciencePipelinesAPI.py
@@ -14,53 +14,15 @@
def __init__(self):
self.route = ""
self.sa_token = None

@keyword
def wait_until_openshift_pipelines_operator_is_deployed(self):
"""
when creating for the first time, it can take about 1 minute for the pods to be ready
"""
deployment_count = 0
count = 0
while deployment_count != 1 and count < 30:
deployments = []
response, _ = self.run_oc("oc get deployment -n openshift-operators openshift-pipelines-operator -o json")
try:
response = json.loads(response)
if (
response["metadata"]["name"] == "openshift-pipelines-operator"
and "readyReplicas" in response["status"]
and response["status"]["readyReplicas"] == 1
):
deployments.append(response)
except JSONDecodeError:
pass
deployment_count = len(deployments)
time.sleep(1)
count += 1
pipeline_run_crd_count = 0
count = 0
while pipeline_run_crd_count < 1 and count < 60:
# https://github.com/opendatahub-io/odh-dashboard/issues/1673
# It is possible to start the Pipeline Server without pipelineruns.tekton.dev CRD
pipeline_run_crd_count = self.count_pods("oc get crd pipelineruns.tekton.dev", 1)
time.sleep(1)
count += 1
assert pipeline_run_crd_count == 1
return self.count_running_pods(
"oc get pods -n openshift-operators -l name=openshift-pipelines-operator -o json",
"openshift-pipelines-operator",
"Running",
1,
)
self.sleep_time = 45

@keyword
def login_and_wait_dsp_route(
self,
user,
pwd,
project,
route_name="ds-pipeline-pipelines-definition",
route_name="ds-pipeline-dspa",
timeout=120,
):
print("Fetch token")
@@ -89,7 +51,7 @@

assert self.route != "", "Route must not be empty"
print(f"Waiting for Data Science Pipeline route to be ready to avoid firing false alerts: {self.route}")
time.sleep(45)
time.sleep(self.sleep_time)
status = -1
count = 0
while status != 200 and count < timeout:
@@ -102,8 +64,8 @@
# if you need to debug, also print the response
print(f"({count}): Data Science Pipeline HTTP Status: {status}")
if status != 200:
time.sleep(30)
count += 30
time.sleep(self.sleep_time)
count += self.sleep_time
return status

@keyword
@@ -121,112 +83,6 @@
time.sleep(1)
count += 1

@keyword
def create_pipeline(self, url_test_pipeline_run_yaml):
print("Creating a pipeline from data science pipelines stack")
test_pipeline_run_yaml, _ = self.do_get(url_test_pipeline_run_yaml)
filename = "test_pipeline_run_yaml.yaml"
with open(filename, "w", encoding="utf-8") as f:
f.write(test_pipeline_run_yaml)
with open(filename, "rb") as f:
response, _ = self.do_upload(
f"https://{self.route}/apis/v1beta1/pipelines/upload",
files={"uploadfile": f},
headers={"Authorization": f"Bearer {self.sa_token}"},
)
os.remove(filename)
pipeline_json = json.loads(response)
pipeline_id = pipeline_json["id"]
response, status = self.do_get(
f"https://{self.route}/apis/v1beta1/pipelines/{pipeline_id}",
headers={"Authorization": f"Bearer {self.sa_token}"},
)
assert status == 200
assert json.loads(response)["name"] == filename
return pipeline_id

@keyword
def create_run(self, pipeline_id):
print("Creating the run from uploaded pipeline")
response, status = self.do_post(
f"https://{self.route}/apis/v1beta1/runs",
headers={
"Authorization": f"Bearer {self.sa_token}",
"Content-Type": "application/json",
},
json={
"name": "test-pipeline-run",
"pipeline_spec": {"pipeline_id": f"{pipeline_id}"},
},
)
assert status == 200
run_json = json.loads(response)
run_id = run_json["run"]["id"]

response, status = self.do_get(
f"https://{self.route}/apis/v1beta1/runs/{run_id}",
headers={"Authorization": f"Bearer {self.sa_token}"},
)
assert status == 200

return run_id

@keyword
def check_run_status(self, run_id, timeout=160):
run_status = None
count = 0
run_finished_ok = False
while not run_finished_ok and count < timeout:
response, status = self.do_get(
f"https://{self.route}/apis/v1beta1/runs/{run_id}",
headers={"Authorization": f"Bearer {self.sa_token}"},
)
try:
run_json = json.loads(response)
if "run" in run_json and "status" in run_json["run"]:
run_status = run_json["run"]["status"]
except JSONDecodeError:
print(response, status)
print(f"Checking run status: {run_status}")
if run_status == "Failed":
break
# https://github.com/tektoncd/pipeline/blob/main/docs/pipelineruns.md#monitoring-execution-status
if run_status in ("Completed", "Succeeded"):
run_finished_ok = True
break
time.sleep(1)
count += 1
return run_finished_ok

@keyword
def delete_runs(self, run_id):
print("Deleting the runs")

response, status = self.do_delete(
f"https://{self.route}/apis/v1beta1/runs/{run_id}",
headers={"Authorization": f"Bearer {self.sa_token}"},
)
assert status == 200
response, status = self.do_get(
f"https://{self.route}/apis/v1beta1/runs/{run_id}",
headers={"Authorization": f"Bearer {self.sa_token}"},
)
assert status == 404

@keyword
def delete_pipeline(self, pipeline_id):
print("Deleting the pipeline")
response, status = self.do_delete(
f"https://{self.route}/apis/v1beta1/pipelines/{pipeline_id}",
headers={"Authorization": f"Bearer {self.sa_token}"},
)
assert status == 200
response, status = self.do_get(
f"https://{self.route}/apis/v1beta1/pipelines/{pipeline_id}",
headers={"Authorization": f"Bearer {self.sa_token}"},
)
assert status == 404

@keyword
def add_role_to_user(self, name, user, project):
output, error = self.run_oc(f"oc policy add-role-to-user {name} {user} -n {project} --role-namespace={project}")
@@ -309,8 +165,11 @@
output, error = process.communicate()
return self.byte_to_str(output), error

def do_get(self, url, headers=None):
response = requests.get(url, headers=headers, verify=self.get_cert())
def do_get(self, url, headers=None, skip_ssl=False):
if skip_ssl:
response = requests.get(url, headers=headers, verify=False)

[Check failure: SonarCloud code scanning, High] Server certificates should be verified during SSL/TLS connections. Enable server certificate validation on this SSL/TLS connection. (See the usage sketch after this file's diff.)
else:
response = requests.get(url, headers=headers, verify=self.get_cert())
return self.byte_to_str(response.content), response.status_code

def do_post(self, url, headers, json):
@@ -330,14 +189,15 @@

def get_secret(self, project, name):
secret_json, _ = self.run_oc(f"oc get secret -n {project} {name} -o json")
assert len(secret_json) > 0
return json.loads(secret_json)

def get_cert(self):
cert_json = self.get_secret("openshift-ingress-operator", "router-ca")
cert = cert_json["data"]["tls.crt"]
decoded_cert = base64.b64decode(cert).decode("utf-8")

file_name = "/tmp/kft-cert"
file_name = "/tmp/kfp-cert"
cert_file = open(file_name, "w")
cert_file.write(decoded_cert)
cert_file.close()
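For context on the SonarCloud finding above: the new skip_ssl flag exists so import_run_pipeline (in the KFP library below) can download a sample pipeline YAML from an external public URL without needing the cluster's router CA, while in-cluster DSP calls keep verification via get_cert(). A minimal caller-side sketch, assuming only the do_get signature shown in this diff; the URL is a placeholder and the warning suppression is optional, test-only convenience:

import urllib3

from DataSciencePipelinesAPI import DataSciencePipelinesAPI

# verify=False makes urllib3 emit InsecureRequestWarning on every request;
# silencing it keeps test logs readable (assumption: acceptable for test-only code)
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

api = DataSciencePipelinesAPI()
# fetch a public, compiled pipeline definition without certificate verification
yaml_text, status = api.do_get(
    "https://example.com/samples/flip_coin_compiled.yaml",  # placeholder URL
    skip_ssl=True,
)
assert status == 200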
@@ -1,15 +1,14 @@
import base64
import importlib
import json
import os
import sys

import time
from DataSciencePipelinesAPI import DataSciencePipelinesAPI
from robotlibcore import keyword
from urllib3.exceptions import MaxRetryError, SSLError


class DataSciencePipelinesKfpTekton:
class DataSciencePipelinesKfp:
base_image = (
"registry.redhat.io/ubi8/python-39@sha256:3523b184212e1f2243e76d8094ab52b01ea3015471471290d011625e1763af61"
)
@@ -19,43 +18,37 @@ def __init__(self):
self.client = None
self.api = None

def get_client(self, user, pwd, project, route_name):
def get_client(self, user, pwd, project, route_name='ds-pipeline-dspa'):
if self.client is None:
self.api = DataSciencePipelinesAPI()
self.api.login_and_wait_dsp_route(user, pwd, project, route_name)

# initialize global environment variables
# https://github.com/kubeflow/kfp-tekton/issues/1345
default_image = DataSciencePipelinesKfpTekton.base_image
default_image = DataSciencePipelinesKfp.base_image
os.environ["DEFAULT_STORAGE_CLASS"] = self.api.get_default_storage()
os.environ["TEKTON_BASH_STEP_IMAGE"] = default_image
os.environ["TEKTON_COPY_RESULTS_STEP_IMAGE"] = default_image
os.environ["CONDITION_IMAGE_NAME"] = default_image
# https://kubernetes.io/docs/concepts/storage/persistent-volumes/#access-modes
os.environ["DEFAULT_ACCESSMODES"] = "ReadWriteOnce"
import kfp_tekton
from kfp.client import Client

# the following fallback is to simplify test development
try:
# we assume it is a secured cluster
# ssl_ca_cert came from /path/to/python/lib/python3.x/site-packages/certifi/cacert.pem
# that certificate is "Mozilla's carefully curated collection of root certificates"
self.client = kfp_tekton.TektonClient(
host=f"https://{self.api.route}/", existing_token=self.api.sa_token
)
self.client = Client(host=f"https://{self.api.route}/", existing_token=self.api.sa_token)
except MaxRetryError as e:
# we assume it is a cluster with self-signed certs
if type(e.reason) == SSLError:
# try to retrieve the certificate
self.client = kfp_tekton.TektonClient(
host=f"https://{self.api.route}/",
existing_token=self.api.sa_token,
ssl_ca_cert=self.api.get_cert(),
)
self.client = Client(host=f"https://{self.api.route}/", existing_token=self.api.sa_token, ssl_ca_cert=self.api.get_cert())
return self.client, self.api

def get_bucket_name(self, api, project):
bucket_name, _ = api.run_oc(f"oc get dspa -n {project} pipelines-definition -o json")
bucket_name, _ = api.run_oc(f"oc get dspa -n {project} dspa -o json")
objectStorage = json.loads(bucket_name)["spec"]["objectStorage"]
if "minio" in objectStorage:
return objectStorage["minio"]["bucket"]
@@ -71,40 +64,87 @@ def import_souce_code(self, path):
return module

@keyword
def kfp_tekton_create_run_from_pipeline_func(
self, user, pwd, project, route_name, source_code, fn, current_path=None
def setup_client(self, user, pwd, project):
# force a new client
self.client = None
self.get_client(user, pwd, project)

@keyword
def import_run_pipeline(self, pipeline_url, pipeline_params):
print(f'pipeline_params({type(pipeline_params)}): {pipeline_params}')
print(f'downloading: {pipeline_url}')
test_pipeline_run_yaml, _ = self.api.do_get(pipeline_url, skip_ssl=True)
pipeline_file = "/tmp/test_pipeline_run_yaml.yaml"
with open(pipeline_file, "w", encoding="utf-8") as f:
f.write(test_pipeline_run_yaml)
print(f'{pipeline_url} content stored at {pipeline_file}')
print('create a run from pipeline')
response = self.client.create_run_from_pipeline_package(
pipeline_file=pipeline_file,
arguments=pipeline_params
)
print(response)
return response.run_id

@keyword
def check_run_status(self, run_id, timeout=160):
count = 0
while count < timeout:
response = self.client.get_run(run_id)
run_status = response.state
print(f"Checking run status: {run_status}")
if run_status == "FAILED":
break
if run_status == "SUCCEEDED":
break
time.sleep(1)
count += 1
return run_status

@keyword
def delete_run(self, run_id):
response = self.client.delete_run(run_id)
# an empty response means success
assert len(response) == 0

@keyword
def create_run_from_pipeline_func(
self, user, pwd, project, source_code, fn, pipeline_params={}, current_path=None, route_name='ds-pipeline-dspa'
):
print(f'pipeline_params: {pipeline_params}')
client, api = self.get_client(user, pwd, project, route_name)
mlpipeline_minio_artifact_secret = api.get_secret(project, "mlpipeline-minio-artifact")
mlpipeline_minio_artifact_secret = api.get_secret(project, "ds-pipeline-s3-dspa")
bucket_name = self.get_bucket_name(api, project)
# the current path is the directory from which you run the script
# e.g. sh ods_ci/run_robot_test.sh
# in that case, current_path will be ods-ci
if current_path is None:
current_path = os.getcwd()
my_source = self.import_souce_code(
f"{current_path}/ods_ci/tests/Resources/Files/pipeline-samples/{source_code}"
f"{current_path}/ods_ci/tests/Resources/Files/pipeline-samples/v2/{source_code}"
)
pipeline = getattr(my_source, fn)

# pipeline_params
# some special keys have their argument values retrieved dynamically (see the caller-side sketch after this method)
# in pipelines v2, the parameter names must match exactly
if 'mlpipeline_minio_artifact_secret' in pipeline_params:
pipeline_params['mlpipeline_minio_artifact_secret'] = str(mlpipeline_minio_artifact_secret["data"])
if 'bucket_name' in pipeline_params:
pipeline_params['bucket_name'] = bucket_name
if 'openshift_server' in pipeline_params:
pipeline_params['openshift_server'] = self.api.get_openshift_server()
if 'openshift_token' in pipeline_params:
pipeline_params['openshift_token'] = self.api.get_openshift_token()
print(f'pipeline_params modified with dynamic values: {pipeline_params}')

# create_run_from_pipeline_func will compile the code
# if you need to see the YAML for debugging purposes, call: TektonCompiler().compile(pipeline, f'{fn}.yaml')
result = client.create_run_from_pipeline_func(
pipeline_func=pipeline,
arguments={
"mlpipeline_minio_artifact_secret": mlpipeline_minio_artifact_secret["data"],
"bucket_name": bucket_name,
"openshift_server": self.api.get_openshift_server(),
"openshift_token": self.api.get_openshift_token(),
},
arguments=pipeline_params
)
# print the result to make failures easy to debug and double-check
print(result)
return result
return result.run_id
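For illustration, a hedged caller-side sketch of the convention noted above: any special key present in pipeline_params is overwritten with a value fetched from the cluster before submission, so the pipeline function must declare parameters with exactly these names. The module import, credentials, project, sample file, and function name below are placeholders:

from DataSciencePipelinesKfp import DataSciencePipelinesKfp  # module name assumed from the class rename

kfp_lib = DataSciencePipelinesKfp()
run_id = kfp_lib.create_run_from_pipeline_func(
    user="test-user",                   # placeholder credentials
    pwd="test-password",
    project="pipelines-test",           # placeholder data science project
    source_code="upload_download.py",   # placeholder; resolved under .../pipeline-samples/v2/
    fn="wire_up_pipeline",              # placeholder pipeline function name
    pipeline_params={
        # empty placeholders; the keyword fills them with live cluster values
        "mlpipeline_minio_artifact_secret": "",
        "bucket_name": "",
        "openshift_server": "",
        "openshift_token": "",
    },
)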

# we are calling DataSciencePipelinesAPI because of https://github.com/kubeflow/kfp-tekton/issues/1223
# Waiting for a backport https://github.com/kubeflow/kfp-tekton/pull/1234
@keyword
def kfp_tekton_wait_for_run_completion(self, user, pwd, project, route_name, run_result, timeout=160):
_, api = self.get_client(user, pwd, project, route_name)
return api.check_run_status(run_result.run_id, timeout=timeout)
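Taken together, the migrated keywords support a simple import, run, check, delete flow against DSPv2. A minimal end-to-end sketch, assuming a compiled KFP v2 pipeline YAML reachable over HTTP; the module import, URL, credentials, project, and pipeline argument are placeholders:

from DataSciencePipelinesKfp import DataSciencePipelinesKfp  # module name assumed

kfp_lib = DataSciencePipelinesKfp()
# log in, wait for the ds-pipeline-dspa route, and build the kfp Client
kfp_lib.setup_client("test-user", "test-password", "pipelines-test")

# download the YAML (uses do_get with skip_ssl=True internally) and submit it as a run
run_id = kfp_lib.import_run_pipeline(
    pipeline_url="https://example.com/pipelines/flip_coin_compiled.yaml",  # placeholder URL
    pipeline_params={"coin": "heads"},  # placeholder pipeline argument
)

# poll until the run reaches a terminal state, then clean up
state = kfp_lib.check_run_status(run_id, timeout=300)
assert state == "SUCCEEDED", f"unexpected terminal state: {state}"
kfp_lib.delete_run(run_id)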